Skip to content
Snippets Groups Projects
gvm_library.py 19.7 KiB
Newer Older
  • Learn to ignore specific revisions
    from gvm.connections import TLSConnection
    from gvm.protocols.gmpv208 import Gmp, AliveTest 
    from gvm.transforms import EtreeTransform
    from gvm.xml import pretty_print
    from time import time, sleep
    import logging
    import base64
    import json
    
    from typing import Optional, Dict, List, Tuple
    
    
    # GVM XPath constants: expressions evaluated against the etree
    # responses returned by the GMP client.
    GVM_XPATH_ID = '@id'
    GVM_XPATH_NAME_TEXT = 'name/text()'
    GVM_XPATH_REPORT_ID_TEXT = "report_id/text()"
    GVM_XPATH_STATUS = '@status'
    GVM_XPATH_STATUS_TEXT = '@status_text'
    # NOTE(review): not referenced in the visible part of this file and the
    # expression looks unusual for XPath -- confirm before relying on it.
    GVM_XPATH_STATUS_TEXT_2 = '@status/text'
    GVM_XPATH_STATUS_TEXT_3 = 'status/text()'
    GVM_XPATH_PROGRESS_TEXT = 'progress/text()'
    GVM_XPATH_INUSE_TEXT = 'in_use/text()'
    GVM_XPATH_LAST_REPORT_ID = 'last_report/report/@id'
    GVM_XPATH_REPORT_TEXT = 'report/text()'

    # GVM status constants: values compared with the '@status' attribute
    # of a response.
    GVM_STATUS_OK = "200"
    GVM_STATUS_CREATE_OK = "201"
    
    
    # Custom Exceptions
    class GvmException(Exception):
        """Error raised by this module when a GVM operation fails."""
    
    # More Readable print function
    
    def pretty_json(j):
        """Return ``j`` serialized as JSON with sorted keys and 4-space indent."""
        rendered = json.dumps(j, indent=4, sort_keys=True)
        return rendered
    
    
    # Class containing config ids
    
    class Configs:
        """Fixed GVM object ids used when creating targets and tasks."""

        # Passed as config_id to create_task
        config = "9866edc1-8869-4e80-acac-d15d5647b4d9"
        # Passed as scanner_id to create_task
        scanner = "08b69003-5fc2-4037-a479-93b440211c73"
        # Passed as ssh_credential_id to create_target
        ovs_ssh_credential = "b9af5845-8b87-4378-bca4-cee39a894c17"
    
    
    # Class containing format ids
    
    class ReportFormats:
        """Report format ids, usable as report_format_id when fetching a report."""

        anonymous_xml = "5057e5cc-b825-11e4-9d0e-28d24461215b"
        csv_results = "c1645568-627a-11e3-a660-406186ea4fc5"
        itg = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5"
        pdf = "c402cc3e-b531-11e1-9163-406186ea4fc5"
        txt = "a3810a62-1f62-11e1-9219-406186ea4fc5"
        xml = "a994b278-1f62-11e1-96ac-406186ea4fc5"
    
    class PortList:
        """
        This class helps the managing of the GVM port_list object.

        On construction the server is queried for port lists matching
        ``name``: the first match is adopted, otherwise a new port list is
        created from ``ports``.

        Attributes:
            client = client used to interact with gvm server (created from GVMClient class)
            name: str = name of port list object
            ports: str = comma-joined port entries sent on creation
            id: Optional[str] = id returned after the creation (None until known)
            in_use: Optional[str] = state if the port_list object is in use (None until known)
        """

        def __init__(self,
                     client,
                     name: str,
                     ports: List[str]):
            self.client = client
            self.name = name
            self.ports = ','.join(ports)
            # Defined up front so __str__ and delete never hit a missing
            # attribute when the server lookup returns nothing.
            self.id: Optional[str] = None
            self.in_use: Optional[str] = None

            # Retrieve port_list objs by name
            res = self.__get_info(filter=name)
            if not res:
                # If no result retrieved, create it
                self.create()
                return

            logging.debug("Already created. Collected from server")
            if len(res) > 1:
                # Multiple objs retrieved, consider the first one
                msg = f"The port_list name {name} retrieved {len(res)} results"
                logging.warning(msg)
                logging.warning("The first one will be considered")
            self._store_info(res[0])

        def _store_info(self, info: Dict[str, str]) -> None:
            """Copy name, id and in_use from one __get_info entry onto self."""
            self.name = info['name']
            self.id = info['id']
            self.in_use = info['in_use']

        # Search port_lists by id/name
        def __get_info(self, filter: str = "rows=-1") -> List[Dict[str, str]]:
            """Return name/id/in_use dicts for the port lists matching ``filter``."""
            pls = self.client.get_port_lists(filter_string=filter) \
                             .xpath('port_list')
            return [{"name": str(pl.xpath(GVM_XPATH_NAME_TEXT)[0]),
                     "id": str(pl.xpath(GVM_XPATH_ID)[0]),
                     "in_use": str(pl.xpath(GVM_XPATH_INUSE_TEXT)[0])}
                    for pl in pls]

        def create(self) -> None:
            """
            Create the port list on the server and load its attributes.

            Raises:
                GvmException: the creation response status is not
                    GVM_STATUS_CREATE_OK.
            """
            res = self.client.create_port_list(self.name, self.ports)
            status = res.xpath(GVM_XPATH_STATUS)[0]
            status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
            if status != GVM_STATUS_CREATE_OK:
                msg = "ERROR during Port list creation. "
                msg += f"Status code: {status}, msg: {status_text}"
                logging.error(msg)
                raise GvmException(msg)

            pl_id = str(res.xpath(GVM_XPATH_ID)[0])
            # Keep the id from the creation response even if the lookup
            # below finds nothing, so the object stays usable.
            self.id = pl_id
            found = self.__get_info(filter=pl_id)
            if not found:
                # No obj retrieved. Error during creation
                msg = f"The port_list name {self.name} retrieved 0 results after creations"
                logging.error(msg)
                return

            if len(found) > 1:
                # Multiple objs retrieved, consider the first one
                msg = f"The port_list name {self.name}"
                msg += f" retrieved {len(found)} results"
                logging.warning(msg)
                logging.warning("The first one will be considered")
            self._store_info(found[0])
            msg = "Created port list obj. "
            msg += f"Name: {self.name}, id: {self.id}, ports: {self.ports}"
            logging.debug(msg)

        def __str__(self):
            d = {'name': self.name,
                 'id': self.id,
                 'in_use': self.in_use}
            return pretty_json(d)

        def delete(self):
            """Delete the port list on the server and drop the client reference.

            A failed deletion is logged, not raised.
            """
            logging.debug(f"Deletion port_list {self.name}")
            res = self.client.delete_port_list(self.id)
            self.client = None

            status = res.xpath(GVM_XPATH_STATUS)[0]
            status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
            if status == GVM_STATUS_OK:
                logging.info(f"Port_list {self} DELETED")
            else:
                logging.error(f"ERROR during the port_list deletion {status}: {status_text}")
    
    class Target:
        """
        This class helps the managing of the GVM target object.

        On construction the server is queried for targets matching ``name``:
        the first match is adopted, otherwise a new target is created for
        ``host`` using ``port_list``.

        Attributes:
            client = client used to interact with gvm server (created from GVMClient class)
            name: str = name of target object
            host: str = host passed to the server on creation
            pl: PortList = port list whose id is used on creation
            id: Optional[str] = id returned after the creation (None until known)
            in_use: Optional[str] = state if the target object is in use (None until known)
        """

        def __init__(self,
                     client,
                     name: str,
                     host: str,
                     port_list: "PortList"):
            self.client = client
            self.name = name
            self.host = host
            self.pl = port_list
            # Defined up front so __str__ and delete never hit a missing
            # attribute when the server lookup returns nothing.
            self.id: Optional[str] = None
            self.in_use: Optional[str] = None

            # Retrieve targets objs by name
            res = self.__get_info(filter=name)
            if not res:
                # If no result retrieved, create it
                self.create()
                return

            logging.debug("Already created. Collected from server")
            if len(res) > 1:
                # Multiple objs retrieved, consider the first one
                msg = f"The target name {name} retrieved {len(res)} results"
                logging.warning(msg)
                logging.warning("The first one will be considered")
            self._store_info(res[0])

        def _store_info(self, info: Dict[str, str]) -> None:
            """Copy name, id and in_use from one __get_info entry onto self."""
            self.name = info['name']
            self.id = info['id']
            self.in_use = info['in_use']

        def __get_info(self, filter: str) -> List[Dict[str, str]]:
            """Return name/id/in_use dicts for the targets matching ``filter``."""
            targets = self.client.get_targets(filter_string=filter) \
                                 .xpath('target')
            return [{"name": str(target.xpath(GVM_XPATH_NAME_TEXT)[0]),
                     "id": str(target.xpath(GVM_XPATH_ID)[0]),
                     "in_use": str(target.xpath(GVM_XPATH_INUSE_TEXT)[0])}
                    for target in targets]

        def create(self) -> None:
            """
            Create the target on the server and load its attributes.

            Raises:
                GvmException: the creation response status is not
                    GVM_STATUS_CREATE_OK.
            """
            res = self.client.create_target(
                    name=self.name,
                    comment="",
                    hosts=[self.host],
                    port_list_id=self.pl.id,
                    ssh_credential_id=Configs.ovs_ssh_credential,
                    alive_test=AliveTest('Consider Alive'))

            status = res.xpath(GVM_XPATH_STATUS)[0]
            status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
            if status != GVM_STATUS_CREATE_OK:
                msg = "ERROR during Target creation. "
                msg += f"Status code: {status}, msg: {status_text}"
                # Logged before raising, as PortList.create does
                logging.error(msg)
                raise GvmException(msg)

            t_id = str(res.xpath(GVM_XPATH_ID)[0])
            # Keep the id from the creation response even if the lookup
            # below finds nothing, so the object stays usable.
            self.id = t_id
            found = self.__get_info(filter=t_id)
            if not found:
                # No obj retrieved. Error during creation
                msg = f"The target name {self.name} retrieved 0 results after creation"
                logging.error(msg)
                return

            if len(found) > 1:
                # Multiple objs retrieved, consider the first one
                msg = f"The target id {t_id} retrieved {len(found)} results"
                logging.warning(msg)
                logging.warning("The first one will be considered")
            self._store_info(found[0])
            msg = "Created target obj. "
            msg += f"Name: {self.name}, id: {self.id}, host: {self.host}"
            logging.debug(msg)

        def __str__(self):
            d = {'name': self.name,
                 'id': self.id,
                 "in_use": self.in_use,
                 'host': self.host}
            return pretty_json(d)

        def delete(self):
            """Delete the target on the server and drop the client reference.

            A failed deletion is logged, not raised.
            """
            logging.debug(f"Deletion target {self.name}")
            res = self.client.delete_target(self.id)
            self.client = None

            status = res.xpath(GVM_XPATH_STATUS)[0]
            status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
            if status == GVM_STATUS_OK:
                logging.info(f"Target {self} DELETED")
            else:
                logging.error(f"ERROR during the target deletion {status}: {status_text}")
    
    class Task:
        """
        This class helps the managing of the GVM task object.

        On construction the server is queried for tasks matching ``name``:
        the first match is adopted, otherwise a new task is created for
        ``target``.

        Attributes:
            client = client used to interact with gvm server (created from GVMClient class)
            name: str = name of task object
            target: Target = target whose id is used on creation
            id: str = id returned after the creation
            in_use: str = state if the task object is in use
            status: str = task status as reported by the server
            progress: int = progress value read by update_status
            report_id: str = id of the last report, when the server reports one

        Methods:
            start = starts the task
            stop = stops the task
            delete = deletes the task
            update_status = refreshes name, status, progress, in_use, report_id
            wait = waits until the task is completed
            save_report = saves report on file
            get_report_info = retrieves report information
        """

        # Seconds slept between two status polls in wait()
        WAIT_SECONDS = 10

        def __init__(self,
                     client,
                     name: str,
                     target: "Target") -> None:
            self.client = client
            self.name = name
            self.target = target
            # Defined up front so __str__ and the other methods never hit a
            # missing attribute when the server lookup returns nothing.
            self.id = None
            self.in_use = None
            self.status = None
            self.progress = None
            self.report_id = None

            # Retrieve task objs by name
            res = self.__get_info(filter=name)
            if not res:
                # If no result retrieved, create it
                self.create()
                return

            logging.debug("Already created. Collected from server")
            if len(res) > 1:
                # Multiple objs retrieved, consider the first one
                msg = f"The task name {name} retrieved {len(res)} results"
                logging.warning(msg)
                logging.warning("The first one will be considered")
            self._store_info(res[0])

        def _store_info(self, info: dict) -> None:
            """Copy the fields of one __get_info entry onto self."""
            self.name = info['name']
            self.id = info['id']
            self.in_use = info['in_use']
            self.status = info['status']
            self.report_id = info.get("report_id", None)

        def __get_info(self, filter: str) -> List[dict]:
            """
            Return one dict per task matching ``filter``.

            Each dict has name, id, in_use and status; report_id is present
            only when the task element carries a last report id.
            """
            res = []
            tasks = self.client.get_tasks(filter_string=filter) \
                               .xpath('task')
            for t in tasks:
                t_dict = {"name": str(t.xpath(GVM_XPATH_NAME_TEXT)[0]),
                          "id": str(t.xpath(GVM_XPATH_ID)[0]),
                          "in_use": str(t.xpath(GVM_XPATH_INUSE_TEXT)[0]),
                          "status": str(t.xpath(GVM_XPATH_STATUS_TEXT_3)[0])}
                # The last report is optional: absent means "no report yet"
                report_ids = t.xpath(GVM_XPATH_LAST_REPORT_ID)
                if report_ids:
                    t_dict['report_id'] = str(report_ids[0])
                res.append(t_dict)
            return res

        def create(self) -> None:
            """
            Create the task on the server and load its attributes.

            Raises:
                GvmException: the creation response status is not
                    GVM_STATUS_CREATE_OK.
            """
            res = self.client.create_task(
                    name=self.name,
                    config_id=Configs.config,
                    target_id=self.target.id,
                    scanner_id=Configs.scanner)

            status = res.xpath(GVM_XPATH_STATUS)[0]
            status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
            if status != GVM_STATUS_CREATE_OK:
                msg = "ERROR during Task creation. "
                msg += f"Status code: {status}, msg: {status_text}"
                # Logged before raising, as PortList.create does
                logging.error(msg)
                raise GvmException(msg)

            t_id = str(res.xpath(GVM_XPATH_ID)[0])
            # Keep the id from the creation response even if the lookup
            # below finds nothing, so the object stays usable.
            self.id = t_id
            found = self.__get_info(filter=t_id)
            if not found:
                # No obj retrieved. Error during creation
                msg = f"The task id {t_id} retrieved 0 results after creation"
                logging.error(msg)
                return

            if len(found) > 1:
                # Multiple objs retrieved, consider the first one
                msg = f"The task id {t_id} retrieved {len(found)} results"
                logging.warning(msg)
                logging.warning("The first one will be considered")
            self._store_info(found[0])
            msg = "Created task obj. "
            msg += f"Name: {self.name}, id: {self.id}"
            logging.debug(msg)

        def __str__(self):
            d = {'name': self.name,
                 'id': self.id,
                 'in_use': self.in_use,
                 'status': self.status,
                 'report_id': self.report_id}
            return pretty_json(d)

        def start(self):
            """Start the task and remember the report id from the response."""
            res = self.client.start_task(self.id)
            self.report_id = str(res.xpath(GVM_XPATH_REPORT_ID_TEXT)[0])
            logging.info(f"Task {self} STARTED")

        def stop(self):
            """Stop the task and pretty-print the server response."""
            res = self.client.stop_task(self.id)
            pretty_print(res)
            logging.info(f"Task {self} STOPPED")

        def delete(self):
            """Delete the task on the server and drop the client reference.

            A failed deletion is logged, not raised.
            """
            logging.debug(f"Deletion task {self.name}")
            res = self.client.delete_task(self.id)
            self.client = None
            status = res.xpath(GVM_XPATH_STATUS)[0]
            status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
            if status == GVM_STATUS_OK:
                logging.info(f"Task {self} DELETED")
            else:
                logging.error(f"ERROR during the task deletion {status}: {status_text}")

        def update_status(self):
            """Refresh name, status, progress, in_use and report_id from the server."""
            task_info = self.client.get_tasks(filter_string=self.id) \
                                   .xpath('task')[0]

            self.name = str(task_info.xpath(GVM_XPATH_NAME_TEXT)[0])
            # status:   New -> Requested -> Queued -> Running  -> Done
            # progress:  0         0           0      0 -> 100     -1
            self.status = str(task_info.xpath(GVM_XPATH_STATUS_TEXT_3)[0])
            self.progress = int(task_info.xpath(GVM_XPATH_PROGRESS_TEXT)[0])
            self.in_use = str(task_info.xpath(GVM_XPATH_INUSE_TEXT)[0])

            # The last report is optional: keep the previous report_id when
            # the server does not return one.
            report_ids = task_info.xpath(GVM_XPATH_LAST_REPORT_ID)
            if report_ids:
                self.report_id = str(report_ids[0])

        def wait(self, timeout: int = 7200) -> bool:
            """
            Poll the task every WAIT_SECONDS until it completes.

            Returns:
                True when status is "Done" with progress -1; False when the
                task enters a status outside the expected set or ``timeout``
                seconds have elapsed.
            """
            expected = ("New", "Requested", "Queued", "Running", "Done")
            start_time = time()
            logging.debug("Waiting for scans ends the task")
            while True:
                self.update_status()

                if self.status not in expected:  # e.g. "Interrupted"
                    logging.warning(f"Task in the undesired status: '{self.status}'")
                    return False

                if self.status == "Done" and self.progress == -1:
                    logging.info("Task completed")
                    return True

                if time() - start_time > timeout:
                    logging.error("TIMEOUT during waiting for task ending")
                    return False

                logging.debug(f"Waiting for the task ends. Now {int(time() - start_time)}s from start. Status: {self.status}")
                sleep(self.WAIT_SECONDS)

        def save_report(self, format: str, filename: str):
            """
            Fetch the report in ``format`` and write it to ``filename``.

            The text of the response's report element is base64-decoded
            before being written in binary mode.
            """
            res = self.client.get_report(self.report_id,
                                         report_format_id=format,
                                         ignore_pagination=True,
                                         details=True)
            code = str(res.xpath(GVM_XPATH_REPORT_TEXT)[0])
            with open(filename, "wb") as fh:
                fh.write(base64.b64decode(code))

        def get_report_info(self,
                            accepted_issues: List[str],
                            known_issues: List[str]) -> Dict:
            """
            Summarise the report as the worst finding per port.

            Results whose oid is in ``known_issues`` but not in
            ``accepted_issues`` are skipped.

            Returns:
                Dict mapping each port to {'severity', 'threat'} of its
                highest-severity result, plus a 'global' entry holding the
                highest severity overall (severity -1 and threat 'None'
                when no result was counted).
            """
            report = dict()
            res = self.client.get_report(self.report_id,
                                         report_format_id=ReportFormats.anonymous_xml,
                                         ignore_pagination=True,
                                         details=True)

            # NOTE(review): the four lists are paired positionally, which
            # assumes every result carries all four fields -- confirm.
            o_ids = res.xpath('report/report/results/result/nvt/@oid')
            severities = [float(score) for score in
                          res.xpath('report/report/results/result/nvt/severities/@score')]
            treats = res.xpath('report/report/results/result/threat/text()')
            ports = res.xpath('report/report/results/result/port/text()')

            glob_severity = -1  # severities are not negative
            glob_threat = 'None'

            for o, s, t, p in zip(o_ids, severities, treats, ports):
                logging.debug(f"Detected oid: {o}, severity: {s}, threat: {t} and port: {p}")

                if (o not in accepted_issues) and (o in known_issues):
                    logging.debug(f"Oid {o}, not accepted but known")
                    continue

                # Record the port on first sight, then only on a higher severity
                if p not in report or s > report[p]['severity']:
                    report[p] = {'severity': s, 'threat': t}

                if s > glob_severity:
                    glob_severity = s
                    glob_threat = t

            report['global'] = {'threat': glob_threat, 'severity': glob_severity}
            return report
    
        
    class GVMClient():
        """
        This class provides API to interact with GVM in order to
        get, create and delete port_lists, targets, tasks and reports.

        Attributes:
            auth_name: str = user name passed to authenticate
            auth_passwd: str = password passed to authenticate
            host_ip: str = hostname given to the TLS connection
            client = the Gmp object, created and authenticated on construction
        """

        # Number of attempts made by create_client before giving up
        CONNECTION_RETRIES = 5
        LOCAL_IP = "127.0.0.1"

        def __init__(self,
                     auth_n: str,
                     auth_p: str,
                     host_ip: str = LOCAL_IP):
            self.auth_name = auth_n
            self.auth_passwd = auth_p
            self.host_ip = host_ip
            self.client = None
            self.create_client()

        def create_client(self):
            """
            Build the Gmp client over TLS and authenticate it.

            Construction is attempted up to CONNECTION_RETRIES times with a
            0.5 s pause after each failure.

            Raises:
                GvmException: every construction attempt failed.
            """
            # NOTE(review): only object construction is retried here;
            # authenticate() runs once, outside the retry loop -- confirm
            # this is where connection errors actually surface.
            for remaining in range(self.CONNECTION_RETRIES, 0, -1):
                try:
                    logging.debug('Creation of the GMP Client')
                    logging.debug(f'host_ip: {self.host_ip}')
                    self.client = Gmp(TLSConnection(hostname=self.host_ip),
                                      transform=EtreeTransform())
                    break
                except Exception:
                    logging.error(f"Connection error with the gmp endpoint. Remaining {remaining} retries")
                    sleep(0.5)
            else:
                # Loop ended without break: no attempt succeeded
                raise GvmException(
                    "Impossible connect to the gmp endpoint even after "
                    f"{self.CONNECTION_RETRIES} retries")

            logging.debug('GMP Client Created')

            # NOTE(review): the authentication response is not inspected here.
            self.client.authenticate(self.auth_name, self.auth_passwd)
            logging.debug('GMP Client Authenticated')

        def get_client(self):
            """Return the underlying Gmp client."""
            return self.client

        def get_version(self) -> str:
            """Return the text of the version element of the server response."""
            res = self.client.get_version()
            return str(res.xpath('version/text()')[0])