"""Thin object wrappers around python-gvm for port lists, targets, tasks and reports."""

import base64
import json
import logging
from time import time, sleep
from typing import Optional, Dict, List, Tuple, Union

from gvm.connections import TLSConnection
from gvm.protocols.gmpv208 import Gmp, AliveTest
from gvm.transforms import EtreeTransform
from gvm.xml import pretty_print


def pretty_json(j):
    """Serialize ``j`` as JSON with sorted keys and a 4-space indent."""
    return json.dumps(j, sort_keys=True, indent=4)


def _response_status(res) -> Tuple[str, str]:
    """Return the ``(status, status_text)`` attributes of a GMP response element."""
    return res.xpath('@status')[0], res.xpath('@status_text')[0]


class Configs:
    """Fixed GVM object ids used when creating targets and tasks."""

    # Passed as ``config_id`` to ``create_task``.
    config = "9866edc1-8869-4e80-acac-d15d5647b4d9"
    # Passed as ``scanner_id`` to ``create_task``.
    scanner = "08b69003-5fc2-4037-a479-93b440211c73"
    # Passed as ``ssh_credential_id`` to ``create_target``.
    ovs_ssh_credential = "b9af5845-8b87-4378-bca4-cee39a894c17"


class ReportFormats:
    """Report format ids accepted as ``report_format_id`` by ``get_report``."""

    anonymous_xml = "5057e5cc-b825-11e4-9d0e-28d24461215b"
    csv_results = "c1645568-627a-11e3-a660-406186ea4fc5"
    itg = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5"
    pdf = "c402cc3e-b531-11e1-9163-406186ea4fc5"
    txt = "a3810a62-1f62-11e1-9219-406186ea4fc5"
    xml = "a994b278-1f62-11e1-96ac-406186ea4fc5"


class PortList:
    """
    This class helps the managing of the GVM port_list object

    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of port list object
        id: str = id returned after the creation
        in_use: str = state if the port_list object is in use

    NOTE(review): ``__del__`` deletes the port list on the server when this
    Python object is garbage-collected, so merely dropping a reference has a
    remote side effect. Kept as-is because callers may rely on it.
    """

    def __init__(self, name: str = "", client=None, id: Optional[str] = None,
                 in_use: Optional[str] = None):
        # The original had a trailing comma here, which stored a 1-tuple
        # instead of the client and broke every later ``self.client.<call>``.
        self.client = client
        self.name = name
        self.id = id
        self.in_use = in_use

    def __str__(self):
        d = {'name': self.name, 'id': self.id, 'in_use': self.in_use}
        return pretty_json(d)

    def __del__(self):
        # Nothing to delete remotely for an object that was never bound to a
        # client or never received an id.
        if self.client is not None and self.id:
            self.delete()

    def delete(self):
        """Delete this port list on the server and log the outcome."""
        res = self.client.delete_port_list(self.id)
        status, status_text = _response_status(res)
        if status == "200":
            logging.info(f"Port_list {self} DELETED")
        else:
            logging.error(f"ERROR during the port_list deletion {status}: {status_text}")


class Target:
    """
    This class helps the managing of the GVM target object

    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of target object
        id: str = id returned after the creation
        in_use: str = state if the target object is in use
        hosts: str = hosts of the target
        port_list: PortList = port list attached to the target (may be None)

    NOTE(review): ``__del__`` deletes the target, then its port list, on the
    server when this Python object is garbage-collected.
    """

    def __init__(self, name: str = "", id: str = "", in_use: str = "", hosts: str = "",
                 client=None, port_list: Optional[PortList] = None):
        self.client = client
        self.name = name
        self.id = id
        self.in_use = in_use
        self.hosts = hosts
        self.port_list = port_list

    def __str__(self):
        d = {'name': self.name, 'id': self.id, "in_use": self.in_use, 'hosts': self.hosts}
        return pretty_json(d)

    def __del__(self):
        if self.client is not None and self.id:
            self.delete()
        # Targets built by GVMClient.get_targets carry no port list.
        if self.port_list is not None:
            self.port_list.delete()

    def delete(self):
        """Delete this target on the server and log the outcome."""
        res = self.client.delete_target(self.id)
        status, status_text = _response_status(res)
        if status == "200":
            logging.info(f"Target {self} DELETED")
        else:
            logging.error(f"ERROR during the target deletion {status}: {status_text}")


class Task:
    """
    This class helps the managing of the GVM task object

    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of task object
        id: str = id returned after the creation
        in_use: str = state if the task object is in use
        report_id: str = report id once task is completed

    Methods:
        start = starts the task
        stop = stops the task
        delete = deletes the tasks
        update_status = refreshes name, status, progress, in_use and report_id
        wait = waits for the task is completed
        save_report = saves report on file
        get_report_info = retrieves report information

    NOTE(review): ``__del__`` deletes the task, then its target, on the
    server when this Python object is garbage-collected.
    """

    def __init__(self, name: str = "", id: str = "", client=None, in_use: str = "",
                 status: str = "", progress: str = "", report_id: Optional[str] = None,
                 target: Optional[Target] = None):
        self.client = client
        self.name = name
        self.id = id
        self.in_use = in_use
        self.status = status
        self.progress = progress
        self.report_id = report_id
        self.target = target

    def __str__(self):
        d = {'name': self.name, 'id': self.id, 'in_use': self.in_use,
             'status': self.status, 'report_id': self.report_id}
        return pretty_json(d)

    def start(self):
        """Start the task and remember the report id returned by the server."""
        res = self.client.start_task(self.id)
        self.report_id = res.xpath('report_id/text()')[0]
        logging.info(f"Task {self} STARTED")

    def stop(self):
        """Stop the task and print the raw server response."""
        res = self.client.stop_task(self.id)
        pretty_print(res)
        # The original logged "STARTED" here (copy/paste slip).
        logging.info(f"Task {self} STOPPED")

    def delete(self):
        """Delete this task on the server and log the outcome."""
        res = self.client.delete_task(self.id)
        status, status_text = _response_status(res)
        if status == "200":
            logging.info(f"Task {self} DELETED")
        else:
            logging.error(f"ERROR during the task deletion {status}: {status_text}")

    def __del__(self):
        if self.client is not None and self.id:
            self.delete()
        # Tasks built by GVMClient.get_tasks carry no target.
        if self.target is not None:
            self.target.delete()

    def update_status(self):
        """Refresh name, status, progress, in_use and report_id from the server."""
        tasks_info = self.client.get_tasks(filter_string=self.id)
        task_info = tasks_info.xpath('task')[0]
        self.name = task_info.xpath('name/text()')[0]
        # status:   New -> Requested -> Queued -> Running -> Done
        # progress: 0      0            0         0 -> 100   -1
        self.status = task_info.xpath('status/text()')[0]
        self.progress = int(task_info.xpath('progress/text()')[0])
        self.in_use = task_info.xpath('in_use/text()')[0]
        # A task without a last report yields no match; keep the previous
        # report_id in that case.
        report_ids = task_info.xpath('last_report/report/@id')
        if report_ids:
            self.report_id = report_ids[0]

    def wait(self, timeout: int = 3600) -> bool:
        """Poll the task every 10 seconds until it is done.

        Args:
            timeout: maximum number of seconds to wait.

        Returns:
            True when the task reached status "Done" with progress -1;
            False on an unexpected status or when ``timeout`` is exceeded.
        """
        start_time = time()
        logging.debug("Waiting for scans ends the task")
        while True:
            self.update_status()
            # Any other status (e.g. "Interrupted") ends the wait.
            if self.status not in ["New", "Requested", "Queued", "Running", "Done"]:
                logging.warning(f"Task in the undesired status: '{self.status}'")
                return False
            if self.status == "Done" and self.progress == -1:
                logging.info("Task completed")
                return True
            if time() - start_time > timeout:
                logging.error("TIMEOUT during waiting for task ending")
                return False
            logging.debug(f"Waiting for the task ends. Now {int(time() - start_time)}s from start. Status: {self.status}")
            sleep(10)

    def save_report(self, report_format: str, report_filename: str):
        """Download the report in ``report_format`` and write it to ``report_filename``.

        The ``report`` element text is base64-decoded before being written
        in binary mode.
        """
        res = self.client.get_report(self.report_id,
                                     report_format_id=report_format,
                                     ignore_pagination=True,
                                     details="1")
        code = str(res.xpath('report/text()')[0])
        with open(report_filename, "wb") as fh:
            fh.write(base64.b64decode(code))

    def get_report_info(self) -> Dict:
        """Summarize the task report per port plus a global worst-case entry.

        Returns:
            A dict mapping each port text to ``{'severity', 'threat'}``, plus
            a ``'global'`` key holding the threat/severity of the highest
            severity found (``'Log'`` / ``-1`` when the report has no ports).
        """
        res = self.client.get_report(self.report_id,
                                     report_format_id=ReportFormats.anonymous_xml,
                                     ignore_pagination=True,
                                     details="1")
        threats = res.xpath('report/report/ports/port/threat/text()')
        ports = res.xpath('report/report/ports/port/text()')
        severities = [float(s) for s in res.xpath('report/report/ports/port/severity/text()')]

        report = {p: {'severity': s, 'threat': t}
                  for p, t, s in zip(ports, threats, severities)}

        glob_severity = -1  # returned severities are null or positive
        glob_threat = 'Log'
        for threat, severity in zip(threats, severities):
            # Track the maximum only; the original re-assigned glob_severity
            # after this branch as well.
            if severity > glob_severity:
                glob_severity = severity
                glob_threat = threat
        report['global'] = {'threat': glob_threat, 'severity': glob_severity}
        return report


class GVMClient:
    """
    This class provides API to interact with GVM in order to get, create
    and delete port_lists, targets, tasks and reports
    """

    CONNECTION_RETRIES = 5
    LOCAL_IP = "127.0.0.1"

    def __init__(self, auth_n: str, auth_p: str, host_ip: str = LOCAL_IP):
        self.auth_name = auth_n
        self.auth_passwd = auth_p
        self.host_ip = host_ip
        self.client = None
        self.create_client()

    def create_client(self):
        """Build the Gmp client (up to CONNECTION_RETRIES attempts) and authenticate.

        Raises:
            Exception: when every attempt to build the client failed.
        """
        for remaining in range(self.CONNECTION_RETRIES, 0, -1):
            try:
                self.client = Gmp(TLSConnection(hostname=self.host_ip),
                                  transform=EtreeTransform())
                break
            except Exception:
                logging.error(f"Connection error with the gmp endpoint. Remaining {remaining} retries")
                sleep(0.5)
        else:
            # Loop finished without ``break``: all attempts failed.
            raise Exception(
                f"Impossible connect to the gmp endpoint even after {self.CONNECTION_RETRIES} retries")
        self.client.authenticate(self.auth_name, self.auth_passwd)

    def get_version(self) -> str:
        """Return the version string reported by the server."""
        res = self.client.get_version()
        return str(res.xpath('version/text()')[0])

    def get_port_lists(self, filter: str = "rows=-1") -> List[PortList]:
        """Return the port lists matching ``filter`` as PortList objects."""
        client_res = self.client.get_port_lists(filter_string=filter)
        return [
            PortList(name=pl.xpath('name/text()')[0],
                     client=self.client,
                     id=pl.xpath('@id')[0],
                     in_use=pl.xpath('in_use/text()')[0])
            for pl in client_res.xpath('port_list')
        ]

    def create_port_list(self, name: str, ports: List[str]) -> Optional[PortList]:
        """Create a port list from ``ports`` (joined with commas) and return it.

        Raises:
            Exception: when the server does not answer with status 201.
        """
        res = self.client.create_port_list(name, ','.join(ports))
        status, status_text = _response_status(res)
        if status != "201":
            msg = f"ERROR during Port list creation. Status code: {status}, msg: {status_text}"
            logging.error(msg)
            raise Exception(msg)

        pl_id = str(res.xpath('@id')[0])
        client_res = self.client.get_port_lists(filter_string=pl_id)
        pl_info = client_res.xpath('port_list')[0]
        o = PortList(name=pl_info.xpath('name/text()')[0],
                     client=self.client,
                     id=pl_info.xpath('@id')[0],
                     in_use=pl_info.xpath('in_use/text()')[0])
        # The original logged the builtin ``id`` and never returned ``o``.
        logging.debug(f'Created port list obj. Name: {name}, id: {o.id}, ports: {ports}')
        return o

    def delete_port_list(self, pl: PortList):
        """Delete ``pl`` on the server and log the outcome."""
        res = self.client.delete_port_list(pl.id)
        status, status_text = _response_status(res)
        if status == "200":
            logging.info(f"Port_list {pl} DELETED")
        else:
            logging.error(f"ERROR {status}: {status_text}")

    def get_or_create_port_list(self, pl_name: str,
                                ports: List[str]) -> Union[PortList, List[PortList]]:
        """Return the port list found with ``pl_name`` as filter, creating it if absent.

        NOTE(review): when several port lists match, the whole list of
        matches is returned (after a warning) rather than a single PortList;
        callers must handle that case.
        """
        res = self.get_port_lists(pl_name)
        if not res:
            return self.create_port_list(pl_name, ports)
        if len(res) == 1:
            return res[0]
        logging.warning(f"Found {len(res)} results.")
        return res

    def create_target(self, name: str, ip: str, pl: PortList) -> Optional[Target]:
        """Create a target for host ``ip`` using port list ``pl`` and return it.

        Raises:
            Exception: when the server does not answer with status 201.
        """
        res = self.client.create_target(
            name=name,
            comment="",
            hosts=[ip],
            port_list_id=pl.id,
            ssh_credential_id=Configs.ovs_ssh_credential,
            alive_test=AliveTest('Consider Alive'))
        status, status_text = _response_status(res)
        if status != "201":
            msg = f"ERROR during Target creation. Status code: {status}, msg: {status_text}"
            raise Exception(msg)

        target_id = str(res.xpath('@id')[0])
        client_res = self.client.get_targets(filter_string=target_id)
        target_info = client_res.xpath('target')[0]
        return Target(name=target_info.xpath('name/text()')[0],
                      id=target_info.xpath('@id')[0],
                      in_use=target_info.xpath('in_use/text()')[0],
                      hosts=ip,  # record the host the target was created for
                      client=self.client,
                      port_list=pl)

    def get_targets(self, filter: str) -> List[Target]:
        """Return the targets matching ``filter`` as Target objects (no port list attached)."""
        targets = self.client.get_targets(filter_string=filter)
        return [
            Target(name=target.xpath('name/text()')[0],
                   id=target.xpath('@id')[0],
                   in_use=target.xpath('in_use/text()')[0],
                   client=self.client)
            for target in targets.xpath('target')
        ]

    def delete_target(self, target: Target):
        """Delete ``target`` on the server and log the outcome."""
        res = self.client.delete_target(target.id)
        status, status_text = _response_status(res)
        if status == "200":
            logging.info(f"Target {target} DELETED")
        else:
            logging.error(f"ERROR {status}: {status_text}")

    def get_or_create_target(self, name: str, ip: str,
                             port_list: PortList) -> Optional[Target]:
        """Return the target found with ``name`` as filter, creating it if absent.

        Returns None (after a warning) when several targets match.
        """
        res = self.get_targets(name)
        if not res:
            return self.create_target(name, ip, port_list)
        if len(res) == 1:
            res[0].port_list = port_list
            return res[0]
        logging.warning(f"Found {len(res)} targets. Return None")
        return None

    def search_and_delete_target(self, target_name: str):
        """Delete the single target found with ``target_name`` as filter.

        Raises:
            Exception: when the search does not yield exactly one target.
        """
        targets = self.get_targets(target_name)
        if len(targets) == 1:
            # delete_target expects the Target object itself; the original
            # subscripted it (``targets[0]['id']``), which cannot work.
            self.delete_target(targets[0])
        elif not targets:
            raise Exception("No results for search")
        else:
            # The original did ``raise("...")``, which is a TypeError.
            raise Exception("Multiple results for search")

    def search_and_delete_all_targets(self, target_name: str):
        """Delete every target found with ``target_name`` as filter."""
        for target in self.get_targets(target_name):
            self.delete_target(target)

    def create_task(self, name: str, target: Target) -> Optional[Task]:
        """Create a task scanning ``target`` and return it.

        Raises:
            Exception: when the server does not answer with status 201.
        """
        res = self.client.create_task(
            name=name,
            config_id=Configs.config,
            target_id=target.id,
            scanner_id=Configs.scanner)
        status, status_text = _response_status(res)
        if status != "201":
            msg = f"ERROR during Task creation. Status code: {status}, msg: {status_text}"
            raise Exception(msg)

        task_id = str(res.xpath('@id')[0])
        client_res = self.client.get_tasks(filter_string=task_id)
        task_info = client_res.xpath('task')[0]
        t = Task(name=task_info.xpath('name/text()')[0],
                 id=task_info.xpath('@id')[0],
                 client=self.client,
                 in_use=task_info.xpath('in_use/text()')[0],
                 status=task_info.xpath('status/text()')[0],
                 target=target)
        # A task without a last report yields no match; report_id stays None.
        report_ids = task_info.xpath('last_report/report/@id')
        if report_ids:
            t.report_id = report_ids[0]
        return t

    def get_tasks(self, filter: str) -> List[Task]:
        """Return the tasks matching ``filter`` as Task objects (no target attached)."""
        list_of_tasks = []
        tasks = self.client.get_tasks(filter_string=filter)
        for task in tasks.xpath('task'):
            t = Task(name=task.xpath('name/text()')[0],
                     id=task.xpath('@id')[0],
                     client=self.client,
                     in_use=task.xpath('in_use/text()')[0],
                     status=task.xpath('status/text()')[0])
            report_ids = task.xpath('last_report/report/@id')
            if report_ids:
                t.report_id = report_ids[0]
            list_of_tasks.append(t)
        return list_of_tasks

    def get_or_create_task(self, task_name: str, target: Target) -> Optional[Task]:
        """Return the task found with ``task_name`` as filter, creating it if absent.

        Returns None (after a warning) when several tasks match, as the
        warning text states and as get_or_create_target does.
        """
        res = self.get_tasks(task_name)
        if not res:
            return self.create_task(task_name, target)
        if len(res) == 1:
            res[0].target = target
            return res[0]
        logging.warning(f"Found {len(res)} results. Return None")
        return None

    def delete_task(self, task: Task):
        """Delete ``task`` on the server and log the outcome."""
        res = self.client.delete_task(task.id)
        status, status_text = _response_status(res)
        if status == "200":
            logging.info(f"Task {task} DELETED")
        else:
            logging.error(f"ERROR {status}: {status_text}")

    def delete_all_tasks(self, filter: str):
        """Delete every task matching ``filter``."""
        for task in self.get_tasks(filter):
            self.delete_task(task)

    def get_report_formats(self):
        """Print the id and name of every report format known to the server."""
        res = self.client.get_report_formats()
        for f in res.xpath('report_format'):
            name = f.xpath('name/text()')[0]
            format_id = f.xpath('@id')[0]
            print(f"Report format id: '{format_id}' and name: '{name}'")