Skip to content
Snippets Groups Projects
gvm_library.py 17.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • from gvm.connections import TLSConnection
    from gvm.protocols.gmpv208 import Gmp, AliveTest 
    from gvm.transforms import EtreeTransform
    from gvm.xml import pretty_print
    from time import time, sleep
    import logging
    import base64
    import json
    
    from typing import Optional, Dict, List, Tuple
    
    
    
    def pretty_json(j):
        return json.dumps(j,sort_keys=True,indent=4)
    
    class Configs:
        config = "9866edc1-8869-4e80-acac-d15d5647b4d9"
        scanner = "08b69003-5fc2-4037-a479-93b440211c73"
        ovs_ssh_credential = "b9af5845-8b87-4378-bca4-cee39a894c17"
    
    class ReportFormats:
        anonymous_xml = "5057e5cc-b825-11e4-9d0e-28d24461215b"
        csv_results   = "c1645568-627a-11e3-a660-406186ea4fc5"
        itg           = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5"
        pdf           = "c402cc3e-b531-11e1-9163-406186ea4fc5"
        txt           = "a3810a62-1f62-11e1-9219-406186ea4fc5"
        xml           = "a994b278-1f62-11e1-96ac-406186ea4fc5"
    
    class PortList:
        """
        This class helps the managing of the GVM port_list object
        
        Attributes:
            client = client used to interact with gvm server (created from GVMClient class)
            name: str = name of port list object
            id: str = id returned after the creation
            in_use: str = state if the port_list object is in use
        """
    
        def __init__(self, 
                     name: str = "", 
    
                     client = None,
    
                     id: str = None, 
                     in_use: str = None):
            self.client = client,
            self.name = name
            self.id = id
            self.in_use = in_use
            
        def __str__(self):
            d = {'name': self.name, 
                 'id': self.id, 
    
                 'in_use': self.in_use}
    
            return pretty_json(d)
        
        def __del__(self):
    
            self.delete()
        
        def delete(self):
    
            print("client pl", self.client)
    
            res = self.client.delete_port_list(self.id)
            status = res.xpath('@status')[0]
            status_text = res.xpath('@status_text')[0]
            if status == "200":
                logging.info(f"Port_list {self} DELETED") 
            else:
                logging.error(f"ERROR during the port_list deletion {status}: {status_text}")
    
    class Target:
        """
        This class helps the managing of the GVM target object
        
        Attributes:
            client = client used to interact with gvm server (created from GVMClient class)
            name: str = name of target object
            id: str = id returned after the creation
            in_use: str = state if the target object is in use
        """
        
    
        def __init__(self,
    
                     name: str = "", 
                     id: str = "",
                     in_use: str = "",
                     hosts: str = "",
    
                     client = None, 
                     port_list: PortList = None):
    
            self.client = client
            self.name = name
            self.id = id
            self.in_use = in_use
            self.hosts = hosts
    
            self.port_list = port_list
    
            
        def __str__(self):
            d = {'name': self.name, 
                 'id': self.id, 
                 "in_use": self.in_use,
    
                 'hosts': self.hosts}
    
            return pretty_json(d)
        
        def __del__(self):
    
            self.delete()
            
        def delete(self):
    
            res = self.client.delete_target(self.id)
            status = res.xpath('@status')[0]
            status_text = res.xpath('@status_text')[0]
            if status == "200":
                logging.info(f"Target {self} DELETED") 
            else:
                logging.error(f"ERROR during the target deletion {status}: {status_text}")
    
    class Task:
        """
        This class helps the managing of the GVM task object
        
        Attributes:
            client = client used to interact with gvm server (created from GVMClient class)
            name: str = name of task object
            id: str = id returned after the creation
            in_use: str = state if the task object is in use
            report_id: str = report id once task is completed
            
        Methods:
            start = starts the task
            stop = stops the task
            delete = deletes the tasks
            get_progress = retrieves current task status
            wait = waits for the task is completed
            save_report = saves report on file
            get_report_info = retrieves report information
        """
        
        def __init__(self, 
    
                     name: str = "", 
                     id: str = "",
    
                     client = None,
    
                     in_use: str = "",
                     status: str = "",
    
                     progress: str = "",
    
                     report_id: str = None, 
                     target: Target = None):
    
            self.client = client
            self.name = name
            self.id = id
            self.in_use = in_use
            self.status = status
    
            self.progress = progress
    
            self.report_id = report_id
    
            self.target = target
    
            
        def __str__(self):
            d = {'name': self.name, 
                 'id': self.id, 
                 'in_use': self.in_use,
                 'status': self.status,
    
                 'report_id': self.report_id}
    
            return pretty_json(d)
        
        def start(self):
            res = self.client.start_task(self.id)
            self.report_id = res.xpath('report_id/text()')[0]
    
            logging.info(f"Task {self} STARTED") 
    
    
        def stop(self):
            res = self.client.stop_task(self.id)
            pretty_print(res)
    
            logging.info(f"Task {self} STARTED") 
    
    
        def delete(self):
            res = self.client.delete_task(self.id)
            status = res.xpath('@status')[0]
            status_text = res.xpath('@status_text')[0]
            if status == "200":
                logging.info(f"Task {self} DELETED") 
            else:
                logging.error(f"ERROR during the task deletion {status}: {status_text}")
        
        def __del__(self):
            self.delete()
        
    
        def update_status(self):
    
            tasks_info = self.client.get_tasks(filter_string = self.id)
            task_info = tasks_info.xpath('task')[0]
    
            self.name = task_info.xpath('name/text()')[0]
            self.status = task_info.xpath('status/text()')[0]         #   New -> Requested -> Queued -> Running  -> Done
            self.progress = int(task_info.xpath('progress/text()')[0])#    0         0           0      0 -> 100     -1
            self.in_use = task_info.xpath('in_use/text()')[0]
            try:
                self.report_id = task_info.xpath('last_report/report/@id')[0]
            except:
                pass
            
    
        def wait(self, timeout: int = 3600) -> bool:
            start_time = time()
            logging.debug("Waiting for scans ends the task")
            while True:
    
                self.update_status()
                if self.status not in ["New","Requested","Queued","Running","Done"]: # ["Interrupted", ...]
                    logging.warning(f"Task in the undesired status: '{self.status}'")
    
                    return False
    
                if self.status == "Done" and self.progress == -1:
    
                    logging.info(f"Task completed")
                    return True
                if time() - start_time > timeout:
                    logging.error("TIMEOUT during waiting for task ending")
                    return False
    
                logging.debug(f"Waiting for the task ends. Now {int(time() - start_time)}s from start. Status: {self.status}")
    
        def save_report(self, format: str, filename: str):
    
            res = self.client.get_report(self.report_id,
    
                                         report_format_id=format, 
    
                                         ignore_pagination=True,
    
                                         details=True)
    
            code = str(res.xpath('report/text()')[0])
    
            with open(filename, "wb") as fh:
    
                fh.write(base64.b64decode(code))
                
    
        def get_report_info(self) -> Dict:
    
            report = dict()
            res = self.client.get_report(self.report_id,
                                         report_format_id=ReportFormats.anonymous_xml,
                                         ignore_pagination=True,
                                         details="1")
            threats = res.xpath('report/report/ports/port/threat/text()')
            ports = res.xpath('report/report/ports/port/text()')
            severities = res.xpath('report/report/ports/port/severity/text()')
            severities = list(map(lambda a : float(a), severities))
            for p,t,s in zip(ports, threats, severities):
                report[p] = {'severity': s, 'threat': t}
            glob_severity = -1 # returned severities are null or positive
            glob_threat = 'Log'
            for threat,severity in zip(threats,severities):
                if severity > glob_severity:
                    glob_severity = severity
                    glob_threat = threat
                    glob_severity = severity
    
            report['global'] = {'threat': glob_threat, 'severity': glob_severity}
            return report
        
    class GVMClient():
        """
        This class provides API to interact with GVM in order to 
        get, create and delete port_lists, targets, tasks and reports
        """
        
        CONNECTION_RETRIES = 5
        LOCAL_IP = "127.0.0.1"
        
        def __init__(self, 
                     auth_n: str, 
                     auth_p: str, 
                     host_ip: str = LOCAL_IP):
            self.auth_name = auth_n
            self.auth_passwd = auth_p 
            self.host_ip = host_ip
    
            self.client = None
    
            self.create_client()
    
    
        def create_client(self):
            retry = self.CONNECTION_RETRIES
            while(retry > 0):
                try:
    
                    self.client = Gmp(TLSConnection(hostname = self.host_ip), 
    
                                      transform=EtreeTransform())
                    break
                except:
                    logging.error(f"Connection error with the gmp endpoint. Remaining {retry} retries")
                    retry -= 1
                    sleep(0.5)
            
            if retry == 0:
                raise Exception("Impossible connect to the gmp endpoint even after 5 retries")
                
            self.client.authenticate(self.auth_name, self.auth_passwd)
            
        def get_version(self) -> str:
            res = self.client.get_version()
            return str(res.xpath('version/text()')[0])
        
    
        def get_port_lists(self, filter: str = "rows=-1") -> List[PortList]:
    
            res = []
            client_res = self.client.get_port_lists(filter_string = filter)
            for pl in client_res.xpath('port_list'):
                o = PortList()
                o.client = self.client
                o.name = pl.xpath('name/text()')[0]
                o.id = pl.xpath('@id')[0]
                o.in_use = pl.xpath('in_use/text()')[0]
                res.append(o)
            return res
        
    
        def create_port_list(self, name: str, ports: List[str]) -> Optional[PortList]:
    
            res = self.client.create_port_list(name, ','.join(ports))
            status = res.xpath('@status')[0]
            status_text = res.xpath('@status_text')[0]
            if status == "201":
    
                pl_id = str(res.xpath('@id')[0])
    
                client_res = self.client.get_port_lists(filter_string = pl_id)
                pl_info = client_res.xpath('port_list')[0]
                o = PortList()
                o.client = self.client
                o.name = pl_info.xpath('name/text()')[0]
                o.id = pl_info.xpath('@id')[0]
                o.in_use = pl_info.xpath('in_use/text()')[0]
    
                logging.debug(f'Created port list obj. Name: {name}, id: {id}, ports: {ports}')
    
                return o
    
            else:
                logging.error(f"ERROR during Port list creation. Status code: {status}, msg: {status_text}")
                msg = f"ERROR during Port list creation. Status code: {status}, msg: {status_text}"
                raise Exception(msg) 
    
        def delete_port_list(self, pl: PortList):
            res = self.client.delete_port_list(pl.id)
            status = res.xpath('@status')[0]
            status_text = res.xpath('@status_text')[0]
            if status == "200":
                logging.info(f"Port_list {pl} DELETED") 
            else:
                logging.error(f"ERROR {status}: {status_text}")
        
    
        def get_or_create_port_list(self, pl_name: str, ports: List[str]) -> PortList:
    
            res = self.get_port_lists(pl_name)
            if len(res) == 0:
    
                return self.create_port_list(pl_name, ports)
    
            elif len(res) == 1:
                return res[0]
            else:
                logging.warning(f"Found {len(res)} results.")
                return res
            
    
        def create_target(self, name: str, ip: str, pl: PortList) -> Optional[Target]:
    
            res = self.client.create_target(
                    name = name,
                    comment = "",
                    hosts = [ip],
                    port_list_id = pl.id,
                    ssh_credential_id = Configs.ovs_ssh_credential,
                    alive_test = AliveTest('Consider Alive'))
            status = res.xpath('@status')[0]
            status_text = res.xpath('@status_text')[0]
            if status == "201":
    
                target_id = str(res.xpath('@id')[0])
    
                client_res = self.client.get_targets(filter_string = target_id)
    
                target_info = client_res.xpath('target')[0]
    
                t = Target()
                t.client = self.client
                t.name = target_info.xpath('name/text()')[0]
                t.id  = target_info.xpath('@id')[0]
                t.in_use = target_info.xpath('in_use/text()')[0]
                t.port_list = pl
                return t  
    
            else:
                msg = f"ERROR during Target creation. Status code: {status}, msg: {status_text}"
    
                raise Exception(msg)
    
        def get_targets(self, filter: str) -> List[Target]:
    
            res = []
    
            targets = self.client.get_targets(filter_string = filter)
    
            for target in targets.xpath('target'):
                t = Target()
                t.client = self.client
                t.name = target.xpath('name/text()')[0]
                t.id  = target.xpath('@id')[0]
                t.in_use = target.xpath('in_use/text()')[0]
                res.append(t)
    
            return res  
    
    
        def delete_target(self, target: Target):
            res = self.client.delete_target(target.id)
            status = res.xpath('@status')[0]
            status_text = res.xpath('@status_text')[0]
            if status == "200":
                logging.info(f"Target {target} DELETED") 
            else:
                logging.error(f"ERROR {status}: {status_text}")
    
    
        def get_or_create_target(self, name: str, ip: str, port_list: PortList) -> Target:
    
            res = self.get_targets(name)
            if len(res) == 0:
    
                return self.create_target(name, ip, port_list)
    
            elif len(res) == 1:
    
                res[0].port_list = port_list
    
                return res[0]
            else:
    
                logging.warning(f"Found {len(res)} targets. Return None")
                return None
    
    
        def search_and_delete_target(self, target_name: str):
            targets = self.get_targets(target_name)
            if len(targets) == 1:
                self.delete_target(targets[0]['id'])
            else:
                raise("Multiple results for search")
    
        def search_and_delete_all_targets(self, target_name: str):
            targets = self.get_targets(target_name)
            for target in targets:
                self.delete_target(target)
    
    
        def create_task(self, name: str, target: Target) -> Optional[Task]:
    
            res = self.client.create_task(
                    name = name,
                    config_id = Configs.config,
                    target_id = target.id,
                    scanner_id = Configs.scanner)
            status = res.xpath('@status')[0]
            status_text = res.xpath('@status_text')[0]
            if status == "201":
    
                task_id = str(res.xpath('@id')[0])
    
                client_res = self.client.get_tasks(filter_string = task_id)
                task_info = client_res.xpath('task')[0]
                t = Task()
                t.client = self.client
                t.name = task_info.xpath('name/text()')[0]
                t.id = task_info.xpath('@id')[0]
                t.in_use = task_info.xpath('in_use/text()')[0]
                t.status = task_info.xpath('status/text()')[0]
                try:
                    t.report_id = task_info.xpath('last_report/report/@id')[0]
                except:
                    pass
                t.target = target
                return t
    
            else:
                msg = f"ERROR during Task creation. Status code: {status}, msg: {status_text}"
                raise Exception(msg)
    
    
        def get_tasks(self, filter: str) -> List[Task]: 
    
            list_of_tasks = []
    
            tasks = self.client.get_tasks(filter_string = filter)
    
            for task in tasks.xpath('task'):
                    t = Task()
    
                    t.client = self.client
    
                    t.name = task.xpath('name/text()')[0]
                    t.id = task.xpath('@id')[0]
                    t.in_use = task.xpath('in_use/text()')[0]
                    t.status = task.xpath('status/text()')[0]
                    try:
                        t.report_id = task.xpath('last_report/report/@id')[0]
                    except:
                        pass
                    list_of_tasks.append(t)
            return list_of_tasks
    
        def get_or_create_task(self, task_name: str, target: Target) -> Task:
            res = self.get_tasks(task_name)
            if len(res) == 0:
    
                return self.create_task(task_name, target)
    
            elif len(res) == 1:
    
                res[0].target = target
    
                return res[0]
            else:
                print(f"Found {len(res)} results. Return None")
                return res
    
        def delete_all_tasks(self, filter: str):
            tasks = self.get_tasks(filter)
            for task in tasks:
                self.delete_task(task)
    
        def get_report_formats(self):
            res =  self.client.get_report_formats()
            for f in res.xpath('report_format'):
                name = f.xpath('name/text()')[0]
                id = f.xpath('@id')[0]
                print(f"Report format id: '{id}' and name: '{name}'")