from gvm.connections import TLSConnection
from gvm.protocols.gmpv208 import Gmp, AliveTest
from gvm.transforms import EtreeTransform
from gvm.xml import pretty_print
from time import time, sleep
import logging
import base64
import json
from typing import Optional, Dict, List, Tuple

# GVM Xpath Constants
GVM_XPATH_ID = '@id'
GVM_XPATH_NAME_TEXT = 'name/text()'
GVM_XPATH_REPORT_ID_TEXT = "report_id/text()"
GVM_XPATH_STATUS = '@status'
GVM_XPATH_STATUS_TEXT = '@status_text'
GVM_XPATH_STATUS_TEXT_2 = '@status/text'
GVM_XPATH_STATUS_TEXT_3 = 'status/text()'
GVM_XPATH_PROGRESS_TEXT = 'progress/text()'
GVM_XPATH_INUSE_TEXT = 'in_use/text()'
GVM_XPATH_LAST_REPORT_ID = 'last_report/report/@id'
GVM_XPATH_REPORT_TEXT = 'report/text()'

# GVM Status Constants
GVM_STATUS_OK = "200"
GVM_STATUS_CREATE_OK = "201"


# Custom Exceptions
class GvmException(Exception):
    """Raised when a GVM object cannot be created or the endpoint is unreachable."""


# More readable print function
def pretty_json(j):
    """Return *j* serialized as JSON with sorted keys and a 4-space indent."""
    return json.dumps(j, sort_keys=True, indent=4)


# Class containing config ids
class Configs:
    config = "9866edc1-8869-4e80-acac-d15d5647b4d9"
    scanner = "08b69003-5fc2-4037-a479-93b440211c73"
    ovs_ssh_credential = "b9af5845-8b87-4378-bca4-cee39a894c17"


# Class containing report format ids
class ReportFormats:
    anonymous_xml = "5057e5cc-b825-11e4-9d0e-28d24461215b"
    csv_results = "c1645568-627a-11e3-a660-406186ea4fc5"
    itg = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5"
    pdf = "c402cc3e-b531-11e1-9163-406186ea4fc5"
    txt = "a3810a62-1f62-11e1-9219-406186ea4fc5"
    xml = "a994b278-1f62-11e1-96ac-406186ea4fc5"


class PortList:
    """
    This class helps manage the GVM port_list object.

    On construction the server is queried with the given name as filter;
    if nothing is returned a new port list is created, otherwise the first
    returned object is adopted.

    Attributes:
        client = client used to interact with gvm server
                 (created from GVMClient class)
        name: str = name of port list object
        ports: str = comma-separated port ranges
        id: str = id returned after the creation (None until known)
        in_use: str = state if the port_list object is in use
                      (None until known)
    """

    def __init__(self, client, name: str, ports: List[str]):
        self.client = client
        self.name = name
        self.ports = ','.join(ports)
        # Defined up-front so __str__ never hits a missing attribute when
        # the lookup after a creation returns nothing.
        self.id: Optional[str] = None
        self.in_use: Optional[str] = None

        # Retrieve port_list objs by name
        res = self.__get_info(filter=name)
        if not res:
            # If no result retrieved, create it
            self.create()
            return

        logging.debug("Already created. Collected from server")
        if len(res) > 1:
            # If more than one result has been collected, consider the first
            msg = f"The port_list name {name} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.__adopt(res[0])

    def __adopt(self, info: Dict[str, str]) -> None:
        """Copy name, id and in_use from one entry returned by __get_info."""
        self.name = info['name']
        self.id = info['id']
        self.in_use = info['in_use']

    # Search port_lists by id/name
    def __get_info(self, filter: str = "rows=-1") -> List[Dict[str, str]]:
        """Return name/id/in_use of every port_list matching *filter*."""
        response = self.client.get_port_lists(filter_string=filter)
        return [
            {
                "name": str(pl.xpath(GVM_XPATH_NAME_TEXT)[0]),
                "id": str(pl.xpath(GVM_XPATH_ID)[0]),
                "in_use": str(pl.xpath(GVM_XPATH_INUSE_TEXT)[0]),
            }
            for pl in response.xpath('port_list')
        ]

    def create(self) -> None:
        """
        Create the port list on the server and load its attributes.

        Raises:
            GvmException: if the server does not answer with status 201.
        """
        res = self.client.create_port_list(self.name, self.ports)
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]

        if status != GVM_STATUS_CREATE_OK:
            msg = "ERROR during Port list creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            logging.error(msg)
            raise GvmException(msg)

        pl_id = str(res.xpath(GVM_XPATH_ID)[0])
        res = self.__get_info(filter=pl_id)
        if not res:
            # No obj retrieved. Error during creation
            msg = f"The port_list name {self.name} retrieved 0 results after creations"
            logging.error(msg)
            return

        if len(res) > 1:
            # Multiple objs retrieved, consider the first one
            msg = f"The port_list name {self.name}"
            msg += f" retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.__adopt(res[0])
        msg = "Created port list obj. "
        msg += f"Name: {self.name}, id: {self.id}, ports: {self.ports}"
        logging.debug(msg)

    def __str__(self):
        d = {'name': self.name, 'id': self.id, 'in_use': self.in_use}
        return pretty_json(d)

    def delete(self):
        """Delete the port list on the server and drop the client reference."""
        logging.debug(f"Deletion port_list {self.name}")
        res = self.client.delete_port_list(self.id)
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Port_list {self} DELETED")
        else:
            logging.error(f"ERROR during the port_list deletion {status}: {status_text}")


class Target:
    """
    This class helps manage the GVM target object.

    On construction the server is queried with the given name as filter;
    if nothing is returned a new target is created, otherwise the first
    returned object is adopted.

    Attributes:
        client = client used to interact with gvm server
                 (created from GVMClient class)
        name: str = name of target object
        host: str = host the target points to
        pl: PortList = port list used by the target
        id: str = id returned after the creation (None until known)
        in_use: str = state if the target object is in use
                      (None until known)
    """

    def __init__(self, client, name: str, host: str, port_list: PortList):
        self.client = client
        self.name = name
        self.host = host
        self.pl = port_list
        # Defined up-front so __str__ never hits a missing attribute when
        # the lookup after a creation returns nothing.
        self.id: Optional[str] = None
        self.in_use: Optional[str] = None

        # Retrieve targets objs by name
        res = self.__get_info(filter=name)
        if not res:
            # If no result retrieved, create it
            self.create()
            return

        logging.debug("Already created. Collected from server")
        if len(res) > 1:
            # If more than one result has been collected, consider the first
            msg = f"The target name {name} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.__adopt(res[0])

    def __adopt(self, info: Dict[str, str]) -> None:
        """Copy name, id and in_use from one entry returned by __get_info."""
        self.name = info['name']
        self.id = info['id']
        self.in_use = info['in_use']

    def __get_info(self, filter: str) -> List[Dict[str, str]]:
        """Return name/id/in_use of every target matching *filter*."""
        response = self.client.get_targets(filter_string=filter)
        return [
            {
                "name": str(target.xpath(GVM_XPATH_NAME_TEXT)[0]),
                "id": str(target.xpath(GVM_XPATH_ID)[0]),
                "in_use": str(target.xpath(GVM_XPATH_INUSE_TEXT)[0]),
            }
            for target in response.xpath('target')
        ]

    def create(self) -> None:
        """
        Create the target on the server and load its attributes.

        Raises:
            GvmException: if the server does not answer with status 201.
        """
        res = self.client.create_target(
            name=self.name,
            comment="",
            hosts=[self.host],
            port_list_id=self.pl.id,
            ssh_credential_id=Configs.ovs_ssh_credential,
            alive_test=AliveTest('Consider Alive'))
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]

        if status != GVM_STATUS_CREATE_OK:
            msg = "ERROR during Target creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            # Logged before raising, as PortList.create does.
            logging.error(msg)
            raise GvmException(msg)

        t_id = str(res.xpath(GVM_XPATH_ID)[0])
        res = self.__get_info(filter=t_id)
        if not res:
            # No obj retrieved. Error during creation
            msg = f"The target name {self.name} retrieved 0 results after creation"
            logging.error(msg)
            return

        if len(res) > 1:
            # Multiple objs retrieved, consider the first one
            msg = f"The target id {t_id} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.__adopt(res[0])
        msg = "Created target obj. "
        msg += f"Name: {self.name}, id: {self.id}, host: {self.host}"
        logging.debug(msg)

    def __str__(self):
        d = {'name': self.name, 'id': self.id, "in_use": self.in_use, 'host': self.host}
        return pretty_json(d)

    def delete(self):
        """Delete the target on the server and drop the client reference."""
        logging.debug(f"Deletion target {self.name}")
        res = self.client.delete_target(self.id)
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Target {self} DELETED")
        else:
            logging.error(f"ERROR during the target deletion {status}: {status_text}")


class Task:
    """
    This class helps manage the GVM task object.

    On construction the server is queried with the given name as filter;
    if nothing is returned a new task is created, otherwise the first
    returned object is adopted.

    Attributes:
        client = client used to interact with gvm server
                 (created from GVMClient class)
        name: str = name of task object
        target: Target = target scanned by the task
        id: str = id returned after the creation (None until known)
        in_use: str = state if the task object is in use
        status: str = last task status read from the server
        progress: int = last progress read by update_status
        report_id: str = id of the last report, if any

    Methods:
        start = starts the task
        stop = stops the task
        delete = deletes the task
        update_status = refreshes name, status, progress and report id
        wait = waits until the task is completed
        save_report = saves the report to a file
        get_report_info = retrieves report information
    """

    # Constants
    WAIT_SECONDS = 10
    # Statuses in which wait() keeps polling (or succeeds, for "Done");
    # any other status, e.g. "Interrupted", aborts the wait.
    EXPECTED_STATUSES = ("New", "Requested", "Queued", "Running", "Done")

    def __init__(self, client, name: str, target: Target) -> None:
        self.client = client
        self.name = name
        self.target = target
        # Defined up-front so __str__ never hits a missing attribute when
        # the lookup after a creation returns nothing.
        self.id: Optional[str] = None
        self.in_use: Optional[str] = None
        self.status: Optional[str] = None
        self.progress: Optional[int] = None
        self.report_id: Optional[str] = None

        # Retrieve task objs by name
        res = self.__get_info(filter=name)
        if not res:
            # If no result retrieved, create it
            self.create()
            return

        logging.debug("Already created. Collected from server")
        if len(res) > 1:
            # If more than one result has been collected, consider the first
            msg = f"The task name {name} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.__adopt(res[0])

    def __adopt(self, info: dict) -> None:
        """Copy the attributes of one entry returned by __get_info."""
        self.name = info['name']
        self.id = info['id']
        self.in_use = info['in_use']
        self.status = info['status']
        self.report_id = info.get("report_id", None)

    def __get_info(self, filter: str) -> List[dict]:
        """
        Return name/id/in_use/status of every task matching *filter*.

        The 'report_id' key is present only for tasks that have a last report.
        """
        res = []
        tasks = self.client.get_tasks(filter_string=filter).xpath('task')
        for t in tasks:
            t_dict = {
                "name": str(t.xpath(GVM_XPATH_NAME_TEXT)[0]),
                "id": str(t.xpath(GVM_XPATH_ID)[0]),
                "in_use": str(t.xpath(GVM_XPATH_INUSE_TEXT)[0]),
                "status": str(t.xpath(GVM_XPATH_STATUS_TEXT_3)[0]),
            }
            # xpath() returns an empty list when the task has no last report
            last_report = t.xpath(GVM_XPATH_LAST_REPORT_ID)
            if last_report:
                t_dict['report_id'] = last_report[0]
            res.append(t_dict)
        return res

    def create(self) -> None:
        """
        Create the task on the server and load its attributes.

        Raises:
            GvmException: if the server does not answer with status 201.
        """
        res = self.client.create_task(
            name=self.name,
            config_id=Configs.config,
            target_id=self.target.id,
            scanner_id=Configs.scanner)
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]

        if status != GVM_STATUS_CREATE_OK:
            msg = "ERROR during Task creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            # Logged before raising, as PortList.create does.
            logging.error(msg)
            raise GvmException(msg)

        t_id = str(res.xpath(GVM_XPATH_ID)[0])
        res = self.__get_info(filter=t_id)
        if not res:
            # No obj retrieved. Error during creation
            msg = f"The task id {t_id} retrieved 0 results after creation"
            logging.error(msg)
            return

        if len(res) > 1:
            # Multiple objs retrieved, consider the first one
            msg = f"The task id {t_id} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.__adopt(res[0])
        msg = "Created task obj. "
        msg += f"Name: {self.name}, id: {self.id}"
        logging.debug(msg)

    def __str__(self):
        d = {'name': self.name,
             'id': self.id,
             'in_use': self.in_use,
             'status': self.status,
             'report_id': self.report_id}
        return pretty_json(d)

    def start(self):
        """Start the task and remember the report id returned by the server."""
        res = self.client.start_task(self.id)
        self.report_id = res.xpath(GVM_XPATH_REPORT_ID_TEXT)[0]
        logging.info(f"Task {self} STARTED")

    def stop(self):
        """Stop the task and print the raw server response."""
        res = self.client.stop_task(self.id)
        pretty_print(res)
        logging.info(f"Task {self} STOPPED")

    def delete(self):
        """Delete the task on the server and drop the client reference."""
        logging.debug(f"Deletion task {self.name}")
        res = self.client.delete_task(self.id)
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Task {self} DELETED")
        else:
            logging.error(f"ERROR during the task deletion {status}: {status_text}")

    def update_status(self):
        """Refresh name, status, progress, in_use and report_id from the server."""
        task_info = self.client.get_tasks(filter_string=self.id).xpath('task')[0]
        self.name = task_info.xpath(GVM_XPATH_NAME_TEXT)[0]
        # Status:   New -> Requested -> Queued -> Running  -> Done
        # Progress: 0      0            0         0 -> 100    -1
        self.status = task_info.xpath(GVM_XPATH_STATUS_TEXT_3)[0]
        self.progress = int(task_info.xpath(GVM_XPATH_PROGRESS_TEXT)[0])
        self.in_use = task_info.xpath(GVM_XPATH_INUSE_TEXT)[0]
        try:
            self.report_id = task_info.xpath(GVM_XPATH_LAST_REPORT_ID)[0]
        except IndexError:
            # No last report yet: keep the report id already known
            pass

    def wait(self, timeout: int = 7200) -> bool:
        """
        Poll the task every WAIT_SECONDS until it completes.

        Args:
            timeout: maximum number of seconds to wait.

        Returns:
            True when the task reaches status "Done" with progress -1,
            False on timeout or when the task enters an unexpected status.
        """
        start_time = time()
        logging.debug("Waiting for scans ends the task")
        while True:
            self.update_status()
            if self.status not in self.EXPECTED_STATUSES:
                logging.warning(f"Task in the undesired status: '{self.status}'")
                return False
            if self.status == "Done" and self.progress == -1:
                logging.info("Task completed")
                return True
            if time() - start_time > timeout:
                logging.error("TIMEOUT during waiting for task ending")
                return False
            logging.debug(f"Waiting for the task ends. Now {int(time() - start_time)}s from start. Status: {self.status}")
            sleep(self.WAIT_SECONDS)

    def save_report(self, format: str, filename: str):
        """
        Download the report in the given format and write it to *filename*.

        Args:
            format: report format id (see ReportFormats).
            filename: path of the file to write; the base64 payload of the
                      response is decoded and written as bytes.
        """
        res = self.client.get_report(self.report_id,
                                     report_format_id=format,
                                     ignore_pagination=True,
                                     details=True)
        code = str(res.xpath(GVM_XPATH_REPORT_TEXT)[0])
        with open(filename, "wb") as fh:
            fh.write(base64.b64decode(code))

    def get_report_info(self, accepted_issues: List[str], known_issues: List[str]) -> Dict:
        """
        Summarize the report per port.

        Results with severity >= 4 whose oid is not in *accepted_issues* but
        is in *known_issues* are skipped; every other result is recorded.

        Args:
            accepted_issues: oids flagged as accepted.
            known_issues: oids flagged as known.

        Returns:
            A dict mapping each port to the {'severity', 'threat'} of its
            most severe recorded result, plus a 'global' entry holding the
            most severe recorded result overall ('None' / -1 when nothing
            was recorded).
        """
        report = dict()
        res = self.client.get_report(self.report_id,
                                     report_format_id=ReportFormats.anonymous_xml,
                                     ignore_pagination=True,
                                     details=True)
        # NOTE(review): the four lists are paired positionally; this assumes
        # every <result> carries an oid, a score, a threat and a port.
        o_ids = res.xpath('report/report/results/result/nvt/@oid')
        severities = res.xpath('report/report/results/result/nvt/severities/@score')
        severities = [float(a) for a in severities]
        treats = res.xpath('report/report/results/result/threat/text()')
        ports = res.xpath('report/report/results/result/port/text()')

        # Built once so the membership tests in the loop are O(1)
        accepted = set(accepted_issues)
        known = set(known_issues)

        glob_severity = -1  # severities are not negative
        glob_threat = 'None'
        for o, s, t, p in zip(o_ids, severities, treats, ports):
            msg = f"Detected oid: {o}, severity: {s}, threat: {t} and port: {p}"
            if s >= 4:  # If severity is not negligible
                if o in accepted:
                    msg += " => ACCEPTED"
                elif o in known:
                    msg += " => DROPPED (not accepted but known)"
                    # Log before skipping, otherwise the message is lost
                    logging.debug(msg)
                    continue
                else:
                    msg += " => NEW (not accepted and not known)"
            logging.debug(msg)

            if p not in report or s > report[p]['severity']:
                report[p] = {'severity': s, 'threat': t}
            if s > glob_severity:
                glob_severity = s
                glob_threat = t

        report['global'] = {'threat': glob_threat, 'severity': glob_severity}
        return report


class GVMClient():
    """
    This class provides API to interact with GVM in order to get, create
    and delete port_lists, targets, tasks and reports
    """

    CONNECTION_RETRIES = 5
    LOCAL_IP = "127.0.0.1"

    def __init__(self, auth_n: str, auth_p: str, host_ip: str = LOCAL_IP):
        self.auth_name = auth_n
        self.auth_passwd = auth_p
        self.host_ip = host_ip
        self.client = None
        self.create_client()

    def create_client(self):
        """
        Build the GMP client (up to CONNECTION_RETRIES attempts) and authenticate.

        Raises:
            GvmException: if the client could not be built in any attempt.
        """
        retry = self.CONNECTION_RETRIES
        last_error = None
        # NOTE(review): only the construction of the client is retried;
        # authenticate() runs outside the retry loop. Confirm whether the
        # socket is actually opened here or only on the first request.
        while retry > 0:
            try:
                logging.debug('Creation of the GMP Client')
                logging.debug(f'host_ip: {self.host_ip}')
                self.client = Gmp(TLSConnection(hostname=self.host_ip),
                                  transform=EtreeTransform())
                break
            except Exception as err:
                last_error = err
                # Decrement first so the logged count is what is really left
                retry -= 1
                logging.error(f"Connection error with the gmp endpoint. Remaining {retry} retries")
                sleep(0.5)

        if retry == 0:
            raise GvmException(
                "Impossible connect to the gmp endpoint even after "
                f"{self.CONNECTION_RETRIES} retries") from last_error
        logging.debug('GMP Client Created')

        self.client.authenticate(self.auth_name, self.auth_passwd)
        logging.debug('GMP Client Authenticated')

    def get_client(self):
        """Return the authenticated GMP client."""
        return self.client

    def get_version(self) -> str:
        """Return the version string reported by the server."""
        res = self.client.get_version()
        return str(res.xpath('version/text()')[0])