Skip to content
Snippets Groups Projects
gvm_library.py 29.9 KiB
Newer Older
from gvm.connections import TLSConnection
from gvm.protocols.gmpv208 import Gmp, AliveTest 
from gvm.transforms import EtreeTransform
from gvm.xml import pretty_print
from time import time, sleep
import logging
import base64
import json
Gioacchino Vino's avatar
Gioacchino Vino committed
from typing import Optional, Dict, List, Tuple, Set
Gioacchino Vino's avatar
Gioacchino Vino committed
import yaml
from functools import reduce
import os
import git
Gioacchino Vino's avatar
Gioacchino Vino committed
import pandas as pd
# GVM Xpath Constants
Gioacchino Vino's avatar
Gioacchino Vino committed
GVM_XPATH_ID = '@id'
GVM_XPATH_NAME_TEXT = 'name/text()'
GVM_XPATH_REPORT_ID_TEXT = "report_id/text()"
GVM_XPATH_STATUS = '@status'
GVM_XPATH_STATUS_TEXT = '@status_text'
GVM_XPATH_STATUS_TEXT_2 = '@status/text'
Gioacchino Vino's avatar
Gioacchino Vino committed
GVM_XPATH_STATUS_TEXT_3 = 'status/text()'
GVM_XPATH_PROGRESS_TEXT = 'progress/text()'
GVM_XPATH_INUSE_TEXT = 'in_use/text()'
GVM_XPATH_LAST_REPORT_ID = 'last_report/report/@id'
GVM_XPATH_REPORT_TEXT = 'report/text()'

# GVM Status Constants
GVM_STATUS_OK = "200"
GVM_STATUS_CREATE_OK = "201"

# Custom Exceptions
class GvmException(Exception):
    pass
# More Readable print function
Gioacchino Vino's avatar
Gioacchino Vino committed
def pretty_json(j) -> str:
    return json.dumps(j,sort_keys=True,indent=4)

# Class containing config ids
class Configs:
    config = "9866edc1-8869-4e80-acac-d15d5647b4d9"
    scanner = "08b69003-5fc2-4037-a479-93b440211c73"
    ovs_ssh_credential = "b9af5845-8b87-4378-bca4-cee39a894c17"

# Class containining format ids
class ReportFormats:
    anonymous_xml = "5057e5cc-b825-11e4-9d0e-28d24461215b"
    csv_results   = "c1645568-627a-11e3-a660-406186ea4fc5"
    itg           = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5"
    pdf           = "c402cc3e-b531-11e1-9163-406186ea4fc5"
    txt           = "a3810a62-1f62-11e1-9219-406186ea4fc5"
    xml           = "a994b278-1f62-11e1-96ac-406186ea4fc5"

Gioacchino Vino's avatar
Gioacchino Vino committed
class ResultReport():
    oid: str
    severity: float
    threat: str
    port: str

Gioacchino Vino's avatar
Gioacchino Vino committed
    def __init__(self, o, s, t, p):
        self.oid = str(o)
        self.severity = float(s)
        self.threat = str(t)
        self.port = str(p)

Gioacchino Vino's avatar
Gioacchino Vino committed
    def __str__(self):
Gioacchino Vino's avatar
Gioacchino Vino committed
        msg = f"{self.oid}, {self.severity}, "
        msg += f"{self.threat}, {self.port}"
Gioacchino Vino's avatar
Gioacchino Vino committed
        return msg

class PortList:
    """
    This class helps the managing of the GVM port_list object
    
    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of port list object
        id: str = id returned after the creation
        in_use: str = state if the port_list object is in use
    """

Gioacchino Vino's avatar
Gioacchino Vino committed
    def __init__(self,
                 client, 
                 name: str, 
                 ports: List[str]):
Gioacchino Vino's avatar
Gioacchino Vino committed
        self.client = client
        self.name = name
Gioacchino Vino's avatar
Gioacchino Vino committed
        self.ports = ','.join(ports)
        
        # Retrieve port_list objs by name
        res = self.__get_info(filter = name)
Gioacchino Vino's avatar
Gioacchino Vino committed
        if len(res) == 0: 
            # If no result retrieved, create it
            self.create()
        
        else:
Gioacchino Vino's avatar
Gioacchino Vino committed
            logging.debug("Already created. Collected from server")
Gioacchino Vino's avatar
Gioacchino Vino committed
            if len(res) > 1:
                # If one result has been collected, consider the first one
                msg = f"The port_list name {name} retrieved {len(res)} results"
                logging.warning(msg)
                logging.warning("The first one will be considered")
            
            # If one result has been collected, consider it
            self.name = res[0]['name']
            self.id = res[0]['id']
            self.in_use = res[0]['in_use']
 
    # Search port_lists by id/name
    def __get_info(self, filter: str = "rows=-1") -> List[Dict[str, str]]:
        res = []
        pls = self.client.get_port_lists(filter_string = filter) \
                         .xpath('port_list')
        
        for pl in pls:
            pl_name = str(pl.xpath(GVM_XPATH_NAME_TEXT)[0])
            pl_id = str(pl.xpath(GVM_XPATH_ID)[0])
            pl_in_use = str(pl.xpath(GVM_XPATH_INUSE_TEXT)[0])
            res.append({"name": pl_name, 
                        "id": pl_id, 
                        "in_use": pl_in_use})
        return res

    def create(self) -> None:
        res = self.client.create_port_list(self.name, self.ports)
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_CREATE_OK:
            pl_id = str(res.xpath(GVM_XPATH_ID)[0])
            res = self.__get_info(filter = pl_id)
            
            if len(res) > 0:
                if len(res) > 1:
                    # Multiple objs retrieved, consider the first one
                    msg = f"The port_list name {self.name}"
                    msg += f" retrieved {len(res)} results"
                    logging.warning(msg)
                    logging.warning("The first one will be considered")
                self.name = res[0]['name']
                self.id = res[0]['id']
                self.in_use = res[0]['in_use']
                msg = "Created port list obj. "
                msg += f"Name: {self.name}, id: {self.id}, ports: {self.ports}"
                logging.debug(msg)

            else:
                # No obj retrieved. Error during creation
                msg = f"The port_list name {self.name} retrieved 0 results after creations"
                logging.error(msg)

        else:
            msg = "ERROR during Port list creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            logging.error(msg)
            raise GvmException(msg) 

    def __str__(self):
        d = {'name': self.name, 
             'id': self.id, 
             'in_use': self.in_use}
        return pretty_json(d)
    
    def delete(self):
Gioacchino Vino's avatar
Gioacchino Vino committed
        logging.debug(f"Deletion port_list {self.name}")
        res = self.client.delete_port_list(self.id)
Gioacchino Vino's avatar
Gioacchino Vino committed
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Port_list {self} DELETED") 
        else:
            logging.error(f"ERROR during the port_list deletion {status}: {status_text}")
class Target:
    """
    This class helps the managing of the GVM target object
    
    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of target object
        id: str = id returned after the creation
        in_use: str = state if the target object is in use
    """
    
    def __init__(self,
Gioacchino Vino's avatar
Gioacchino Vino committed
                 client,
                 name: str,
Gioacchino Vino's avatar
Gioacchino Vino committed
                 host: str,
Gioacchino Vino's avatar
Gioacchino Vino committed
                 port_list: PortList):
        self.client = client
        self.name = name
Gioacchino Vino's avatar
Gioacchino Vino committed
        self.host = host
Gioacchino Vino's avatar
Gioacchino Vino committed
        self.pl = port_list
        
        # Retrieve targets objs by name
        res = self.__get_info(filter = name)
        
        if len(res) == 0: 
            # If no result retrieved, create it
            self.create()
        
        else:
Gioacchino Vino's avatar
Gioacchino Vino committed
            logging.debug("Already created. Collected from server")
Gioacchino Vino's avatar
Gioacchino Vino committed
            if len(res) > 1:
                # If one result has been collected, consider the first one
                msg = f"The target name {name} retrieved {len(res)} results"
                logging.warning(msg)
                logging.warning("The first one will be considered")
            
            # If one result has been collected, consider it
            self.name = res[0]['name']
            self.id = res[0]['id']
            self.in_use = res[0]['in_use']
 
    def __get_info(self, filter: str) -> List[Dict[str, str]]:
        res = []
        targets = self.client.get_targets(filter_string = filter) \
                             .xpath('target')
        for target in targets:
            t_name = str(target.xpath(GVM_XPATH_NAME_TEXT)[0])
            t_id  = str(target.xpath(GVM_XPATH_ID)[0])
            t_in_use = str(target.xpath(GVM_XPATH_INUSE_TEXT)[0])
            res.append({"name": t_name, 
                        "id": t_id, 
                        "in_use": t_in_use})
        return res  
    
    def create(self) -> None:
        res = self.client.create_target(
                name = self.name,
                comment = "",
                hosts = [self.host],
                port_list_id = self.pl.id,
                ssh_credential_id = Configs.ovs_ssh_credential,
                alive_test = AliveTest('Consider Alive'))
        
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_CREATE_OK:
            t_id = str(res.xpath(GVM_XPATH_ID)[0])
            res = self.__get_info(filter = t_id)
            
            if len(res) == 0:
                # No obj retrieved. Error during creation
                msg = f"The target name {self.name} retrieved 0 results after creation"
                logging.error(msg)
            else:
                if len(res) > 1:
                    # Multiple objs retrieved, consider the first one
                    msg = f"The target id {t_id} retrieved {len(res)} results"
                    logging.warning(msg)
                    logging.warning("The first one will be considered")   

                self.name = res[0]['name']
                self.id = res[0]['id']
                self.in_use = res[0]['in_use']
                msg = "Created target obj. "
Gioacchino Vino's avatar
Gioacchino Vino committed
                msg += f"Name: {self.name}, id: {self.id}, host: {self.host}"
Gioacchino Vino's avatar
Gioacchino Vino committed
                logging.debug(msg)
        else:
            msg = "ERROR during Target creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            raise GvmException(msg)
        
    def __str__(self):
        d = {'name': self.name, 
             'id': self.id, 
             "in_use": self.in_use,
Gioacchino Vino's avatar
Gioacchino Vino committed
             'host': self.host}
        return pretty_json(d)
    
    def delete(self):
Gioacchino Vino's avatar
Gioacchino Vino committed
        logging.debug(f"Deletion target {self.name}")
        res = self.client.delete_target(self.id)
Gioacchino Vino's avatar
Gioacchino Vino committed
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Target {self} DELETED") 
        else:
            logging.error(f"ERROR during the target deletion {status}: {status_text}")

class Task:
    """
    This class helps the managing of the GVM task object
    
    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of task object
        id: str = id returned after the creation
        in_use: str = state if the task object is in use
        report_id: str = report id once task is completed
        
    Methods:
        start = starts the task
        stop = stops the task
        delete = deletes the tasks
        get_progress = retrieves current task status
        wait = waits for the task is completed
        save_report = saves report on file
        get_report_info = retrieves report information
    """
Gioacchino Vino's avatar
Gioacchino Vino committed
    # Constants
Gioacchino Vino's avatar
Gioacchino Vino committed
    WAIT_SECONDS = 10
Gioacchino Vino's avatar
Gioacchino Vino committed

    def __init__(self, 
Gioacchino Vino's avatar
Gioacchino Vino committed
                 client,
                 name: str, 
                 target: Target) -> None:
        self.client = client
        self.name = name
        self.target = target
Gioacchino Vino's avatar
Gioacchino Vino committed

        # Retrieve task objs by name
        res = self.__get_info(filter = name)

        if len(res) == 0: 
            # If no result retrieved, create it
            self.create()
        else:
Gioacchino Vino's avatar
Gioacchino Vino committed
            logging.debug("Already created. Collected from server")
Gioacchino Vino's avatar
Gioacchino Vino committed
            if len(res) > 1:
                # If one result has been collected, consider the first one
                msg = f"The port_list name {name} retrieved {len(res)} results"
                logging.warning(msg)
                logging.warning("The first one will be considered")

            self.name = res[0]['name']
            self.id = res[0]['id']
            self.in_use = res[0]['in_use']
            self.status = res[0]['status']
            self.report_id = res[0].get("report_id", None)

    def __get_info(self, filter: str) -> List[dict]: 
        res = []
        tasks = self.client.get_tasks(filter_string = filter) \
                           .xpath('task')
        for t in tasks:
            t_name = str(t.xpath(GVM_XPATH_NAME_TEXT)[0])
            t_id  = str(t.xpath(GVM_XPATH_ID)[0])
            t_in_use = str(t.xpath(GVM_XPATH_INUSE_TEXT)[0])
            t_status = str(t.xpath(GVM_XPATH_STATUS_TEXT_3)[0])
            t_dict = {"name": t_name, 
                      "id": t_id, 
                      "in_use": t_in_use, 
                      "status": t_status}
            try:
                t_report_id = t.xpath(GVM_XPATH_LAST_REPORT_ID)[0]
            except Exception:
                pass
            else:
                t_dict['report_id'] = t_report_id
            
            res.append(t_dict)
        return res    

    def create(self) -> None:
        res = self.client.create_task(
                name = self.name,
                config_id = Configs.config,
                target_id = self.target.id,
                scanner_id = Configs.scanner)
        
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        
        if status == GVM_STATUS_CREATE_OK:
            t_id = str(res.xpath(GVM_XPATH_ID)[0])
            res = self.__get_info(filter = t_id)
            if len(res) == 0:
                # No obj retrieved. Error during creation
                msg = f"The task id {t_id} retrieved 0 results after creation"
                logging.error(msg)
            else:
                if len(res) > 1:
                    # Multiple objs retrieved, consider the first one
                    msg = f"The task id {t_id} retrieved {len(res)} results"
                    logging.warning(msg)
                    logging.warning("The first one will be considered")

Gioacchino Vino's avatar
Gioacchino Vino committed
                self.name = res[0]['name']
                self.id = res[0]['id']
                self.in_use = res[0]['in_use']
                self.status = res[0]['status']
Gioacchino Vino's avatar
Gioacchino Vino committed
                self.report_id = res[0].get("report_id", None)
Gioacchino Vino's avatar
Gioacchino Vino committed
                msg = "Created task obj. "
                msg += f"Name: {self.name}, id: {self.id}"
                logging.debug(msg)
Gioacchino Vino's avatar
Gioacchino Vino committed
        else:
            msg = "ERROR during Task creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            raise GvmException(msg)
        
    def __str__(self):
        d = {'name': self.name, 
             'id': self.id, 
             'in_use': self.in_use,
             'status': self.status,
             'report_id': self.report_id}
        return pretty_json(d)
    
    def start(self):
        res = self.client.start_task(self.id)
        self.report_id = res.xpath(GVM_XPATH_REPORT_ID_TEXT)[0]
        logging.info(f"Task {self} STARTED") 

    def stop(self):
        res = self.client.stop_task(self.id)
        pretty_print(res)
        logging.info(f"Task {self} STARTED") 

    def delete(self):
Gioacchino Vino's avatar
Gioacchino Vino committed
        logging.debug(f"Deletion task {self.name}")
        res = self.client.delete_task(self.id)
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Task {self} DELETED") 
        else:
            logging.error(f"ERROR during the task deletion {status}: {status_text}")
    def update_status(self):
Gioacchino Vino's avatar
Gioacchino Vino committed
        task_info = self.client.get_tasks(filter_string = self.id) \
                               .xpath('task')[0]
        self.name = task_info.xpath(GVM_XPATH_NAME_TEXT)[0]
Gioacchino Vino's avatar
Gioacchino Vino committed
        self.status = task_info.xpath(GVM_XPATH_STATUS_TEXT_3)[0]       #   New -> Requested -> Queued -> Running  -> Done
        self.progress = int(task_info.xpath(GVM_XPATH_PROGRESS_TEXT)[0])#    0         0           0      0 -> 100     -1
        self.in_use = task_info.xpath(GVM_XPATH_INUSE_TEXT)[0]
            self.report_id = task_info.xpath(GVM_XPATH_LAST_REPORT_ID)[0]
        except Exception:
Gioacchino Vino's avatar
Gioacchino Vino committed

    def wait(self, timeout: int = 7200) -> bool:
        start_time = time()
        logging.debug("Waiting for scans ends the task")
        while True:
Gioacchino Vino's avatar
Gioacchino Vino committed
            self.update_status()
            if self.status not in ["New","Requested","Queued","Processing", "Running","Done"]: # ["Interrupted", ...]
                logging.warning(f"Task in the undesired status: '{self.status}'")
                return False
            if self.status == "Done" and self.progress == -1:
                logging.info("Task completed")
                return True
            if time() - start_time > timeout:
                logging.error("TIMEOUT during waiting for task ending")
                return False
            logging.debug(f"Waiting for the task ends. Now {int(time() - start_time)}s from start. Status: {self.status}")
Gioacchino Vino's avatar
Gioacchino Vino committed
            sleep(self.WAIT_SECONDS)
    def save_report(self, format: str, filename: str):
        res = self.client.get_report(self.report_id,
                                     report_format_id=format, 
                                     ignore_pagination=True,
                                     details=True)
        code = str(res.xpath(GVM_XPATH_REPORT_TEXT)[0])
        with open(filename, "wb") as fh:
            fh.write(base64.b64decode(code))
    def get_report(self) -> Dict[str,Tuple]:
        res = self.client.get_report(self.report_id,
                                     report_format_id=ReportFormats.anonymous_xml,
                                     ignore_pagination=True,
                                     details="1")
        oids: tuple[str] = tuple(res.xpath('report/report/results/result/nvt/@oid'))
        sev: tuple[str] = tuple(res.xpath('report/report/results/result/nvt/severities/@score'))
        threat: tuple[str] = tuple(res.xpath('report/report/results/result/threat/text()'))
        ports: tuple[str] = tuple(res.xpath('report/report/results/result/port/text()'))
        sev: tuple[float] = tuple(map(float,sev))
        return {"oids":oids, "severity":sev, "threat":threat, "ports":ports}
    
class GVMClient():
    """
    This class provides API to interact with GVM in order to 
    get, create and delete port_lists, targets, tasks and reports
    """
    
    CONNECTION_RETRIES = 5
    LOCAL_IP = "127.0.0.1"
    
    def __init__(self, 
                 auth_n: str, 
                 auth_p: str, 
                 host_ip: str = LOCAL_IP):
Gioacchino Vino's avatar
Gioacchino Vino committed
        logging.info("GVM Client Iniziatation started...")
        self.auth_name = auth_n
        self.auth_passwd = auth_p 
        self.host_ip = host_ip
        self.client = None
        self.create_client()
Gioacchino Vino's avatar
Gioacchino Vino committed
        logging.info("GVM Client Iniziatation completed")


    def create_client(self):
        retry = self.CONNECTION_RETRIES
        while(retry > 0):
            try:
                logging.debug('Creation of the GMP Client')
                logging.debug(f'host_ip: {self.host_ip}')
                self.client = Gmp(TLSConnection(hostname = self.host_ip), 
                                  transform=EtreeTransform())
                break
            except Exception:
                logging.error(f"Connection error with the gmp endpoint. Remaining {retry} retries")
                retry -= 1
                sleep(0.5)
        logging.debug('GMP Client Created')
        if retry == 0:
            raise GvmException("Impossible connect to the gmp endpoint even after 5 retries")
            
        self.client.authenticate(self.auth_name, self.auth_passwd)
        logging.debug('GMP Client Authenticated')
Gioacchino Vino's avatar
Gioacchino Vino committed

    def get_client(self):
        return self.client

    def get_version(self) -> str:
        res = self.client.get_version()
Gioacchino Vino's avatar
Gioacchino Vino committed
        return str(res.xpath('version/text()')[0])

class ReportManager():
    # CLASSIFICATION configuration
    SEVERITY_THR = 4
    MSG_OK = "OK"
    MSG_NOK = "NOK"
    DEFAULT_SEVERITY = -1
    MAX_SEVERITY = 9
    DEFAULT_THREAT = "None"

    # REPORT Keywords
    REPORT_DEPLOYMENT = "deployment"
    REPORT_GLOBAL = "global"
    REPORT_SEVERITY = "severity"
    REPORT_THREAT = "threat"
    REPORT_PORTS = "ports"
Gioacchino Vino's avatar
Gioacchino Vino committed

    # OIDS Classes
    OID_ACCEPTED = 'accepted-oids'
    OID_NEW = 'new-oids'
    OID_DROPPED = 'dropped-oids'
    OID_OS = 'os-related-oids'

    # OS security repository configuration
    OS_GIT_REPO = "baltig.infn.it/infn-cloud/os_security_checks.git"
    OS_SEC_BRANCH = "new-oids"
    OS_SEC_USER = "GIT_OS_SEC_USER"
    OS_SEC_TOKEN = "GIT_OS_SEC_TOKEN"
    OS_SEC_DEST_DIR = "os-sc-repo"
    OS_SEC_FILENAME = "os-oids.yaml"
    OS_COMMIT_MESSAGE = 'Added oid(s)'

    # Security scans repository configuration
    SS_GIT_REPO = "baltig.infn.it/infn-cloud/security-scans.git"
    SS_SEC_USER = "GIT_SEC_USER"
    SS_SEC_TOKEN = "GIT_SEC_TOKEN"
    SS_SEC_DEST_DIR = "ss-repo"
    SS_SEC_CHILD_DIR = "queues"
    SS_SEC_ACCEPTED_FILES = ['accepted.txt']
    SS_SEC_KNOWN_FILES = ['held.txt', 'new.txt', 'overridden.txt']

    # Classification configuration
    LABEL_COLUMN = "label"
    LABEL_NEW_VULNS = "NEW"
    LABEL_ACKNOWLEDGED_VULNS = "ACKNOWLEDGED"
    LABEL_REJECTED_VULNS = "REJECTED"
    LABEL_OS_RELATED_VULNS = "OS_RELATED"

Gioacchino Vino's avatar
Gioacchino Vino committed
    def __init__(self, os_name: str, is_os: bool) -> None:
Gioacchino Vino's avatar
Gioacchino Vino committed
        logging.info("Report Manager Iniziatation started...")
Gioacchino Vino's avatar
Gioacchino Vino committed
        self.os_name = os_name
        self.is_os = is_os
        self.imported_oids: Dict[str, List[ResultReport]] = dict()
        self.import_os_sec_repo()
        self.import_security_oids()
Gioacchino Vino's avatar
Gioacchino Vino committed
        logging.info("Report Manager Iniziatation completed")
Gioacchino Vino's avatar
Gioacchino Vino committed

    def import_yaml_file(self) -> dict:
Gioacchino Vino's avatar
Gioacchino Vino committed
        if os.path.isfile(self.os_file):
            with open(self.os_file, 'r') as ifile:
Gioacchino Vino's avatar
Gioacchino Vino committed
                oids = yaml.load(ifile, Loader=yaml.FullLoader)
Gioacchino Vino's avatar
Gioacchino Vino committed
            return oids
        else:
            return dict()
Gioacchino Vino's avatar
Gioacchino Vino committed

    def import_os_sec_repo(self):
Gioacchino Vino's avatar
Gioacchino Vino committed
        self.os_file = os.path.join(self.OS_SEC_DEST_DIR,
                                    self.OS_SEC_FILENAME)
Gioacchino Vino's avatar
Gioacchino Vino committed
        user = os.environ.get(self.OS_SEC_USER)
        token = os.environ.get(self.OS_SEC_TOKEN)
        repo_url = f"https://{user}:{token}@{self.OS_GIT_REPO}"

        try:
            git.Repo.clone_from(repo_url,
                                self.OS_SEC_DEST_DIR,
                                branch = self.OS_SEC_BRANCH)
        except Exception as e:
            logging.error(f"Impossible clone the os scans repository, {e}")
            self.os_oids = dict()
            self.os_all_oids = []
        else:
            os_oids = self.import_yaml_file()
            logging.debug("Imported host os security oids")
            logging.debug(pretty_json(os_oids))

            if not isinstance(os_oids, dict):
                logging.warning("Impossible parse the oids yaml file")
                self.os_oids = dict()
                self.os_all_oids = []
            else:
                self.os_oids = os_oids
                try:
Gioacchino Vino's avatar
Gioacchino Vino committed
                    self.os_all_oids = tuple(set(reduce(lambda x,y: x + y,
                                                   os_oids.values())))
Gioacchino Vino's avatar
Gioacchino Vino committed
                    logging.debug("Imported os security oids")
                    logging.debug(pretty_json(self.os_all_oids))

                except Exception as e:
                    logging.warning("Impossible extract oids from imported yaml")
                    self.os_all_oids = []

Gioacchino Vino's avatar
Gioacchino Vino committed
    def extract_oids(self, lines: List[str]) -> Tuple[str]:
Gioacchino Vino's avatar
Gioacchino Vino committed
        oids: List[str] = list()
        for line in lines:
            line = line.strip()
            if line.startswith('#'): continue
Gioacchino Vino's avatar
Gioacchino Vino committed
            if len(v_line := line.split(" ")[0]) > 0:
                oids.append(v_line)

Gioacchino Vino's avatar
Gioacchino Vino committed
        return tuple(set(oids))
Gioacchino Vino's avatar
Gioacchino Vino committed
    def import_security_oids(self) -> None:
        user = os.environ.get(self.SS_SEC_USER)
        token = os.environ.get(self.SS_SEC_TOKEN)
        repo_url = f"https://{user}:{token}@{self.SS_GIT_REPO}"
        files_dir = os.path.join(self.SS_SEC_DEST_DIR,self.SS_SEC_CHILD_DIR)
        try:
            git.Repo.clone_from(repo_url, self.SS_SEC_DEST_DIR)
        except Exception as e:
            logging.warning(f"Impossible clone the ss scans repository, {e}")
            self.accepted_oids = []
            self.known_oids = []
        else:
            accepted_oids: List[str] = []
            known_oids: List[str] = []

            for f in self.SS_SEC_ACCEPTED_FILES:
                filename = os.path.join(files_dir,f)
                with open(filename, 'r') as file:
Gioacchino Vino's avatar
Gioacchino Vino committed
                    accepted_oids += self.extract_oids(file.readlines())
            
Gioacchino Vino's avatar
Gioacchino Vino committed
            for f in self.SS_SEC_KNOWN_FILES:
                filename = os.path.join(files_dir,f)
                with open(filename, 'r') as file:
Gioacchino Vino's avatar
Gioacchino Vino committed
                    known_oids += self.extract_oids(file.readlines())

            self.accepted_oids = tuple(sorted(accepted_oids))
            self.known_oids = tuple(sorted(known_oids))
Gioacchino Vino's avatar
Gioacchino Vino committed
            logging.debug("accepted oids")
            logging.debug(pretty_json(self.accepted_oids))
            logging.debug("known oids")
            logging.debug(pretty_json(self.known_oids))
    def import_report(self, host: str, report: Dict[str,Tuple]) -> None:
Gioacchino Vino's avatar
Gioacchino Vino committed
        self.imported_oids[host] = report
    def show_imported_reports(self) -> None:
        logging.debug("IMPORTED REPORTS")
        for host, report in self.imported_oids.items():
            logging.debug(f"HOST: {host}")
            logging.debug(f"\n{pd.DataFrame(report)}")
        logging.debug("")     
    def classify_reports(self) -> None: 
        TO_SOLVE_VULNS = [self.LABEL_NEW_VULNS,self.LABEL_ACKNOWLEDGED_VULNS]
        to_solve   = pd.Series({"oids": self.accepted_oids})
        to_exclude = pd.Series({"oids": self.known_oids})
        os_vulns   = pd.Series({"oids": self.os_all_oids})
        self.report, self.oids = dict(), dict()
        self.report[self.REPORT_DEPLOYMENT] = {self.REPORT_SEVERITY: self.DEFAULT_SEVERITY}
        for host, host_report in self.imported_oids.items():
            self.oids[host], self.report[host] = dict(), dict()
            # Create Pandas DataFrame from GreenBone report
            vulns = pd.DataFrame(host_report)
            # Add Label column
            vulns[self.LABEL_COLUMN] = self.LABEL_NEW_VULNS
            # Remove not important Vulnerabilties   
            vulns = vulns[vulns.severity >= self.SEVERITY_THR]
            # Label Acknowledged Vulnerabilities
            vulns.loc[vulns.oids.isin(to_solve.oids),
                      self.LABEL_COLUMN] = self.LABEL_ACKNOWLEDGED_VULNS

            # Label Excluded Vulnerabilities
            vulns.loc[vulns.oids.isin(to_exclude.oids),
                      self.LABEL_COLUMN] = self.LABEL_REJECTED_VULNS

            if not self.is_os:
                # Label Os Vulnerabilities
                vulns.loc[vulns.oids.isin(os_vulns.oids),
                          self.LABEL_COLUMN] = self.LABEL_OS_RELATED_VULNS
                # Collect Os Vulnerability oids
                self.oids[host][self.LABEL_OS_RELATED_VULNS] = \
                    vulns[vulns.label == self.LABEL_OS_RELATED_VULNS].oids.to_list()

            # Collect Acknowledged Vulnerability oids
            self.oids[host][self.LABEL_ACKNOWLEDGED_VULNS] = \
                vulns[vulns.label == self.LABEL_ACKNOWLEDGED_VULNS].oids.to_list()
            
            # Collect Rejected Vulnerability oids
            self.oids[host][self.LABEL_REJECTED_VULNS] = \
                vulns[vulns.label == self.LABEL_REJECTED_VULNS].oids.to_list()
            
            # Collect New Vulnerability oids
            self.oids[host][self.LABEL_NEW_VULNS] = \
                vulns[vulns.label == self.LABEL_NEW_VULNS].oids.to_list()
            
            # Collect Acknowledged and New Vulnerabilities to create To-Solve Dataframe
            to_solve = vulns[vulns[self.LABEL_COLUMN].isin(TO_SOLVE_VULNS)]

            # Extract Max Severity per "ports" parameter
            for ports, sev in to_solve.groupby(self.REPORT_PORTS).severity.max().items():
                self.report[host][ports] = {self.REPORT_SEVERITY: sev}
            
            # Compute Host Max Severity
            max_severity = to_solve.severity.max()
            self.report[host][self.REPORT_GLOBAL] = {self.REPORT_SEVERITY: max_severity}
            
            # Check if Host Max Severity is greater the Deployment Max Severity
            if max_severity > self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY]:
                self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY] = max_severity

            logging.debug(f"HOST: {host}")
            logging.debug(f"\n{vulns}")
        # Check if the Deployment Max Severity whether relevent or not
Gioacchino Vino's avatar
Gioacchino Vino committed
        if self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY] < self.SEVERITY_THR:
Gioacchino Vino's avatar
Gioacchino Vino committed
            self.report[self.REPORT_GLOBAL] = self.MSG_OK
        else:
            self.report[self.REPORT_GLOBAL] = self.MSG_NOK
Gioacchino Vino's avatar
Gioacchino Vino committed
    def get_summary(self) -> str:
        return pretty_json(self.report)

    def get_classified_oids(self) -> str:
        return pretty_json(self.oids)
Gioacchino Vino's avatar
Gioacchino Vino committed

    def write_data(self,
                   summary_filename: str,
                   oids_filename: str):

        # Save on file report summary
        self.write_summary(summary_filename)

        # If this script scanned a single_vm deployment
        # delete the empty OID_OS section in oid and
        # push the new os oids
        if self.is_os:
            self.write_new_oids()
Gioacchino Vino's avatar
Gioacchino Vino committed
            self.push_new_oids()
Gioacchino Vino's avatar
Gioacchino Vino committed

        # Save on file all classifies oids
        self.write_oids(oids_filename)

    def write_new_oids(self) -> None:
        # Overwrite the detected oids to the host oids
Gioacchino Vino's avatar
Gioacchino Vino committed
        self.os_oids[self.os_name] = []
        for _ , data in self.oids.items():
Gioacchino Vino's avatar
Gioacchino Vino committed
            self.os_oids[self.os_name] += [a.oid for a in data[self.OID_ACCEPTED]]
            self.os_oids[self.os_name] += [n.oid for n in data[self.OID_NEW]]
Gioacchino Vino's avatar
Gioacchino Vino committed
        with open(self.os_file, 'w') as f:
            yaml.dump(self.os_oids, f)
Gioacchino Vino's avatar
Gioacchino Vino committed

    def write_oids(self, oids_filename) -> None:
Gioacchino Vino's avatar
Gioacchino Vino committed
        with open(oids_filename, 'w') as f:
Gioacchino Vino's avatar
Gioacchino Vino committed
            f.write(self.get_classified_oids())
Gioacchino Vino's avatar
Gioacchino Vino committed

    def push_new_oids(self):
        repo = git.Repo(self.OS_SEC_DEST_DIR)
Gioacchino Vino's avatar
Gioacchino Vino committed
        repo.git.add(self.OS_SEC_FILENAME)
Gioacchino Vino's avatar
Gioacchino Vino committed
        repo.index.commit(self.OS_COMMIT_MESSAGE)
        origin = repo.remote('origin')
        origin.push()
        logging.info("New oid file successfully pushed")

    def write_summary(self, summary_filename) -> None:
Gioacchino Vino's avatar
Gioacchino Vino committed
        with open(summary_filename, 'w') as f:
            f.write(pretty_json(self.report))