# gvm_library.py
from gvm.connections import TLSConnection
from gvm.protocols.gmpv208 import Gmp, AliveTest 
from gvm.transforms import EtreeTransform
from gvm.xml import pretty_print
from time import time, sleep
import logging
import base64
import json
from typing import Optional, Dict, List, Tuple

# GVM Xpath Constants
# All queries are relative: they are evaluated on a response root or on one of
# its child elements (port_list, target, task).
GVM_XPATH_NAME_TEXT = 'name/text()'
GVM_XPATH_REPORT_ID_TEXT = "report_id/text()"
# status / status_text are attributes of the response root element
GVM_XPATH_STATUS = '@status'
GVM_XPATH_STATUS_TEXT = '@status_text'
# The task status is a child element of <task>, read elsewhere in this file
# with 'status/text()'. The former value '@status/text' looked for a child of
# an attribute, which never matches, so indexing the result always failed.
GVM_XPATH_STATUS_TEXT_2 = 'status/text()'
GVM_XPATH_PROGRESS_TEXT = 'progress/text()'
GVM_XPATH_INUSE_TEXT = 'in_use/text()'
GVM_XPATH_LAST_REPORT_ID = 'last_report/report/@id'
GVM_XPATH_REPORT_TEXT = 'report/text()'

# GVM Status Constants
# Compared against the '@status' attribute of delete (OK) and create
# (CREATE_OK) responses.
GVM_STATUS_OK = "200"
GVM_STATUS_CREATE_OK = "201"

# Custom Exceptions
class GvmException(Exception):
    """Error raised by this module when a GVM operation cannot be completed."""
# More Readable print function
def pretty_json(j):
    """Return *j* serialized as JSON text with sorted keys and a 4-space indent."""
    dump_options = {"sort_keys": True, "indent": 4}
    return json.dumps(j, **dump_options)

# Class containing config ids
class Configs:
    """Ids of the server-side GVM objects this module references when creating targets and tasks."""

    # Passed as config_id when a task is created
    config: str = "9866edc1-8869-4e80-acac-d15d5647b4d9"
    # Passed as scanner_id when a task is created
    scanner: str = "08b69003-5fc2-4037-a479-93b440211c73"
    # Passed as ssh_credential_id when a target is created
    ovs_ssh_credential: str = "b9af5845-8b87-4378-bca4-cee39a894c17"

# Class containing report format ids
class ReportFormats:
    """Report format ids, usable as report_format_id when a report is fetched."""

    anonymous_xml: str = "5057e5cc-b825-11e4-9d0e-28d24461215b"
    csv_results: str = "c1645568-627a-11e3-a660-406186ea4fc5"
    itg: str = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5"
    pdf: str = "c402cc3e-b531-11e1-9163-406186ea4fc5"
    txt: str = "a3810a62-1f62-11e1-9219-406186ea4fc5"
    xml: str = "a994b278-1f62-11e1-96ac-406186ea4fc5"

class PortList:
    """
    This class helps the managing of the GVM port_list object

    Attributes:
        client = object used to interact with gvm server; it must provide
                 delete_port_list(id) (GVMClient assigns its own gmp client)
        name: str = name of port list object
        id: str = id returned after the creation
        in_use: str = state if the port_list object is in use

    Note:
        __del__ calls delete(): when an instance that is bound to a client
        and to an id is garbage collected, the port list is deleted on the
        server as well.
    """

    def __init__(self,
                 name: str = "",
                 client = None,
                 id: Optional[str] = None,
                 in_use: Optional[str] = None):
        # No trailing comma here: "client," would store a one-element tuple
        # instead of the client itself.
        self.client = client
        self.name = name
        self.id = id
        self.in_use = in_use

    def __str__(self):
        """Return name, id and in_use as pretty-printed JSON."""
        d = {'name': self.name,
             'id': self.id,
             'in_use': self.in_use}
        return pretty_json(d)

    def __del__(self):
        # Nothing to delete when the wrapper was never bound to a client or
        # to a server-side id (e.g. a partially initialised instance).
        if getattr(self, 'client', None) is None or not getattr(self, 'id', None):
            return
        self.delete()

    def delete(self):
        """Delete the port list on the server and log the outcome."""
        res = self.client.delete_port_list(self.id)
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Port_list {self} DELETED")
        else:
            logging.error(f"ERROR during the port_list deletion {status}: {status_text}")
class Target:
    """
    This class helps the managing of the GVM target object

    Attributes:
        client = object used to interact with gvm server; it must provide
                 delete_target(id) (GVMClient assigns its own gmp client)
        name: str = name of target object
        id: str = id returned after the creation
        in_use: str = state if the target object is in use
        hosts: str = hosts of the target, as given by the caller
        port_list: PortList = port list associated with the target

    Note:
        __del__ calls delete(): when an instance that is bound to a client
        and to an id is garbage collected, the target is deleted on the
        server as well.
    """

    def __init__(self,
                 name: str = "",
                 id: str = "",
                 in_use: str = "",
                 hosts: str = "",
                 client = None,
                 port_list: Optional["PortList"] = None):
        self.client = client
        self.name = name
        self.id = id
        self.in_use = in_use
        self.hosts = hosts
        self.port_list = port_list

    def __str__(self):
        """Return name, id, in_use and hosts as pretty-printed JSON."""
        d = {'name': self.name,
             'id': self.id,
             "in_use": self.in_use,
             'hosts': self.hosts}
        return pretty_json(d)

    def __del__(self):
        # Nothing to delete when the wrapper was never bound to a client or
        # to a server-side id (both are unset by default).
        if getattr(self, 'client', None) is None or not getattr(self, 'id', None):
            return
        self.delete()

    def delete(self):
        """Delete the target on the server and log the outcome."""
        res = self.client.delete_target(self.id)
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Target {self} DELETED")
        else:
            logging.error(f"ERROR during the target deletion {status}: {status_text}")

class Task:
    """
    This class helps the managing of the GVM task object

    Attributes:
        client = object used to interact with gvm server (GVMClient assigns
                 its own gmp client)
        name: str = name of task object
        id: str = id returned after the creation
        in_use: str = state if the task object is in use
        status: str = last known status of the task
        progress = last known progress of the task
        report_id: str = report id once task is completed
        target: Target = target scanned by the task

    Methods:
        start = starts the task
        stop = stops the task
        delete = deletes the tasks
        update_status = refreshes name, status, progress, in_use and report_id
        wait = waits for the task is completed
        save_report = saves report on file
        get_report_info = retrieves report information

    Note:
        __del__ calls delete(): when an instance that is bound to a client
        and to an id is garbage collected, the task is deleted on the
        server as well.
    """

    # Configuration
    COMMUNICATION_RETRIES = 3

    # Statuses accepted by wait(); any other status makes it give up
    _EXPECTED_STATUSES = ("New", "Requested", "Queued", "Running", "Done")

    def __init__(self,
                 name: str = "",
                 id: str = "",
                 client = None,
                 in_use: str = "",
                 status: str = "",
                 progress: str = "",
                 report_id: Optional[str] = None,
                 target: Optional["Target"] = None):
        self.client = client
        self.name = name
        self.id = id
        self.in_use = in_use
        self.status = status
        self.progress = progress
        self.report_id = report_id
        self.target = target

    def __str__(self):
        """Return name, id, in_use, status and report_id as pretty-printed JSON."""
        d = {'name': self.name,
             'id': self.id,
             'in_use': self.in_use,
             'status': self.status,
             'report_id': self.report_id}
        return pretty_json(d)

    def start(self):
        """Start the task and remember the report id returned by the server."""
        res = self.client.start_task(self.id)
        self.report_id = res.xpath(GVM_XPATH_REPORT_ID_TEXT)[0]
        logging.info(f"Task {self} STARTED")

    def stop(self):
        """Stop the task and print the server response."""
        res = self.client.stop_task(self.id)
        pretty_print(res)
        logging.info(f"Task {self} STOPPED")

    def delete(self):
        """Delete the task on the server and log the outcome."""
        res = self.client.delete_task(self.id)
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Task {self} DELETED")
        else:
            logging.error(f"ERROR during the task deletion {status}: {status_text}")

    def __del__(self):
        # Nothing to delete when the wrapper was never bound to a client or
        # to a server-side id (both are unset by default).
        if getattr(self, 'client', None) is None or not getattr(self, 'id', None):
            return
        self.delete()

    def update_status(self):
        """
        Refresh name, status, progress, in_use and report_id from the server.

        The task id is used as filter string and the first returned task is
        read. report_id is left untouched when the task has no last report.

        Raises:
            IndexError: if the response lacks the task or one of its fields
        """
        tasks_info = self.client.get_tasks(filter_string = self.id)
        task_info = tasks_info.xpath('task')[0]
        self.name = task_info.xpath(GVM_XPATH_NAME_TEXT)[0]
        # Status is a child element of <task>, so it is read with the same
        # query GVMClient uses ('status/text()').
        #            New -> Requested -> Queued -> Running  -> Done
        # progress    0         0           0      0 -> 100     -1
        self.status = task_info.xpath('status/text()')[0]
        self.progress = int(task_info.xpath(GVM_XPATH_PROGRESS_TEXT)[0])
        self.in_use = task_info.xpath(GVM_XPATH_INUSE_TEXT)[0]
        last_report_ids = task_info.xpath(GVM_XPATH_LAST_REPORT_ID)
        if last_report_ids:
            self.report_id = last_report_ids[0]

    def wait(self, timeout: int = 7200, poll_interval: float = 10.0) -> bool:
        """
        Poll the task until it is completed.

        Args:
            timeout: maximum number of seconds to wait
            poll_interval: seconds to sleep between two status requests

        Returns:
            True when the task is "Done" with progress -1; False on timeout,
            on an unexpected status, or when more than COMMUNICATION_RETRIES
            status updates failed.
        """
        start_time = time()
        retries = self.COMMUNICATION_RETRIES
        logging.debug("Waiting for scans ends the task")
        while True:
            try:
                self.update_status()
            except Exception:
                if retries <= 0:
                    logging.error(f"Collected {self.COMMUNICATION_RETRIES} failures during updating the task status")
                    return False
                retries -= 1
                logging.warning(f"Failure during updating the task status. Remaining {retries} retries")
            else:
                # Status and progress are only evaluated after a successful
                # refresh, never on stale values.
                if self.status not in self._EXPECTED_STATUSES: # ["Interrupted", ...]
                    logging.warning(f"Task in the undesired status: '{self.status}'")
                    return False
                if self.status == "Done" and self.progress == -1:
                    logging.info("Task completed")
                    return True

            elapsed = time() - start_time
            if elapsed > timeout:
                logging.error("TIMEOUT during waiting for task ending")
                return False
            logging.debug(f"Waiting for the task ends. Now {int(elapsed)}s from start. Status: {self.status}")
            # Pause between polls instead of querying the server in a tight loop
            sleep(poll_interval)

    def save_report(self, format: str, filename: str):
        """
        Download the task report and write it to a file.

        Args:
            format: report format id (see ReportFormats)
            filename: path of the file to write

        The text of the 'report' element is base64-decoded before writing.
        """
        res = self.client.get_report(self.report_id,
                                     report_format_id=format,
                                     ignore_pagination=True,
                                     details=True)
        code = str(res.xpath(GVM_XPATH_REPORT_TEXT)[0])
        # Decode before opening so a decoding error leaves no empty file behind
        content = base64.b64decode(code)
        with open(filename, "wb") as fh:
            fh.write(content)

    def get_report_info(self, issues_to_drop: List[str]) -> Dict:
        """
        Summarise the task report per port.

        Args:
            issues_to_drop: nvt oids to ignore

        Returns:
            A dict mapping each port to the {'severity', 'threat'} of its most
            severe result, plus a 'global' entry holding the most severe
            result overall ({'threat': 'None', 'severity': -1} when there is
            no result).
        """
        dropped = set(issues_to_drop or ())
        res = self.client.get_report(self.report_id,
                                     report_format_id=ReportFormats.anonymous_xml,
                                     ignore_pagination=True,
                                     details="1")
        report = {}
        glob_severity = -1 # severities are not negative
        glob_threat = 'None'
        # Fields are read per result so that a result lacking one of them
        # cannot shift the values of the following results.
        for result in res.xpath('report/report/results/result'):
            o_ids = result.xpath('nvt/@oid')
            scores = result.xpath('nvt/severities/@score')
            threats = result.xpath('threat/text()')
            ports = result.xpath('port/text()')
            if not (o_ids and scores and threats and ports):
                logging.debug("Skipped a result without oid, severity, threat or port")
                continue
            o, s, t, p = o_ids[0], float(scores[0]), threats[0], ports[0]
            logging.debug(f"Detected oid: {o}, severity: {s}, threat: {t} and port: {p}")
            if o in dropped:
                logging.debug(f"Dropped issue {o}")
                continue
            # Keep only the most severe result of every port
            if p not in report or s > report[p]['severity']:
                report[p] = {'severity': s, 'threat': t}
            if s > glob_severity:
                glob_severity = s
                glob_threat = t
        report['global'] = {'threat': glob_threat, 'severity': glob_severity}
        return report
    
class GVMClient():
    """
    This class provides API to interact with GVM in order to
    get, create and delete port_lists, targets, tasks and reports

    Attributes:
        auth_name / auth_passwd = credentials used to authenticate
        host_ip = address of the gmp endpoint
        client = gmp client created by create_client()

    Note:
        The PortList, Target and Task objects returned by this class call
        delete() from their __del__: dropping the last reference to one of
        them deletes the corresponding object on the server.
    """

    CONNECTION_RETRIES = 5
    LOCAL_IP = "127.0.0.1"

    def __init__(self,
                 auth_n: str,
                 auth_p: str,
                 host_ip: str = LOCAL_IP):
        self.auth_name = auth_n
        self.auth_passwd = auth_p
        self.host_ip = host_ip
        self.client = None
        self.create_client()

    def create_client(self):
        """
        Create the gmp client and authenticate.

        The client creation is attempted up to CONNECTION_RETRIES times,
        sleeping 0.5s after every failure.

        Raises:
            GvmException: if every attempt failed
        """
        last_error = None
        for remaining in range(self.CONNECTION_RETRIES, 0, -1):
            try:
                self.client = Gmp(TLSConnection(hostname = self.host_ip),
                                  transform=EtreeTransform())
                break
            except Exception as err:
                last_error = err
                logging.error(f"Connection error with the gmp endpoint. Remaining {remaining} retries")
                sleep(0.5)
        else:
            # Reached only when no attempt succeeded
            raise GvmException(
                f"Impossible connect to the gmp endpoint even after {self.CONNECTION_RETRIES} retries"
            ) from last_error

        self.client.authenticate(self.auth_name, self.auth_passwd)

    def get_version(self) -> str:
        """Return the text of the 'version' element of the get_version response."""
        res = self.client.get_version()
        return str(res.xpath('version/text()')[0])

    # ------------------------------------------------------------ helpers

    def _port_list_from_xml(self, xml) -> "PortList":
        """Build a PortList bound to the gmp client from a 'port_list' element."""
        o = PortList()
        o.client = self.client
        o.name = xml.xpath(GVM_XPATH_NAME_TEXT)[0]
        o.id = xml.xpath('@id')[0]
        o.in_use = xml.xpath(GVM_XPATH_INUSE_TEXT)[0]
        return o

    def _target_from_xml(self, xml) -> "Target":
        """Build a Target bound to the gmp client from a 'target' element."""
        t = Target()
        t.client = self.client
        t.name = xml.xpath(GVM_XPATH_NAME_TEXT)[0]
        t.id = xml.xpath('@id')[0]
        t.in_use = xml.xpath(GVM_XPATH_INUSE_TEXT)[0]
        return t

    def _task_from_xml(self, xml) -> "Task":
        """Build a Task bound to the gmp client from a 'task' element."""
        t = Task()
        t.client = self.client
        t.name = xml.xpath(GVM_XPATH_NAME_TEXT)[0]
        t.id = xml.xpath('@id')[0]
        t.in_use = xml.xpath(GVM_XPATH_INUSE_TEXT)[0]
        t.status = xml.xpath('status/text()')[0]
        # report_id keeps its default when the task has no last report
        last_report_ids = xml.xpath(GVM_XPATH_LAST_REPORT_ID)
        if last_report_ids:
            t.report_id = last_report_ids[0]
        return t

    @staticmethod
    def _log_deletion(res, label: str, obj):
        """Log the outcome of a delete response for the object *obj*."""
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"{label} {obj} DELETED")
        else:
            logging.error(f"ERROR {status}: {status_text}")

    # --------------------------------------------------------- port lists

    def get_port_lists(self, filter: str = "rows=-1") -> List["PortList"]:
        """Return the port lists matching *filter* (passed as filter_string)."""
        client_res = self.client.get_port_lists(filter_string = filter)
        return [self._port_list_from_xml(pl) for pl in client_res.xpath('port_list')]

    def create_port_list(self, name: str, ports: List[str]) -> Optional["PortList"]:
        """
        Create a port list made of the comma-joined *ports*.

        Raises:
            GvmException: if the server does not answer with the create status
        """
        res = self.client.create_port_list(name, ','.join(ports))
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status != GVM_STATUS_CREATE_OK:
            msg = f"ERROR during Port list creation. Status code: {status}, msg: {status_text}"
            logging.error(msg)
            raise GvmException(msg)

        # Read the new object back to fill the wrapper with server-side values
        pl_id = str(res.xpath('@id')[0])
        client_res = self.client.get_port_lists(filter_string = pl_id)
        o = self._port_list_from_xml(client_res.xpath('port_list')[0])
        logging.debug(f'Created port list obj. Name: {name}, id: {o.id}, ports: {ports}')
        return o

    def delete_port_list(self, pl: "PortList"):
        """Delete the port list *pl* on the server and log the outcome."""
        res = self.client.delete_port_list(pl.id)
        self._log_deletion(res, "Port_list", pl)

    def get_or_create_port_list(self, pl_name: str, ports: List[str]) -> "PortList":
        """
        Return the port list found with *pl_name* as filter, creating it when
        nothing is found. The first one is returned when several are found.
        """
        res = self.get_port_lists(pl_name)
        if not res:
            return self.create_port_list(pl_name, ports)
        if len(res) > 1:
            logging.warning(f"Found {len(res)} results. Return First one")
        return res[0]

    # ------------------------------------------------------------ targets

    def create_target(self, name: str, ip: str, pl: "PortList") -> Optional["Target"]:
        """
        Create a target for the host *ip* using the port list *pl*.

        Raises:
            GvmException: if the server does not answer with the create status
        """
        res = self.client.create_target(
                name = name,
                comment = "",
                hosts = [ip],
                port_list_id = pl.id,
                ssh_credential_id = Configs.ovs_ssh_credential,
                alive_test = AliveTest('Consider Alive'))
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status != GVM_STATUS_CREATE_OK:
            msg = f"ERROR during Target creation. Status code: {status}, msg: {status_text}"
            raise GvmException(msg)

        target_id = str(res.xpath('@id')[0])
        client_res = self.client.get_targets(filter_string = target_id)
        t = self._target_from_xml(client_res.xpath('target')[0])
        t.port_list = pl
        return t

    def get_targets(self, filter: str) -> List["Target"]:
        """Return the targets matching *filter* (passed as filter_string)."""
        targets = self.client.get_targets(filter_string = filter)
        return [self._target_from_xml(target) for target in targets.xpath('target')]

    def delete_target(self, target: "Target"):
        """Delete *target* on the server and log the outcome."""
        res = self.client.delete_target(target.id)
        self._log_deletion(res, "Target", target)

    def get_or_create_target(self, name: str, ip: str, port_list: "PortList") -> "Target":
        """
        Return the target found with *name* as filter, creating it when
        nothing is found. The first one is returned when several are found.
        """
        res = self.get_targets(name)
        if not res:
            return self.create_target(name, ip, port_list)
        if len(res) == 1:
            res[0].port_list = port_list
            return res[0]
        logging.warning(f"Found {len(res)} targets. Return First one")
        return res[0]

    def search_and_delete_target(self, target_name: str):
        """
        Delete the single target found with *target_name* as filter.

        Raises:
            GvmException: if the search returns no target or more than one
        """
        targets = self.get_targets(target_name)
        if not targets:
            raise GvmException("No results for search")
        if len(targets) > 1:
            raise GvmException("Multiple results for search")
        # delete_target expects the Target wrapper itself, not its id
        self.delete_target(targets[0])

    def search_and_delete_all_targets(self, target_name: str):
        """Delete every target found with *target_name* as filter."""
        for target in self.get_targets(target_name):
            self.delete_target(target)

    # -------------------------------------------------------------- tasks

    def create_task(self, name: str, target: "Target") -> Optional["Task"]:
        """
        Create a task scanning *target* with the config and scanner of Configs.

        Raises:
            GvmException: if the server does not answer with the create status
        """
        res = self.client.create_task(
                name = name,
                config_id = Configs.config,
                target_id = target.id,
                scanner_id = Configs.scanner)
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status != GVM_STATUS_CREATE_OK:
            msg = f"ERROR during Task creation. Status code: {status}, msg: {status_text}"
            raise GvmException(msg)

        task_id = str(res.xpath('@id')[0])
        client_res = self.client.get_tasks(filter_string = task_id)
        t = self._task_from_xml(client_res.xpath('task')[0])
        t.target = target
        return t

    def get_tasks(self, filter: str) -> List["Task"]:
        """Return the tasks matching *filter* (passed as filter_string)."""
        tasks = self.client.get_tasks(filter_string = filter)
        return [self._task_from_xml(task) for task in tasks.xpath('task')]

    def get_or_create_task(self, task_name: str, target: "Target") -> "Task":
        """
        Return the task found with *task_name* as filter, creating it when
        nothing is found. The first one is returned when several are found.
        """
        res = self.get_tasks(task_name)
        if not res:
            return self.create_task(task_name, target)
        if len(res) == 1:
            res[0].target = target
            return res[0]
        logging.warning(f"Found {len(res)} tasks. Return First one")
        return res[0]

    def delete_task(self, task: "Task"):
        """Delete *task* on the server and log the outcome."""
        res = self.client.delete_task(task.id)
        self._log_deletion(res, "Task", task)

    def delete_all_tasks(self, filter: str):
        """Delete every task matching *filter*."""
        for task in self.get_tasks(filter):
            self.delete_task(task)

    # ------------------------------------------------------------ reports

    def get_report_formats(self):
        """Print id and name of every report format known by the server."""
        res =  self.client.get_report_formats()
        for f in res.xpath('report_format'):
            rf_name = f.xpath(GVM_XPATH_NAME_TEXT)[0]
            rf_id = f.xpath('@id')[0]
            print(f"Report format id: '{rf_id}' and name: '{rf_name}'")