"""Post newly modified, high-severity NVD vulnerabilities to Slack.

READ ME
  -- Rename the file from .txt to .py.
  -- Checks the NVD database for any new or updated vulnerabilities that
     have been released within the past 60 minutes.
  -- This file is written to run in AWS Lambda, which needs to invoke
     ``lambda_handler`` hourly.
  -- To test locally, uncomment the ``if __name__ == "__main__"`` guard at
     the bottom of the file.
  -- Data has been redacted: set the ``SLACK_WEBHOOK_URL`` environment
     variable (or edit the default below) to a working Slack webhook.
  -- Obtaining an API key from NVD is not required for this call; 24 calls
     a day is well within the key-less rate limit. If ``NVD_API_KEY`` is
     set in the environment it is sent in the ``apiKey`` request header.
  -- Vulnerabilities may be missing v3.1, v3.0 and/or v2 CVSS scores.
     Entries with no usable score are skipped.
  -- Lastly - due to the amount of vulns that occur, the minimum CVSS
     rating is 8. Adjust ``MIN_CVSS_SCORE`` accordingly.

HTTP calls use the standard library (``urllib.request``) instead of
``botocore.vendored.requests``.
"""

import json
import os
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timedelta, timezone

NVD_CVE_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0/?"

# Never commit a real webhook; supply it through the environment instead.
SLACK_WEBHOOK_URL = os.environ.get(
    "SLACK_WEBHOOK_URL",
    "https://hooks.slack.com/services/[redacted]/[redacted]/[redacted]",
)
SLACK_ICON_URL = "https://cdn.vox-cdn.com/thumbor/cIgoJOxP3gYNl7z4wSzGsXygVtI=/0x0:2524x1730/1400x1400/filters:focal(1262x865:1263x866)/cdn.vox-cdn.com/uploads/chorus_asset/file/24528220/Screenshot_2023_03_22_at_09.43.05.png"

# Only vulnerabilities scoring at least this much are reported.
MIN_CVSS_SCORE = 8

# Upper bound for each HTTP call so a stalled connection cannot hang the run.
HTTP_TIMEOUT_SECONDS = 30

# CVSS metric blocks in order of preference (newest scoring scheme first).
_CVSS_METRIC_KEYS = ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2")

# Number of description characters kept in each Slack line.
_DESCRIPTION_LIMIT = 99


def lambda_handler(event, context):
    """Lambda entry point: query NVD for the previous hour and notify Slack.

    Args:
        event: Lambda invocation event (unused).
        context: Lambda runtime context (unused).

    Returns:
        None.
    """
    # The NVD query below labels its timestamps with a -07:00 offset, so
    # stepping back 8 hours from UTC selects the hour that ended most
    # recently in UTC (8h back, 7h offset => 1h back).
    pHour, pDate, pMonth, pYear = getPreviousDateTime(8)
    results = askNVD(pHour, pDate, pMonth, pYear)
    if not results:
        # Nothing qualified; do not post an empty message to Slack.
        print("No new or updated CVEs at or above the minimum score.")
        return
    sendSlackMessage("".join(f"{line}\n" for line in results))


def getPreviousDateTime(deltaHours):
    """Return the UTC wall-clock parts of the moment ``deltaHours`` ago.

    The clock is read in UTC explicitly so the result does not depend on
    the time zone of the machine running the code.

    Args:
        deltaHours: Number of hours to step back from the current time.

    Returns:
        Tuple ``(hour, day, month, year)`` of zero-padded strings.
    """
    previous = datetime.now(timezone.utc) - timedelta(hours=deltaHours)
    return (
        previous.strftime("%H"),
        previous.strftime("%d"),
        previous.strftime("%m"),
        previous.strftime("%Y"),
    )


def _extract_base_score(cve):
    """Return the first available CVSS base score of ``cve``, or None."""
    metrics = cve.get("metrics") or {}
    for key in _CVSS_METRIC_KEYS:
        try:
            score = metrics[key][0]["cvssData"]["baseScore"]
            float(score)  # reject values that cannot be compared numerically
        except (KeyError, IndexError, TypeError, ValueError):
            continue
        return score
    return None


def _pick_description(cve):
    """Return the English description of ``cve`` (else the first, else '')."""
    descriptions = cve.get("descriptions") or []
    for entry in descriptions:
        if entry.get("lang") == "en":
            return entry.get("value", "")
    if descriptions:
        return descriptions[0].get("value", "")
    return ""


def _format_line_items(vulnerabilities, minScore=MIN_CVSS_SCORE):
    """Build one Slack-formatted line per vulnerability scoring >= minScore."""
    items = []
    for entry in vulnerabilities:
        cve = entry["cve"]
        score = _extract_base_score(cve)
        if score is None or float(score) < minScore:
            continue
        cve_id = cve["id"]
        cve_url = f"https://nvd.nist.gov/vuln/detail/{cve_id}"
        summary = _pick_description(cve)[:_DESCRIPTION_LIMIT]
        items.append(f"<{cve_url}|{cve_id}> {score} {summary}...")
    return items


def askNVD(pHour, pDate, pMonth, pYear):
    """Fetch CVEs modified during the given hour and format the severe ones.

    Args:
        pHour: Zero-padded hour ("00".."23") of the query window.
        pDate: Zero-padded day of month.
        pMonth: Zero-padded month.
        pYear: Four-digit year.

    Returns:
        List of Slack-formatted strings, one per vulnerability whose CVSS
        base score is at least ``MIN_CVSS_SCORE``.

    Raises:
        urllib.error.URLError: If the NVD API cannot be reached or answers
            with an HTTP error status.
    """
    day = f"{pYear}-{pMonth}-{pDate}"
    # %2D is a URL-encoded "-": the window is expressed at a -07:00 offset.
    query = (
        f"lastModStartDate={day}T{pHour}:00:00.000%2D07:00"
        f"&lastModEndDate={day}T{pHour}:59:59.999%2D07:00"
    )
    headers = {
        "Accept": "application/json",
        "User-Agent": "nvd-slack-notifier",
    }
    # The key is optional and is read from the environment so that no
    # credential lives in source control.
    api_key = os.environ.get("NVD_API_KEY")
    if api_key:
        headers["apiKey"] = api_key

    request = urllib.request.Request(NVD_CVE_API_URL + query, headers=headers)
    with urllib.request.urlopen(request, timeout=HTTP_TIMEOUT_SECONDS) as response:
        payload = json.load(response)

    # Iterate the returned list itself rather than trusting a separate count.
    return _format_line_items(payload.get("vulnerabilities") or [])


def sendSlackMessage(data):
    """Send a Slack message to a channel via the configured webhook.

    Args:
        data (str): Message text to post.

    Returns:
        int: HTTP status code returned by the webhook.

    Raises:
        urllib.error.URLError: If the webhook host cannot be reached.
    """
    payload = {
        "text": data,
        "username": "NVD",
        "icon_url": SLACK_ICON_URL,
    }
    request = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=HTTP_TIMEOUT_SECONDS) as response:
            status = response.status
    except urllib.error.HTTPError as err:
        # Report a rejected post instead of failing the whole run.
        status = err.code
    print(f"Slack webhook responded with HTTP {status}")
    return status


# if __name__ == "__main__":
#     lambda_handler(None, None)