Skip to content
Snippets Groups Projects
gvm_library.py 29.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • from gvm.connections import TLSConnection
    from gvm.protocols.gmpv208 import Gmp, AliveTest 
    from gvm.transforms import EtreeTransform
    from gvm.xml import pretty_print
    from time import time, sleep
    import logging
    import base64
    import json
    
    from typing import Optional, Dict, List, Tuple
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    import yaml
    from functools import reduce
    import os
    import git
    
    # GVM Xpath Constants
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    GVM_XPATH_ID = '@id'
    
    GVM_XPATH_NAME_TEXT = 'name/text()'
    GVM_XPATH_REPORT_ID_TEXT = "report_id/text()"
    GVM_XPATH_STATUS = '@status'
    GVM_XPATH_STATUS_TEXT = '@status_text'
    GVM_XPATH_STATUS_TEXT_2 = '@status/text'
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    GVM_XPATH_STATUS_TEXT_3 = 'status/text()'
    
    GVM_XPATH_PROGRESS_TEXT = 'progress/text()'
    GVM_XPATH_INUSE_TEXT = 'in_use/text()'
    GVM_XPATH_LAST_REPORT_ID = 'last_report/report/@id'
    GVM_XPATH_REPORT_TEXT = 'report/text()'
    
    # GVM Status Constants
    GVM_STATUS_OK = "200"
    GVM_STATUS_CREATE_OK = "201"
    
    
    # Custom Exceptions
    class GvmException(Exception):
        pass
    
    # More Readable print function
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    def pretty_json(j) -> str:
    
        return json.dumps(j,sort_keys=True,indent=4)
    
    
    # Class containing config ids
    
    class Configs:
        config = "9866edc1-8869-4e80-acac-d15d5647b4d9"
        scanner = "08b69003-5fc2-4037-a479-93b440211c73"
        ovs_ssh_credential = "b9af5845-8b87-4378-bca4-cee39a894c17"
    
    
    # Class containining format ids
    
    class ReportFormats:
        anonymous_xml = "5057e5cc-b825-11e4-9d0e-28d24461215b"
        csv_results   = "c1645568-627a-11e3-a660-406186ea4fc5"
        itg           = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5"
        pdf           = "c402cc3e-b531-11e1-9163-406186ea4fc5"
        txt           = "a3810a62-1f62-11e1-9219-406186ea4fc5"
        xml           = "a994b278-1f62-11e1-96ac-406186ea4fc5"
    
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    from dataclasses import dataclass
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    from json import JSONEncoder
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    class EmployeeEncoder(JSONEncoder):
        def default(self, o):
            return o.__dict__
            
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    @dataclass
    class ResultReport():
        oid: str
        severity: float
        threat: str
        port: str
    
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
        def __str__(self):
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            msg = f"{self.oid}, {self.severity}, "
            msg += f"{self.threat}, {self.port}"
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            return msg
    
        def to_dict(self):
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            data = {"oid": self.oid,
                    "sev": self.severity,
                    "threat": self.threat,
                    "port": self.port}
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            return data
    
    
    
    class PortList:
        """
        This class helps the managing of the GVM port_list object
        
        Attributes:
            client = client used to interact with gvm server (created from GVMClient class)
            name: str = name of port list object
            id: str = id returned after the creation
            in_use: str = state if the port_list object is in use
        """
    
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
        def __init__(self,
                     client, 
                     name: str, 
                     ports: List[str]):
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            self.client = client
    
            self.name = name
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            self.ports = ','.join(ports)
            
            # Retrieve port_list objs by name
            res = self.__get_info(filter = name)
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            if len(res) == 0: 
                # If no result retrieved, create it
                self.create()
            
            else:
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                logging.debug("Already created. Collected from server")
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                if len(res) > 1:
                    # If one result has been collected, consider the first one
                    msg = f"The port_list name {name} retrieved {len(res)} results"
                    logging.warning(msg)
                    logging.warning("The first one will be considered")
                
                # If one result has been collected, consider it
                self.name = res[0]['name']
                self.id = res[0]['id']
                self.in_use = res[0]['in_use']
     
        # Search port_lists by id/name
        def __get_info(self, filter: str = "rows=-1") -> List[Dict[str, str]]:
            res = []
            pls = self.client.get_port_lists(filter_string = filter) \
                             .xpath('port_list')
            
            for pl in pls:
                pl_name = str(pl.xpath(GVM_XPATH_NAME_TEXT)[0])
                pl_id = str(pl.xpath(GVM_XPATH_ID)[0])
                pl_in_use = str(pl.xpath(GVM_XPATH_INUSE_TEXT)[0])
                res.append({"name": pl_name, 
                            "id": pl_id, 
                            "in_use": pl_in_use})
            return res
    
        def create(self) -> None:
            res = self.client.create_port_list(self.name, self.ports)
            status = res.xpath(GVM_XPATH_STATUS)[0]
            status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
            if status == GVM_STATUS_CREATE_OK:
                pl_id = str(res.xpath(GVM_XPATH_ID)[0])
                res = self.__get_info(filter = pl_id)
                
                if len(res) > 0:
                    if len(res) > 1:
                        # Multiple objs retrieved, consider the first one
                        msg = f"The port_list name {self.name}"
                        msg += f" retrieved {len(res)} results"
                        logging.warning(msg)
                        logging.warning("The first one will be considered")
                    self.name = res[0]['name']
                    self.id = res[0]['id']
                    self.in_use = res[0]['in_use']
                    msg = "Created port list obj. "
                    msg += f"Name: {self.name}, id: {self.id}, ports: {self.ports}"
                    logging.debug(msg)
    
                else:
                    # No obj retrieved. Error during creation
                    msg = f"The port_list name {self.name} retrieved 0 results after creations"
                    logging.error(msg)
    
            else:
                msg = "ERROR during Port list creation. "
                msg += f"Status code: {status}, msg: {status_text}"
                logging.error(msg)
                raise GvmException(msg) 
    
    
        def __str__(self):
            d = {'name': self.name, 
                 'id': self.id, 
    
                 'in_use': self.in_use}
    
            return pretty_json(d)
        
    
        def delete(self):
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            logging.debug(f"Deletion port_list {self.name}")
    
            res = self.client.delete_port_list(self.id)
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            self.client = None
    
            status = res.xpath(GVM_XPATH_STATUS)[0]
            status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
            if status == GVM_STATUS_OK:
    
                logging.info(f"Port_list {self} DELETED") 
            else:
                logging.error(f"ERROR during the port_list deletion {status}: {status_text}")
    
    class Target:
        """
        This class helps the managing of the GVM target object
        
        Attributes:
            client = client used to interact with gvm server (created from GVMClient class)
            name: str = name of target object
            id: str = id returned after the creation
            in_use: str = state if the target object is in use
        """
        
    
        def __init__(self,
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                     client,
                     name: str,
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                     host: str,
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                     port_list: PortList):
    
            self.client = client
            self.name = name
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            self.host = host
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            self.pl = port_list
            
            # Retrieve targets objs by name
            res = self.__get_info(filter = name)
            
            if len(res) == 0: 
                # If no result retrieved, create it
                self.create()
            
            else:
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                logging.debug("Already created. Collected from server")
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                if len(res) > 1:
                    # If one result has been collected, consider the first one
                    msg = f"The target name {name} retrieved {len(res)} results"
                    logging.warning(msg)
                    logging.warning("The first one will be considered")
                
                # If one result has been collected, consider it
                self.name = res[0]['name']
                self.id = res[0]['id']
                self.in_use = res[0]['in_use']
     
        def __get_info(self, filter: str) -> List[Dict[str, str]]:
            res = []
            targets = self.client.get_targets(filter_string = filter) \
                                 .xpath('target')
            for target in targets:
                t_name = str(target.xpath(GVM_XPATH_NAME_TEXT)[0])
                t_id  = str(target.xpath(GVM_XPATH_ID)[0])
                t_in_use = str(target.xpath(GVM_XPATH_INUSE_TEXT)[0])
                res.append({"name": t_name, 
                            "id": t_id, 
                            "in_use": t_in_use})
            return res  
        
        def create(self) -> None:
            res = self.client.create_target(
                    name = self.name,
                    comment = "",
                    hosts = [self.host],
                    port_list_id = self.pl.id,
                    ssh_credential_id = Configs.ovs_ssh_credential,
                    alive_test = AliveTest('Consider Alive'))
            
            status = res.xpath(GVM_XPATH_STATUS)[0]
            status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
            if status == GVM_STATUS_CREATE_OK:
                t_id = str(res.xpath(GVM_XPATH_ID)[0])
                res = self.__get_info(filter = t_id)
                
                if len(res) == 0:
                    # No obj retrieved. Error during creation
                    msg = f"The target name {self.name} retrieved 0 results after creation"
                    logging.error(msg)
                else:
                    if len(res) > 1:
                        # Multiple objs retrieved, consider the first one
                        msg = f"The target id {t_id} retrieved {len(res)} results"
                        logging.warning(msg)
                        logging.warning("The first one will be considered")   
    
                    self.name = res[0]['name']
                    self.id = res[0]['id']
                    self.in_use = res[0]['in_use']
                    msg = "Created target obj. "
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                    msg += f"Name: {self.name}, id: {self.id}, host: {self.host}"
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                    logging.debug(msg)
            else:
                msg = "ERROR during Target creation. "
                msg += f"Status code: {status}, msg: {status_text}"
                raise GvmException(msg)
    
            
        def __str__(self):
            d = {'name': self.name, 
                 'id': self.id, 
                 "in_use": self.in_use,
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                 'host': self.host}
    
            return pretty_json(d)
        
    
        def delete(self):
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            logging.debug(f"Deletion target {self.name}")
    
            res = self.client.delete_target(self.id)
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            self.client = None
    
            status = res.xpath(GVM_XPATH_STATUS)[0]
            status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
            if status == GVM_STATUS_OK:
    
                logging.info(f"Target {self} DELETED") 
            else:
                logging.error(f"ERROR during the target deletion {status}: {status_text}")
    
    class Task:
        """
        This class helps the managing of the GVM task object
        
        Attributes:
            client = client used to interact with gvm server (created from GVMClient class)
            name: str = name of task object
            id: str = id returned after the creation
            in_use: str = state if the task object is in use
            report_id: str = report id once task is completed
            
        Methods:
            start = starts the task
            stop = stops the task
            delete = deletes the tasks
            get_progress = retrieves current task status
            wait = waits for the task is completed
            save_report = saves report on file
            get_report_info = retrieves report information
        """
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
        # Constants
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
        WAIT_SECONDS = 10
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    
    
        def __init__(self, 
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                     client,
                     name: str, 
                     target: Target) -> None:
    
            self.client = client
            self.name = name
    
            self.target = target
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    
            # Retrieve task objs by name
            res = self.__get_info(filter = name)
    
            if len(res) == 0: 
                # If no result retrieved, create it
                self.create()
            else:
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                logging.debug("Already created. Collected from server")
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                if len(res) > 1:
                    # If one result has been collected, consider the first one
                    msg = f"The port_list name {name} retrieved {len(res)} results"
                    logging.warning(msg)
                    logging.warning("The first one will be considered")
    
                self.name = res[0]['name']
                self.id = res[0]['id']
                self.in_use = res[0]['in_use']
                self.status = res[0]['status']
                self.report_id = res[0].get("report_id", None)
    
        def __get_info(self, filter: str) -> List[dict]: 
            res = []
            tasks = self.client.get_tasks(filter_string = filter) \
                               .xpath('task')
            for t in tasks:
                t_name = str(t.xpath(GVM_XPATH_NAME_TEXT)[0])
                t_id  = str(t.xpath(GVM_XPATH_ID)[0])
                t_in_use = str(t.xpath(GVM_XPATH_INUSE_TEXT)[0])
                t_status = str(t.xpath(GVM_XPATH_STATUS_TEXT_3)[0])
                t_dict = {"name": t_name, 
                          "id": t_id, 
                          "in_use": t_in_use, 
                          "status": t_status}
                try:
                    t_report_id = t.xpath(GVM_XPATH_LAST_REPORT_ID)[0]
                except Exception:
                    pass
                else:
                    t_dict['report_id'] = t_report_id
                
                res.append(t_dict)
            return res    
    
        def create(self) -> None:
            res = self.client.create_task(
                    name = self.name,
                    config_id = Configs.config,
                    target_id = self.target.id,
                    scanner_id = Configs.scanner)
            
            status = res.xpath(GVM_XPATH_STATUS)[0]
            status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
            
            if status == GVM_STATUS_CREATE_OK:
                t_id = str(res.xpath(GVM_XPATH_ID)[0])
                res = self.__get_info(filter = t_id)
                if len(res) == 0:
                    # No obj retrieved. Error during creation
                    msg = f"The task id {t_id} retrieved 0 results after creation"
                    logging.error(msg)
                else:
                    if len(res) > 1:
                        # Multiple objs retrieved, consider the first one
                        msg = f"The task id {t_id} retrieved {len(res)} results"
                        logging.warning(msg)
                        logging.warning("The first one will be considered")
    
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                    self.name = res[0]['name']
                    self.id = res[0]['id']
                    self.in_use = res[0]['in_use']
                    self.status = res[0]['status']
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                    self.report_id = res[0].get("report_id", None)
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                    msg = "Created task obj. "
                    msg += f"Name: {self.name}, id: {self.id}"
                    logging.debug(msg)
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            else:
                msg = "ERROR during Task creation. "
                msg += f"Status code: {status}, msg: {status_text}"
                raise GvmException(msg)
    
            
        def __str__(self):
            d = {'name': self.name, 
                 'id': self.id, 
                 'in_use': self.in_use,
                 'status': self.status,
    
                 'report_id': self.report_id}
    
            return pretty_json(d)
        
        def start(self):
            res = self.client.start_task(self.id)
    
            self.report_id = res.xpath(GVM_XPATH_REPORT_ID_TEXT)[0]
    
            logging.info(f"Task {self} STARTED") 
    
    
        def stop(self):
            res = self.client.stop_task(self.id)
            pretty_print(res)
    
            logging.info(f"Task {self} STARTED") 
    
    
        def delete(self):
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            logging.debug(f"Deletion task {self.name}")
            res = self.client.delete_task(self.id)
            self.client = None
            status = res.xpath(GVM_XPATH_STATUS)[0]
            status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
            if status == GVM_STATUS_OK:
                logging.info(f"Task {self} DELETED") 
            else:
                logging.error(f"ERROR during the task deletion {status}: {status_text}")
    
        def update_status(self):
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            task_info = self.client.get_tasks(filter_string = self.id) \
                                   .xpath('task')[0]
    
            self.name = task_info.xpath(GVM_XPATH_NAME_TEXT)[0]
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            self.status = task_info.xpath(GVM_XPATH_STATUS_TEXT_3)[0]       #   New -> Requested -> Queued -> Running  -> Done
    
            self.progress = int(task_info.xpath(GVM_XPATH_PROGRESS_TEXT)[0])#    0         0           0      0 -> 100     -1
            self.in_use = task_info.xpath(GVM_XPATH_INUSE_TEXT)[0]
    
                self.report_id = task_info.xpath(GVM_XPATH_LAST_REPORT_ID)[0]
    
            except Exception:
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            """d = {
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                "name": self.name,
                "status": self.status,
                "progress":self.progress,
                "in_use":self.in_use,
            }
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            logging.debug(f"update_status: \n {pretty_json(d)}")"""
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    
    
        def wait(self, timeout: int = 7200) -> bool:
    
            start_time = time()
            logging.debug("Waiting for scans ends the task")
            while True:
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                self.update_status()
    
                if self.status not in ["New","Requested","Queued","Processing", "Running","Done"]: # ["Interrupted", ...]
    
                    logging.warning(f"Task in the undesired status: '{self.status}'")
    
                    return False
    
                if self.status == "Done" and self.progress == -1:
    
                    logging.info("Task completed")
    
                    return True
                if time() - start_time > timeout:
                    logging.error("TIMEOUT during waiting for task ending")
                    return False
    
                logging.debug(f"Waiting for the task ends. Now {int(time() - start_time)}s from start. Status: {self.status}")
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                sleep(self.WAIT_SECONDS)
    
        def save_report(self, format: str, filename: str):
    
            res = self.client.get_report(self.report_id,
    
                                         report_format_id=format, 
    
                                         ignore_pagination=True,
    
                                         details=True)
    
            code = str(res.xpath(GVM_XPATH_REPORT_TEXT)[0])
    
            with open(filename, "wb") as fh:
    
                fh.write(base64.b64decode(code))
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    
        def get_report(self) -> List[Tuple[str,str,str,str]]:
    
            res = self.client.get_report(self.report_id,
                                         report_format_id=ReportFormats.anonymous_xml,
                                         ignore_pagination=True,
                                         details="1")
    
            o_ids: list[str] = res.xpath('report/report/results/result/nvt/@oid')
            severities: list[str] = res.xpath('report/report/results/result/nvt/severities/@score')
            severities: list[float] = list(map(lambda a : float(a), severities))
            treats: list[str] = res.xpath('report/report/results/result/threat/text()')
            ports: list[str] = res.xpath('report/report/results/result/port/text()')
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            return [ResultReport(o,s,t,p) for o,s,t,p in
                        zip(o_ids, severities, treats, ports)]
    
        
    class GVMClient():
        """
        This class provides API to interact with GVM in order to 
        get, create and delete port_lists, targets, tasks and reports
        """
        
        CONNECTION_RETRIES = 5
        LOCAL_IP = "127.0.0.1"
        
        def __init__(self, 
                     auth_n: str, 
                     auth_p: str, 
                     host_ip: str = LOCAL_IP):
            self.auth_name = auth_n
            self.auth_passwd = auth_p 
            self.host_ip = host_ip
    
            self.client = None
    
            self.create_client()
    
    
        def create_client(self):
            retry = self.CONNECTION_RETRIES
            while(retry > 0):
                try:
    
                    logging.debug('Creation of the GMP Client')
    
                    logging.debug(f'host_ip: {self.host_ip}')
    
                    self.client = Gmp(TLSConnection(hostname = self.host_ip), 
    
                                      transform=EtreeTransform())
                    break
    
                except Exception:
    
                    logging.error(f"Connection error with the gmp endpoint. Remaining {retry} retries")
                    retry -= 1
                    sleep(0.5)
    
            logging.debug('GMP Client Created')
    
            if retry == 0:
    
                raise GvmException("Impossible connect to the gmp endpoint even after 5 retries")
    
                
            self.client.authenticate(self.auth_name, self.auth_passwd)
    
            logging.debug('GMP Client Authenticated')
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    
        def get_client(self):
            return self.client
    
    
        def get_version(self) -> str:
            res = self.client.get_version()
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            return str(res.xpath('version/text()')[0])
    
    class ReportManager():
        # CLASSIFICATION configuration
        SEVERITY_THR = 4
        MSG_OK = "OK"
        MSG_NOK = "NOK"
        DEFAULT_SEVERITY = -1
        MAX_SEVERITY = 9
        DEFAULT_THREAT = "None"
    
        # REPORT Keywords
        REPORT_DEPLOYMENT = "deployment"
        REPORT_GLOBAL = "global"
        REPORT_SEVERITY = "severity"
        REPORT_THREAT = "threat"
    
        # OIDS Classes
        OID_ACCEPTED = 'accepted-oids'
        OID_NEW = 'new-oids'
        OID_DROPPED = 'dropped-oids'
        OID_OS = 'os-related-oids'
        OID_CLASSES = (OID_ACCEPTED, OID_NEW, OID_DROPPED, OID_OS)
    
        # OS security repository configuration
        OS_GIT_REPO = "baltig.infn.it/infn-cloud/os_security_checks.git"
        OS_SEC_BRANCH = "new-oids"
        OS_SEC_USER = "GIT_OS_SEC_USER"
        OS_SEC_TOKEN = "GIT_OS_SEC_TOKEN"
        OS_SEC_DEST_DIR = "os-sc-repo"
        OS_SEC_FILENAME = "os-oids.yaml"
        OS_COMMIT_MESSAGE = 'Added oid(s)'
    
        # Security scans repository configuration
        SS_GIT_REPO = "baltig.infn.it/infn-cloud/security-scans.git"
        SS_SEC_USER = "GIT_SEC_USER"
        SS_SEC_TOKEN = "GIT_SEC_TOKEN"
        SS_SEC_DEST_DIR = "ss-repo"
        SS_SEC_CHILD_DIR = "queues"
        SS_SEC_ACCEPTED_FILES = ['accepted.txt']
        SS_SEC_KNOWN_FILES = ['held.txt', 'new.txt', 'overridden.txt']
    
        def __init__(self, os_name: str, is_os: bool) -> None:
            self.os_name = os_name
            self.is_os = is_os
            self.imported_oids: Dict[str, List[ResultReport]] = dict()
            self.import_os_sec_repo()
            self.import_security_oids()
    
        def import_yaml_file(self) -> dict:
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            if os.path.isfile(self.os_file):
                with open(self.os_file, 'r') as ifile:
                    oids = yaml.load(ifile)
                return oids
            else:
                return dict()
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    
        def import_os_sec_repo(self):
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            self.os_file = os.path.join(self.OS_SEC_DEST_DIR,
                                        self.OS_SEC_FILENAME)
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            user = os.environ.get(self.OS_SEC_USER)
            token = os.environ.get(self.OS_SEC_TOKEN)
            repo_url = f"https://{user}:{token}@{self.OS_GIT_REPO}"
    
            try:
                git.Repo.clone_from(repo_url,
                                    self.OS_SEC_DEST_DIR,
                                    branch = self.OS_SEC_BRANCH)
            except Exception as e:
                logging.error(f"Impossible clone the os scans repository, {e}")
                self.os_oids = dict()
                self.os_all_oids = []
            else:
                os_oids = self.import_yaml_file()
                logging.debug("Imported host os security oids")
                logging.debug(pretty_json(os_oids))
    
                if not isinstance(os_oids, dict):
                    logging.warning("Impossible parse the oids yaml file")
                    self.os_oids = dict()
                    self.os_all_oids = []
                else:
                    self.os_oids = os_oids
                    try:
                        self.os_all_oids = list(reduce(lambda x,y: x + y,
                                                       os_oids.values()))
                        logging.debug("Imported os security oids")
                        logging.debug(pretty_json(self.os_all_oids))
    
                    except Exception as e:
                        logging.warning("Impossible extract oids from imported yaml")
                        self.os_all_oids = []
    
        def import_security_oids(self) -> None:
            user = os.environ.get(self.SS_SEC_USER)
            token = os.environ.get(self.SS_SEC_TOKEN)
            repo_url = f"https://{user}:{token}@{self.SS_GIT_REPO}"
            files_dir = os.path.join(self.SS_SEC_DEST_DIR,self.SS_SEC_CHILD_DIR)
            try:
                git.Repo.clone_from(repo_url, self.SS_SEC_DEST_DIR)
            except Exception as e:
                logging.warning(f"Impossible clone the ss scans repository, {e}")
                self.accepted_oids = []
                self.known_oids = []
            else:
                accepted_oids: List[str] = []
                known_oids: List[str] = []
    
                for f in self.SS_SEC_ACCEPTED_FILES:
                    filename = os.path.join(files_dir,f)
                    with open(filename, 'r') as file:
                        accepted_oids += [line.strip() for line in file.readlines()
                                            if not line.startswith('#')]
                for f in self.SS_SEC_KNOWN_FILES:
                    filename = os.path.join(files_dir,f)
                    with open(filename, 'r') as file:
                        known_oids += [line.strip() for line in file.readlines()
                                        if not line.startswith('#')]
                self.accepted_oids = accepted_oids
                self.known_oids = known_oids
    
        def import_report(self, host: str, report: List[ResultReport]):
            self.imported_oids[host] = report
    
        def init_glob_vars(self):
            self.report = dict()
            self.report[self.REPORT_DEPLOYMENT] = {self.REPORT_SEVERITY: self.DEFAULT_SEVERITY,
                                                   self.REPORT_THREAT: self.DEFAULT_THREAT}
            self.oids = dict()
    
        def init_host_vars(self,host: str, r: ResultReport):
            self.oids[host] = {self.OID_ACCEPTED: [],
                               self.OID_DROPPED: [],
                               self.OID_NEW: [],
                               self.OID_OS: []}
            self.report[host] = dict()
            self.report[host][r.port] = {self.REPORT_SEVERITY: self.DEFAULT_SEVERITY,
                                    self.REPORT_THREAT: self.DEFAULT_THREAT}
            self.report[host][self.REPORT_GLOBAL] = {self.REPORT_SEVERITY: self.DEFAULT_SEVERITY,
                                                     self.REPORT_THREAT: self.DEFAULT_THREAT}
    
        def update_summary(self,host, r: ResultReport) -> None:
            # Evaluate max port severity per host
            if r.port not in self.report[host] or \
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                    r.severity > self.report[host][r.port][self.REPORT_SEVERITY]:
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                self.report[host][r.port] = {self.REPORT_SEVERITY: r.severity,
                                        self.REPORT_THREAT: r.threat}
    
            # Evaluate max global severity per host
            if r.severity > self.report[host][self.REPORT_GLOBAL][self.REPORT_SEVERITY]:
                self.report[host][self.REPORT_GLOBAL] = {self.REPORT_SEVERITY: r.severity,
                                                         self.REPORT_THREAT: r.threat}
    
            # Evaluate Global max severity
            if r.severity > self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY]:
                self.report[self.REPORT_DEPLOYMENT] = {self.REPORT_SEVERITY: r.severity,
                                                       self.REPORT_THREAT: r.threat}
    
        def classify_reports(self) -> None:
    
            # Init global aggregated variables
            self.init_glob_vars()
    
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            for host, host_report in self.imported_oids.items():
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                for res_report in host_report:
    
                    # Init aggregated variables per host
                    self.init_host_vars(host,res_report)
    
                    # Skip if oid is not relevant
                    if res_report.severity < self.SEVERITY_THR: continue
    
                    # Classify oid
                    if not self.is_os and res_report.oid in self.os_all_oids:
                        self.oids[host][self.OID_OS].append(res_report)
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                    elif res_report.oid in self.accepted_oids:
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                        self.oids[host][self.OID_ACCEPTED].append(res_report)
                        self.update_summary(host, res_report)
                    elif res_report.oid in self.known_oids:
                        self.oids[host][self.OID_DROPPED].append(res_report)
                    else:
                        self.oids[host][self.OID_NEW].append(res_report)
                        self.update_summary(host, res_report)
    
            # Extract global estimation
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            if self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY] < self.SEVERITY_THR:
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                self.report[self.REPORT_GLOBAL] = self.MSG_OK
            else:
                self.report[self.REPORT_GLOBAL] = self.MSG_NOK
    
        def get_summary(self) -> str:
            return pretty_json(self.report)
    
        def get_classified_oids(self) -> str:
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            json_oids = self.oids.copy()
            for host, datas in self.oids.items():
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                for key, oids in datas.items():
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                    json_oids[host][key] = [str(o) for o in oids]
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                if self.is_os:
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                    del datas[self.OID_OS]
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
            return pretty_json(self.oids)
    
        def create_msg(self, r: ResultReport):
            msg =  f"    Detected oid: {r.oid}, severity: {r.severity}"
            msg += f", threat: {r.threat} and port: {r.port}\n"
            return msg
    
        def write_data(self,
                       summary_filename: str,
                       oids_filename: str):
    
            # Save on file report summary
            self.write_summary(summary_filename)
    
            # If this script scanned a single_vm deployment
            # delete the empty OID_OS section in oid and
            # push the new os oids
            if self.is_os:
                del self.oids[self.OID_OS]
                self.write_new_oids()
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
                #self.push_new_oids()
    
    Gioacchino Vino's avatar
    Gioacchino Vino committed
    
            # Save on file all classifies oids
            self.write_oids(oids_filename)
    
        def write_new_oids(self) -> None:
            # Overwrite the detected oids to the host oids
            self.os_oids[self.os_name] = self.oids[self.OID_ACCEPTED] + \
                                         self.oids[self.OID_NEW]
            with open(self.os_file, 'w') as file:
                yaml.dump(self.os_oids, file)
    
        def write_oids(self, oids_filename) -> None:
            with open(oids_filename, 'w') as file:
                yaml.dump(self.os_oids, file)
    
        def push_new_oids(self):
            repo = git.Repo(self.OS_SEC_DEST_DIR)
            repo.git.add(self.os_file)
            repo.index.commit(self.OS_COMMIT_MESSAGE)
            origin = repo.remote('origin')
            origin.push()
            logging.info("New oid file successfully pushed")
    
        def write_summary(self, summary_filename) -> None:
            with open(summary_filename, 'w') as file:
                yaml.dump(self.report, file)