"""Helpers to run a GVM scan through GMP and classify the resulting report.

The module wraps the GVM ``port_list``, ``target`` and ``task`` objects in
small classes, provides a thin GMP client factory, and offers a
``ReportManager`` that classifies detected oids against the oid lists kept
in two git repositories.
"""
import base64
import json
import logging
import os
from dataclasses import asdict, dataclass
from functools import reduce
from time import time, sleep
from typing import Optional, Dict, List, Tuple

import git
import yaml
from gvm.connections import TLSConnection
from gvm.protocols.gmpv208 import Gmp, AliveTest
from gvm.transforms import EtreeTransform
from gvm.xml import pretty_print

# GVM Xpath Constants
GVM_XPATH_ID = '@id'
GVM_XPATH_NAME_TEXT = 'name/text()'
GVM_XPATH_REPORT_ID_TEXT = "report_id/text()"
GVM_XPATH_STATUS = '@status'
GVM_XPATH_STATUS_TEXT = '@status_text'
GVM_XPATH_STATUS_TEXT_2 = '@status/text'
GVM_XPATH_STATUS_TEXT_3 = 'status/text()'
GVM_XPATH_PROGRESS_TEXT = 'progress/text()'
GVM_XPATH_INUSE_TEXT = 'in_use/text()'
GVM_XPATH_LAST_REPORT_ID = 'last_report/report/@id'
GVM_XPATH_REPORT_TEXT = 'report/text()'

# GVM Status Constants
GVM_STATUS_OK = "200"
GVM_STATUS_CREATE_OK = "201"


# Custom Exceptions
class GvmException(Exception):
    """Raised when a GVM operation issued by this module fails."""


# More readable print function
def pretty_json(j) -> str:
    """Return *j* serialized as indented JSON with sorted keys."""
    return json.dumps(j, sort_keys=True, indent=4)


# Class containing config ids
class Configs:
    """Ids of the scan config, scanner and ssh credential used by this module."""

    config = "9866edc1-8869-4e80-acac-d15d5647b4d9"
    scanner = "08b69003-5fc2-4037-a479-93b440211c73"
    ovs_ssh_credential = "b9af5845-8b87-4378-bca4-cee39a894c17"


# Class containing format ids
class ReportFormats:
    """Ids of the report formats that can be requested from the server."""

    anonymous_xml = "5057e5cc-b825-11e4-9d0e-28d24461215b"
    csv_results = "c1645568-627a-11e3-a660-406186ea4fc5"
    itg = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5"
    pdf = "c402cc3e-b531-11e1-9163-406186ea4fc5"
    txt = "a3810a62-1f62-11e1-9219-406186ea4fc5"
    xml = "a994b278-1f62-11e1-96ac-406186ea4fc5"


@dataclass
class ResultReport():
    """One result extracted from a scan report."""

    oid: str
    severity: float
    threat: str
    port: str

    def __str__(self):
        return f"{self.oid},{self.severity},{self.threat},{self.port}"


class PortList:
    """
    Wrapper around a GVM port_list object.

    On construction the port list is looked up by name on the server and
    created when the lookup returns nothing.

    Attributes:
        client: GMP client used to talk to the server (see GVMClient)
        name: str = name of the port list object
        ports: str = comma separated port ranges
        id: str = id of the object on the server
        in_use: str = 'in_use' flag reported by the server
    """

    def __init__(self, client, name: str, ports: List[str]):
        self.client = client
        self.name = name
        self.ports = ','.join(ports)

        # Retrieve port_list objs by name
        res = self.__get_info(filter=name)
        if not res:
            # Nothing on the server yet: create it
            self.create()
            return

        logging.debug("Already created. Collected from server")
        if len(res) > 1:
            # Several matches: warn and use the first one
            msg = f"The port_list name {name} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.name = res[0]['name']
        self.id = res[0]['id']
        self.in_use = res[0]['in_use']

    # Search port_lists by id/name
    def __get_info(self, filter: str = "rows=-1") -> List[Dict[str, str]]:
        """Return name, id and in_use of every port list matching *filter*."""
        pls = self.client.get_port_lists(filter_string=filter) \
                         .xpath('port_list')
        return [{"name": str(pl.xpath(GVM_XPATH_NAME_TEXT)[0]),
                 "id": str(pl.xpath(GVM_XPATH_ID)[0]),
                 "in_use": str(pl.xpath(GVM_XPATH_INUSE_TEXT)[0])}
                for pl in pls]

    def create(self) -> None:
        """Create the port list on the server and load its attributes.

        Raises:
            GvmException: the server did not answer with the 'created' status.
        """
        res = self.client.create_port_list(self.name, self.ports)
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]

        if status != GVM_STATUS_CREATE_OK:
            msg = "ERROR during Port list creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            logging.error(msg)
            raise GvmException(msg)

        pl_id = str(res.xpath(GVM_XPATH_ID)[0])
        res = self.__get_info(filter=pl_id)
        if not res:
            # NOTE(review): 'id' and 'in_use' stay unset on this path, so a
            # later use of this object fails; confirm whether this should raise.
            msg = f"The port_list name {self.name} retrieved 0 results after creations"
            logging.error(msg)
            return

        if len(res) > 1:
            # Multiple objs retrieved, consider the first one
            msg = f"The port_list name {self.name}"
            msg += f" retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.name = res[0]['name']
        self.id = res[0]['id']
        self.in_use = res[0]['in_use']
        msg = "Created port list obj. "
        msg += f"Name: {self.name}, id: {self.id}, ports: {self.ports}"
        logging.debug(msg)

    def __str__(self):
        d = {'name': self.name, 'id': self.id, 'in_use': self.in_use}
        return pretty_json(d)

    def delete(self):
        """Delete the port list from the server and drop the client reference."""
        logging.debug(f"Deletion port_list {self.name}")
        res = self.client.delete_port_list(self.id)
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Port_list {self} DELETED")
        else:
            logging.error(f"ERROR during the port_list deletion {status}: {status_text}")


class Target:
    """
    Wrapper around a GVM target object.

    On construction the target is looked up by name on the server and
    created when the lookup returns nothing.

    Attributes:
        client: GMP client used to talk to the server (see GVMClient)
        name: str = name of the target object
        host: str = host scanned by the target
        pl: PortList = port list the target is bound to
        id: str = id of the object on the server
        in_use: str = 'in_use' flag reported by the server
    """

    def __init__(self, client, name: str, host: str, port_list: PortList):
        self.client = client
        self.name = name
        self.host = host
        self.pl = port_list

        # Retrieve targets objs by name
        res = self.__get_info(filter=name)
        if not res:
            # Nothing on the server yet: create it
            self.create()
            return

        logging.debug("Already created. Collected from server")
        if len(res) > 1:
            # Several matches: warn and use the first one
            msg = f"The target name {name} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.name = res[0]['name']
        self.id = res[0]['id']
        self.in_use = res[0]['in_use']

    def __get_info(self, filter: str) -> List[Dict[str, str]]:
        """Return name, id and in_use of every target matching *filter*."""
        targets = self.client.get_targets(filter_string=filter) \
                             .xpath('target')
        return [{"name": str(target.xpath(GVM_XPATH_NAME_TEXT)[0]),
                 "id": str(target.xpath(GVM_XPATH_ID)[0]),
                 "in_use": str(target.xpath(GVM_XPATH_INUSE_TEXT)[0])}
                for target in targets]

    def create(self) -> None:
        """Create the target on the server and load its attributes.

        Raises:
            GvmException: the server did not answer with the 'created' status.
        """
        res = self.client.create_target(
            name=self.name,
            comment="",
            hosts=[self.host],
            port_list_id=self.pl.id,
            ssh_credential_id=Configs.ovs_ssh_credential,
            alive_test=AliveTest('Consider Alive'))
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]

        if status != GVM_STATUS_CREATE_OK:
            msg = "ERROR during Target creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            # Logged before raising, as PortList.create does
            logging.error(msg)
            raise GvmException(msg)

        t_id = str(res.xpath(GVM_XPATH_ID)[0])
        res = self.__get_info(filter=t_id)
        if not res:
            # NOTE(review): 'id' and 'in_use' stay unset on this path;
            # confirm whether this should raise instead.
            msg = f"The target name {self.name} retrieved 0 results after creation"
            logging.error(msg)
            return

        if len(res) > 1:
            # Multiple objs retrieved, consider the first one
            msg = f"The target id {t_id} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.name = res[0]['name']
        self.id = res[0]['id']
        self.in_use = res[0]['in_use']
        msg = "Created target obj. "
        msg += f"Name: {self.name}, id: {self.id}, host: {self.host}"
        logging.debug(msg)

    def __str__(self):
        d = {'name': self.name,
             'id': self.id,
             "in_use": self.in_use,
             'host': self.host}
        return pretty_json(d)

    def delete(self):
        """Delete the target from the server and drop the client reference."""
        logging.debug(f"Deletion target {self.name}")
        res = self.client.delete_target(self.id)
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Target {self} DELETED")
        else:
            logging.error(f"ERROR during the target deletion {status}: {status_text}")


class Task:
    """
    Wrapper around a GVM task object.

    On construction the task is looked up by name on the server and created
    when the lookup returns nothing.

    Attributes:
        client: GMP client used to talk to the server (see GVMClient)
        name: str = name of the task object
        target: Target = target scanned by the task
        id: str = id of the object on the server
        in_use: str = 'in_use' flag reported by the server
        status: str = task status reported by the server
        report_id: str = id of the task report, None until one is known

    Methods:
        start = starts the task
        stop = stops the task
        delete = deletes the task
        update_status = refreshes name, status, progress and report id
        wait = waits until the task is completed
        save_report = saves the report on file
        get_report = retrieves the report results
    """

    # Seconds slept between two status polls in wait()
    WAIT_SECONDS = 10

    def __init__(self, client, name: str, target: Target) -> None:
        self.client = client
        self.name = name
        self.target = target

        # Retrieve task objs by name
        res = self.__get_info(filter=name)
        if not res:
            # Nothing on the server yet: create it
            self.create()
            return

        logging.debug("Already created. Collected from server")
        if len(res) > 1:
            # Several matches: warn and use the first one
            msg = f"The task name {name} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.name = res[0]['name']
        self.id = res[0]['id']
        self.in_use = res[0]['in_use']
        self.status = res[0]['status']
        self.report_id = res[0].get("report_id", None)

    def __get_info(self, filter: str) -> List[dict]:
        """Return the attributes of every task matching *filter*.

        Each entry holds name, id, in_use and status, plus report_id when
        the task already has a last report.
        """
        res = []
        tasks = self.client.get_tasks(filter_string=filter) \
                           .xpath('task')
        for t in tasks:
            t_dict = {"name": str(t.xpath(GVM_XPATH_NAME_TEXT)[0]),
                      "id": str(t.xpath(GVM_XPATH_ID)[0]),
                      "in_use": str(t.xpath(GVM_XPATH_INUSE_TEXT)[0]),
                      "status": str(t.xpath(GVM_XPATH_STATUS_TEXT_3)[0])}
            # The xpath list is empty when the task has no report yet
            last_report = t.xpath(GVM_XPATH_LAST_REPORT_ID)
            if last_report:
                t_dict['report_id'] = str(last_report[0])
            res.append(t_dict)
        return res

    def create(self) -> None:
        """Create the task on the server and load its attributes.

        Raises:
            GvmException: the server did not answer with the 'created' status.
        """
        res = self.client.create_task(
            name=self.name,
            config_id=Configs.config,
            target_id=self.target.id,
            scanner_id=Configs.scanner)
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]

        if status != GVM_STATUS_CREATE_OK:
            msg = "ERROR during Task creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            # Logged before raising, as PortList.create does
            logging.error(msg)
            raise GvmException(msg)

        t_id = str(res.xpath(GVM_XPATH_ID)[0])
        res = self.__get_info(filter=t_id)
        if not res:
            # NOTE(review): 'id', 'status', ... stay unset on this path;
            # confirm whether this should raise instead.
            msg = f"The task id {t_id} retrieved 0 results after creation"
            logging.error(msg)
            return

        if len(res) > 1:
            # Multiple objs retrieved, consider the first one
            msg = f"The task id {t_id} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.name = res[0]['name']
        self.id = res[0]['id']
        self.in_use = res[0]['in_use']
        self.status = res[0]['status']
        self.report_id = res[0].get("report_id", None)
        msg = "Created task obj. "
        msg += f"Name: {self.name}, id: {self.id}"
        logging.debug(msg)

    def __str__(self):
        d = {'name': self.name,
             'id': self.id,
             'in_use': self.in_use,
             'status': self.status,
             'report_id': self.report_id}
        return pretty_json(d)

    def start(self):
        """Start the task and remember the id of the report it will fill."""
        res = self.client.start_task(self.id)
        self.report_id = str(res.xpath(GVM_XPATH_REPORT_ID_TEXT)[0])
        logging.info(f"Task {self} STARTED")

    def stop(self):
        """Stop the task and print the server response."""
        res = self.client.stop_task(self.id)
        pretty_print(res)
        logging.info(f"Task {self} STOPPED")

    def delete(self):
        """Delete the task from the server and drop the client reference."""
        logging.debug(f"Deletion task {self.name}")
        res = self.client.delete_task(self.id)
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Task {self} DELETED")
        else:
            logging.error(f"ERROR during the task deletion {status}: {status_text}")

    def update_status(self):
        """Refresh name, status, progress, in_use and report_id from the server."""
        task_info = self.client.get_tasks(filter_string=self.id) \
                               .xpath('task')[0]
        self.name = str(task_info.xpath(GVM_XPATH_NAME_TEXT)[0])
        # New -> Requested -> Queued -> Running -> Done
        self.status = str(task_info.xpath(GVM_XPATH_STATUS_TEXT_3)[0])
        # 0 while the task goes through the first states, up to 100 while
        # running, -1 once done
        self.progress = int(task_info.xpath(GVM_XPATH_PROGRESS_TEXT)[0])
        self.in_use = str(task_info.xpath(GVM_XPATH_INUSE_TEXT)[0])
        # Keep the current report_id when the task has no last report yet
        last_report = task_info.xpath(GVM_XPATH_LAST_REPORT_ID)
        if last_report:
            self.report_id = str(last_report[0])

    def wait(self, timeout: int = 7200) -> bool:
        """Poll the task every WAIT_SECONDS until it ends.

        Args:
            timeout: maximum number of seconds to wait.

        Returns:
            True when the task reaches 'Done' with progress -1; False when
            the task enters an unexpected status or *timeout* expires.
        """
        expected = ("New", "Requested", "Queued", "Processing",
                    "Running", "Done")
        start_time = time()
        logging.debug("Waiting for scans ends the task")
        while True:
            self.update_status()
            if self.status not in expected:
                # e.g. "Interrupted"
                logging.warning(f"Task in the undesired status: '{self.status}'")
                return False
            if self.status == "Done" and self.progress == -1:
                logging.info("Task completed")
                return True
            if time() - start_time > timeout:
                logging.error("TIMEOUT during waiting for task ending")
                return False
            logging.debug(f"Waiting for the task ends. Now {int(time() - start_time)}s from start. Status: {self.status}")
            sleep(self.WAIT_SECONDS)

    def save_report(self, format: str, filename: str):
        """Download the report in *format* and write it, base64-decoded, to *filename*."""
        res = self.client.get_report(self.report_id,
                                     report_format_id=format,
                                     ignore_pagination=True,
                                     details=True)
        code = str(res.xpath(GVM_XPATH_REPORT_TEXT)[0])
        with open(filename, "wb") as fh:
            fh.write(base64.b64decode(code))

    def get_report(self) -> List[ResultReport]:
        """Return one ResultReport per result found in the task report.

        Fields are read per ``result`` element so that a result missing one
        of them cannot shift the values of the following results. Results
        with a missing or non numeric field are skipped with a warning.
        """
        res = self.client.get_report(self.report_id,
                                     report_format_id=ReportFormats.anonymous_xml,
                                     ignore_pagination=True,
                                     details=True)
        reports: List[ResultReport] = []
        for result in res.xpath('report/report/results/result'):
            try:
                # Plain str/float keep the later json/yaml dumps free of
                # library specific string types
                reports.append(ResultReport(
                    oid=str(result.xpath('nvt/@oid')[0]),
                    severity=float(result.xpath('nvt/severities/@score')[0]),
                    threat=str(result.xpath('threat/text()')[0]),
                    port=str(result.xpath('port/text()')[0])))
            except (IndexError, ValueError):
                logging.warning("Skipped a report result with missing or malformed fields")
        return reports


class GVMClient():
    """
    Factory of the authenticated GMP client used to get, create and delete
    port_lists, targets, tasks and reports.
    """

    CONNECTION_RETRIES = 5
    LOCAL_IP = "127.0.0.1"

    def __init__(self, auth_n: str, auth_p: str, host_ip: str = LOCAL_IP):
        self.auth_name = auth_n
        self.auth_passwd = auth_p
        self.host_ip = host_ip
        self.client = None
        self.create_client()

    def create_client(self):
        """Build the GMP client, retrying on error, then authenticate it.

        Raises:
            GvmException: every one of the CONNECTION_RETRIES attempts failed.
        """
        last_error = None
        for attempt in range(1, self.CONNECTION_RETRIES + 1):
            try:
                logging.debug('Creation of the GMP Client')
                logging.debug(f'host_ip: {self.host_ip}')
                self.client = Gmp(TLSConnection(hostname=self.host_ip),
                                  transform=EtreeTransform())
                break
            except Exception as err:
                last_error = err
                remaining = self.CONNECTION_RETRIES - attempt
                logging.error(f"Connection error with the gmp endpoint. Remaining {remaining} retries")
                sleep(0.5)
        else:
            # Reached only when no attempt succeeded
            raise GvmException(
                "Impossible connect to the gmp endpoint even after "
                f"{self.CONNECTION_RETRIES} retries") from last_error

        logging.debug('GMP Client Created')
        self.client.authenticate(self.auth_name, self.auth_passwd)
        logging.debug('GMP Client Authenticated')

    def get_client(self):
        """Return the GMP client."""
        return self.client

    def get_version(self) -> str:
        """Return the protocol version reported by the server."""
        res = self.client.get_version()
        return str(res.xpath('version/text()')[0])


class ReportManager():
    """
    Classifies the oids detected by the scans and writes the outcome on file.

    The known oids are imported at construction time from two git
    repositories, cloned with the credentials found in the environment.
    """

    # CLASSIFICATION configuration
    SEVERITY_THR = 4
    MSG_OK = "OK"
    MSG_NOK = "NOK"
    DEFAULT_SEVERITY = -1
    MAX_SEVERITY = 9
    DEFAULT_THREAT = "None"

    # REPORT Keywords
    REPORT_DEPLOYMENT = "deployment"
    REPORT_GLOBAL = "global"
    REPORT_SEVERITY = "severity"
    REPORT_THREAT = "threat"

    # OIDS Classes
    OID_ACCEPTED = 'accepted-oids'
    OID_NEW = 'new-oids'
    OID_DROPPED = 'dropped-oids'
    OID_OS = 'os-related-oids'
    OID_CLASSES = (OID_ACCEPTED, OID_NEW, OID_DROPPED, OID_OS)

    # OS security repository configuration
    OS_GIT_REPO = "baltig.infn.it/infn-cloud/os_security_checks.git"
    OS_SEC_BRANCH = "new-oids"
    OS_SEC_USER = "GIT_OS_SEC_USER"
    OS_SEC_TOKEN = "GIT_OS_SEC_TOKEN"
    OS_SEC_DEST_DIR = "os-sc-repo"
    OS_SEC_FILENAME = "os-oids.yaml"
    OS_COMMIT_MESSAGE = 'Added oid(s)'

    # Security scans repository configuration
    SS_GIT_REPO = "baltig.infn.it/infn-cloud/security-scans.git"
    SS_SEC_USER = "GIT_SEC_USER"
    SS_SEC_TOKEN = "GIT_SEC_TOKEN"
    SS_SEC_DEST_DIR = "ss-repo"
    SS_SEC_CHILD_DIR = "queues"
    SS_SEC_ACCEPTED_FILES = ['accepted.txt']
    SS_SEC_KNOWN_FILES = ['held.txt', 'new.txt', 'overridden.txt']

    def __init__(self, os_name: str, is_os: bool) -> None:
        self.os_name = os_name
        self.is_os = is_os
        self.imported_oids: Dict[str, List[ResultReport]] = dict()
        # Summary and classified oids exist even before classify_reports()
        self.init_glob_vars()
        self.import_os_sec_repo()
        self.import_security_oids()

    @staticmethod
    def _redact(text: str, secret: Optional[str]) -> str:
        """Return *text* with every occurrence of *secret* masked."""
        return text.replace(secret, "***") if secret else text

    def _default_entry(self) -> dict:
        """Return a fresh severity/threat entry holding the default values."""
        return {self.REPORT_SEVERITY: self.DEFAULT_SEVERITY,
                self.REPORT_THREAT: self.DEFAULT_THREAT}

    def _plain_oids(self) -> dict:
        """Return the classified oids with every ResultReport turned into a dict."""
        return {host: {oid_class: [asdict(r) for r in reports]
                       for oid_class, reports in classes.items()}
                for host, classes in self.oids.items()}

    def import_yaml_file(self) -> dict:
        """Return the content of the os oids yaml file, {} when it is missing."""
        if not os.path.isfile(self.os_file):
            return dict()
        with open(self.os_file, 'r') as ifile:
            # safe_load: the file comes from a remote repository and plain
            # yaml.load() requires an explicit Loader on recent PyYAML
            return yaml.safe_load(ifile)

    def import_os_sec_repo(self):
        """Clone the os security repository and import its oids.

        Sets ``os_file``, ``os_oids`` (content of the yaml file) and
        ``os_all_oids`` (flat list of all its values). Both collections are
        left empty when the clone or the parsing fails.
        """
        self.os_file = os.path.join(self.OS_SEC_DEST_DIR, self.OS_SEC_FILENAME)
        user = os.environ.get(self.OS_SEC_USER)
        token = os.environ.get(self.OS_SEC_TOKEN)
        repo_url = f"https://{user}:{token}@{self.OS_GIT_REPO}"
        self.os_oids = dict()
        self.os_all_oids = []

        try:
            git.Repo.clone_from(repo_url, self.OS_SEC_DEST_DIR,
                                branch=self.OS_SEC_BRANCH)
        except Exception as e:
            # The error text may echo the clone url: keep the token out of logs
            logging.error(f"Impossible clone the os scans repository, {self._redact(str(e), token)}")
            return

        os_oids = self.import_yaml_file()
        logging.debug("Imported host os security oids")
        logging.debug(pretty_json(os_oids))
        if not isinstance(os_oids, dict):
            logging.warning("Impossible parse the oids yaml file")
            return

        self.os_oids = os_oids
        try:
            # Flatten the per-os lists; unlike reduce() this accepts an empty dict
            self.os_all_oids = [oid for oids in os_oids.values() for oid in oids]
            logging.debug("Imported os security oids")
            logging.debug(pretty_json(self.os_all_oids))
        except (TypeError, ValueError):
            logging.warning("Impossible extract oids from imported yaml")
            self.os_all_oids = []

    def _read_oid_files(self, files_dir: str, names: List[str]) -> List[str]:
        """Return the non blank, non comment lines of the given files."""
        oids: List[str] = []
        for name in names:
            filename = os.path.join(files_dir, name)
            try:
                with open(filename, 'r') as file:
                    lines = [line.strip() for line in file]
            except OSError as e:
                logging.warning(f"Impossible read the oids file {filename}, {e}")
                continue
            oids += [line for line in lines
                     if line and not line.startswith('#')]
        return oids

    def import_security_oids(self) -> None:
        """Clone the security scans repository and import its oid lists.

        Sets ``accepted_oids`` and ``known_oids``; both are left empty when
        the clone fails.
        """
        user = os.environ.get(self.SS_SEC_USER)
        token = os.environ.get(self.SS_SEC_TOKEN)
        repo_url = f"https://{user}:{token}@{self.SS_GIT_REPO}"
        files_dir = os.path.join(self.SS_SEC_DEST_DIR, self.SS_SEC_CHILD_DIR)
        self.accepted_oids = []
        self.known_oids = []

        try:
            git.Repo.clone_from(repo_url, self.SS_SEC_DEST_DIR)
        except Exception as e:
            # The error text may echo the clone url: keep the token out of logs
            logging.warning(f"Impossible clone the ss scans repository, {self._redact(str(e), token)}")
            return

        self.accepted_oids = self._read_oid_files(files_dir,
                                                  self.SS_SEC_ACCEPTED_FILES)
        self.known_oids = self._read_oid_files(files_dir,
                                               self.SS_SEC_KNOWN_FILES)

    def import_report(self, host: str, report: List[ResultReport]):
        """Register the results detected on *host*."""
        self.imported_oids[host] = report

    def init_glob_vars(self):
        """Reset the summary report and the classified oids."""
        self.report = dict()
        self.report[self.REPORT_DEPLOYMENT] = self._default_entry()
        self.oids = dict()

    def init_host_vars(self, host: str, r: Optional[ResultReport] = None):
        """Reset the classified oids and the summary of *host*.

        Args:
            host: host whose aggregated variables are reset.
            r: when given, its port also gets a default summary entry.
        """
        self.oids[host] = {oid_class: [] for oid_class in self.OID_CLASSES}
        self.report[host] = {self.REPORT_GLOBAL: self._default_entry()}
        if r is not None:
            self.report[host][r.port] = self._default_entry()

    def update_summary(self, host, r: ResultReport) -> None:
        """Raise the port, host and deployment maxima to the severity of *r*."""
        host_report = self.report[host]
        entry = {self.REPORT_SEVERITY: r.severity,
                 self.REPORT_THREAT: r.threat}

        # Evaluate max port severity per host
        if r.port not in host_report or \
           r.severity > host_report[r.port][self.REPORT_SEVERITY]:
            host_report[r.port] = dict(entry)

        # Evaluate max global severity per host
        if r.severity > host_report[self.REPORT_GLOBAL][self.REPORT_SEVERITY]:
            host_report[self.REPORT_GLOBAL] = dict(entry)

        # Evaluate Global max severity
        if r.severity > self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY]:
            self.report[self.REPORT_DEPLOYMENT] = dict(entry)

    def classify_reports(self) -> None:
        """Classify every imported result and build the summary report.

        Results below SEVERITY_THR are ignored. The others are classified as
        os related (only when not scanning an os image), accepted, dropped
        or new; accepted and new oids also update the severity summary.
        """
        # Init global aggregated variables
        self.init_glob_vars()

        for host, host_report in self.imported_oids.items():
            # Init aggregated variables once per host: doing it for every
            # result would discard what was collected so far for the host
            self.init_host_vars(host)
            for res_report in host_report:
                # Every seen port appears in the summary
                self.report[host].setdefault(res_report.port,
                                             self._default_entry())

                # Skip if oid is not relevant
                if res_report.severity < self.SEVERITY_THR:
                    continue

                # Classify oid
                if not self.is_os and res_report.oid in self.os_all_oids:
                    self.oids[host][self.OID_OS].append(res_report)
                elif res_report.oid in self.accepted_oids:
                    self.oids[host][self.OID_ACCEPTED].append(res_report)
                    self.update_summary(host, res_report)
                elif res_report.oid in self.known_oids:
                    self.oids[host][self.OID_DROPPED].append(res_report)
                else:
                    self.oids[host][self.OID_NEW].append(res_report)
                    self.update_summary(host, res_report)

        # Extract global estimation
        if self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY] < self.SEVERITY_THR:
            self.report[self.REPORT_GLOBAL] = self.MSG_OK
        else:
            self.report[self.REPORT_GLOBAL] = self.MSG_NOK

    def get_summary(self) -> str:
        """Return the summary report as pretty printed JSON."""
        return pretty_json(self.report)

    def get_classified_oids(self) -> str:
        """Return the classified oids as pretty printed JSON."""
        return pretty_json(self._plain_oids())

    def create_msg(self, r: ResultReport):
        """Return a one line description of *r*."""
        msg = f" Detected oid: {r.oid}, severity: {r.severity}"
        msg += f", threat: {r.threat} and port: {r.port}\n"
        return msg

    def write_data(self, summary_filename: str, oids_filename: str):
        """Write the summary and the classified oids on file.

        When an os image was scanned, the os related class is removed from
        every host and the detected oids are stored in the os oids file.
        """
        # Save on file report summary
        self.write_summary(summary_filename)

        if self.is_os:
            # self.oids is keyed by host: drop the class inside each host
            for host_oids in self.oids.values():
                host_oids.pop(self.OID_OS, None)
            self.write_new_oids()
            # NOTE: pushing the new oids (push_new_oids) is disabled here

        # Save on file all classified oids
        self.write_oids(oids_filename)

    def write_new_oids(self) -> None:
        """Overwrite the oids of the scanned os with the ones just detected.

        The accepted and new oids of every host are stored as sorted,
        de-duplicated strings, the form the classifier compares against.
        """
        detected = {str(r.oid)
                    for host_oids in self.oids.values()
                    for oid_class in (self.OID_ACCEPTED, self.OID_NEW)
                    for r in host_oids.get(oid_class, [])}
        self.os_oids[self.os_name] = sorted(detected)
        with open(self.os_file, 'w') as file:
            yaml.dump(self.os_oids, file)

    def write_oids(self, oids_filename) -> None:
        """Write the classified oids, as plain dicts, to *oids_filename*."""
        with open(oids_filename, 'w') as file:
            yaml.dump(self._plain_oids(), file)

    def push_new_oids(self):
        """Commit the os oids file and push it to the 'origin' remote."""
        repo = git.Repo(self.OS_SEC_DEST_DIR)
        repo.git.add(self.os_file)
        repo.index.commit(self.OS_COMMIT_MESSAGE)
        origin = repo.remote('origin')
        origin.push()
        logging.info("New oid file successfully pushed")

    def write_summary(self, summary_filename) -> None:
        """Write the summary report to *summary_filename* as yaml."""
        with open(summary_filename, 'w') as file:
            yaml.dump(self.report, file)