# gvm_library.py - helper classes for driving GVM scans and classifying their reports
from gvm.connections import TLSConnection
from gvm.protocols.gmpv208 import Gmp, AliveTest 
from gvm.transforms import EtreeTransform
from gvm.xml import pretty_print
from time import time, sleep
import logging
import base64
import json
from typing import Optional, Dict, List, Tuple
# YAML, functional, filesystem and git helpers (used by ReportManager)
import yaml
from functools import reduce
import os
import git
# GVM XPath constants: expressions evaluated against the XML responses
# returned by the GMP client.
GVM_XPATH_ID = '@id'
GVM_XPATH_NAME_TEXT = 'name/text()'
GVM_XPATH_REPORT_ID_TEXT = "report_id/text()"
GVM_XPATH_STATUS = '@status'
GVM_XPATH_STATUS_TEXT = '@status_text'
# NOTE(review): not referenced anywhere in the visible code; as written it
# addresses a child of an attribute node and presumably selects nothing.
GVM_XPATH_STATUS_TEXT_2 = '@status/text'
GVM_XPATH_STATUS_TEXT_3 = 'status/text()'
GVM_XPATH_PROGRESS_TEXT = 'progress/text()'
GVM_XPATH_INUSE_TEXT = 'in_use/text()'
GVM_XPATH_LAST_REPORT_ID = 'last_report/report/@id'
GVM_XPATH_REPORT_TEXT = 'report/text()'

# GVM status constants: values compared with the "status" attribute of a
# response (kept as strings because XPath attribute values are strings).
GVM_STATUS_OK = "200"
GVM_STATUS_CREATE_OK = "201"

# Custom Exceptions
class GvmException(Exception):
    """Error raised by this module when a GVM operation cannot be completed."""
# More readable print function
def pretty_json(j) -> str:
    """Return *j* serialized as JSON with sorted keys and a 4-space indent."""
    return json.dumps(j, sort_keys=True, indent=4)

# Class containing config ids
class Configs:
    """Ids of pre-existing GVM objects referenced when creating targets and tasks."""

    # Passed as ``config_id`` when a task is created
    config = "9866edc1-8869-4e80-acac-d15d5647b4d9"
    # Passed as ``scanner_id`` when a task is created
    scanner = "08b69003-5fc2-4037-a479-93b440211c73"
    # Passed as ``ssh_credential_id`` when a target is created
    ovs_ssh_credential = "b9af5845-8b87-4378-bca4-cee39a894c17"

# Class containing report format ids
class ReportFormats:
    """Report format ids, passed as ``report_format_id`` when fetching a report."""

    anonymous_xml = "5057e5cc-b825-11e4-9d0e-28d24461215b"
    csv_results = "c1645568-627a-11e3-a660-406186ea4fc5"
    itg = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5"
    pdf = "c402cc3e-b531-11e1-9163-406186ea4fc5"
    txt = "a3810a62-1f62-11e1-9219-406186ea4fc5"
    xml = "a994b278-1f62-11e1-96ac-406186ea4fc5"

from dataclasses import dataclass
from json import JSONEncoder


class EmployeeEncoder(JSONEncoder):
    """JSON encoder that serializes arbitrary objects through their ``__dict__``."""

    def default(self, o):
        """Return the attribute dictionary of *o*.

        Objects without a ``__dict__`` are delegated to ``JSONEncoder.default``
        so the caller gets the standard ``TypeError`` rather than an
        ``AttributeError``.
        """
        try:
            return vars(o)
        except TypeError:
            return super().default(o)

@dataclass
class ResultReport:
    """A single finding extracted from a scan report."""

    oid: str         # identifier of the test (NVT) that produced the result
    severity: float  # numeric severity score
    threat: str      # threat label as reported by the scanner
    port: str        # port the result refers to

    def __str__(self):
        """Return the four fields as one comma-separated line."""
        return f"{self.oid}, {self.severity}, {self.threat}, {self.port}"

    def to_dict(self):
        """Return the result as a plain dict (severity is stored under ``sev``)."""
        return {"oid": self.oid,
                "sev": self.severity,
                "threat": self.threat,
                "port": self.port}


class PortList:
    """
    This class helps the managing of the GVM port_list object.

    On construction the server is queried with ``name`` as filter: if
    something is returned it is adopted, otherwise a new port list is created.

    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of port list object
        ports: str = comma-separated ports sent to the server on creation
        id: str = id returned after the creation (None while unknown)
        in_use: str = state if the port_list object is in use (None while unknown)
    """

    def __init__(self,
                 client,
                 name: str,
                 ports: List[str]):
        self.client = client
        self.name = name
        self.ports = ','.join(ports)
        # Defined up front so __str__/delete never hit a missing attribute,
        # even when the lookup after a creation returns nothing.
        self.id: Optional[str] = None
        self.in_use: Optional[str] = None

        # Retrieve port_list objs by name
        res = self.__get_info(filter_string=name)
        if not res:
            # If no result retrieved, create it
            self.create()
            return

        logging.debug("Already created. Collected from server")
        self._adopt(self._select(res, "name", name))

    @staticmethod
    def _select(results: List[Dict[str, str]], key: str, wanted: str) -> Dict[str, str]:
        """Pick the entry whose ``key`` equals ``wanted``, else the first one.

        NOTE(review): the bare name/id is passed as GMP filter string, which
        presumably is a keyword search that may also return other objects;
        preferring the exact match avoids adopting a differently named one.
        """
        exact = [r for r in results if r[key] == wanted]
        pool = exact or results
        if not exact:
            logging.warning(f"No port_list with {key} exactly '{wanted}' among the results")
        if len(pool) > 1:
            # Multiple objs retrieved, consider the first one
            logging.warning(f"The port_list {key} {wanted} retrieved {len(pool)} results")
            logging.warning("The first one will be considered")
        return pool[0]

    def _adopt(self, info: Dict[str, str]) -> None:
        """Copy the server-side description into this object."""
        self.name = info['name']
        self.id = info['id']
        self.in_use = info['in_use']

    # Search port_lists by id/name
    def __get_info(self, filter_string: str = "rows=-1") -> List[Dict[str, str]]:
        """Return name, id and in_use of every port list matching the filter."""
        pls = self.client.get_port_lists(filter_string=filter_string) \
                         .xpath('port_list')
        return [{"name": str(pl.xpath(GVM_XPATH_NAME_TEXT)[0]),
                 "id": str(pl.xpath(GVM_XPATH_ID)[0]),
                 "in_use": str(pl.xpath(GVM_XPATH_INUSE_TEXT)[0])}
                for pl in pls]

    def create(self) -> None:
        """Create the port list on the server and load its description.

        Raises:
            GvmException: the server did not answer with the "created" status.
        """
        res = self.client.create_port_list(self.name, self.ports)
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status != GVM_STATUS_CREATE_OK:
            msg = "ERROR during Port list creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            logging.error(msg)
            raise GvmException(msg)

        pl_id = str(res.xpath(GVM_XPATH_ID)[0])
        found = self.__get_info(filter_string=pl_id)
        if not found:
            # No obj retrieved. Error during creation: keep at least the id
            # returned by the server so the object can still be deleted.
            self.id = pl_id
            msg = f"The port_list name {self.name} retrieved 0 results after creations"
            logging.error(msg)
            return

        self._adopt(self._select(found, "id", pl_id))
        msg = "Created port list obj. "
        msg += f"Name: {self.name}, id: {self.id}, ports: {self.ports}"
        logging.debug(msg)

    def __str__(self):
        d = {'name': self.name,
             'id': self.id,
             'in_use': self.in_use}
        return pretty_json(d)

    def delete(self):
        """Delete the port list on the server and detach the client."""
        logging.debug(f"Deletion port_list {self.name}")
        res = self.client.delete_port_list(self.id)
        # The object is not meant to be used after deletion
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Port_list {self} DELETED")
        else:
            logging.error(f"ERROR during the port_list deletion {status}: {status_text}")
class Target:
    """
    This class helps the managing of the GVM target object.

    On construction the server is queried with ``name`` as filter: if
    something is returned it is adopted, otherwise a new target is created.

    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of target object
        host: str = host the target points to
        pl: PortList = port list attached to the target on creation
        id: str = id returned after the creation (None while unknown)
        in_use: str = state if the target object is in use (None while unknown)
    """

    def __init__(self,
                 client,
                 name: str,
                 host: str,
                 port_list: PortList):
        self.client = client
        self.name = name
        self.host = host
        self.pl = port_list
        # Defined up front so __str__/delete never hit a missing attribute,
        # even when the lookup after a creation returns nothing.
        self.id: Optional[str] = None
        self.in_use: Optional[str] = None

        # Retrieve targets objs by name
        res = self.__get_info(filter_string=name)
        if not res:
            # If no result retrieved, create it
            self.create()
            return

        logging.debug("Already created. Collected from server")
        self._adopt(self._select(res, "name", name))

    @staticmethod
    def _select(results: List[Dict[str, str]], key: str, wanted: str) -> Dict[str, str]:
        """Pick the entry whose ``key`` equals ``wanted``, else the first one.

        NOTE(review): the bare name/id is passed as GMP filter string, which
        presumably is a keyword search that may also return other objects;
        preferring the exact match avoids adopting a differently named one.
        """
        exact = [r for r in results if r[key] == wanted]
        pool = exact or results
        if not exact:
            logging.warning(f"No target with {key} exactly '{wanted}' among the results")
        if len(pool) > 1:
            # Multiple objs retrieved, consider the first one
            logging.warning(f"The target {key} {wanted} retrieved {len(pool)} results")
            logging.warning("The first one will be considered")
        return pool[0]

    def _adopt(self, info: Dict[str, str]) -> None:
        """Copy the server-side description into this object."""
        self.name = info['name']
        self.id = info['id']
        self.in_use = info['in_use']

    def __get_info(self, filter_string: str) -> List[Dict[str, str]]:
        """Return name, id and in_use of every target matching the filter."""
        targets = self.client.get_targets(filter_string=filter_string) \
                             .xpath('target')
        return [{"name": str(target.xpath(GVM_XPATH_NAME_TEXT)[0]),
                 "id": str(target.xpath(GVM_XPATH_ID)[0]),
                 "in_use": str(target.xpath(GVM_XPATH_INUSE_TEXT)[0])}
                for target in targets]

    def create(self) -> None:
        """Create the target on the server and load its description.

        Raises:
            GvmException: the server did not answer with the "created" status.
        """
        res = self.client.create_target(
                name=self.name,
                comment="",
                hosts=[self.host],
                port_list_id=self.pl.id,
                ssh_credential_id=Configs.ovs_ssh_credential,
                alive_test=AliveTest('Consider Alive'))

        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status != GVM_STATUS_CREATE_OK:
            msg = "ERROR during Target creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            logging.error(msg)
            raise GvmException(msg)

        t_id = str(res.xpath(GVM_XPATH_ID)[0])
        found = self.__get_info(filter_string=t_id)
        if not found:
            # No obj retrieved. Error during creation: keep at least the id
            # returned by the server so the object can still be deleted.
            self.id = t_id
            msg = f"The target name {self.name} retrieved 0 results after creation"
            logging.error(msg)
            return

        self._adopt(self._select(found, "id", t_id))
        msg = "Created target obj. "
        msg += f"Name: {self.name}, id: {self.id}, host: {self.host}"
        logging.debug(msg)

    def __str__(self):
        d = {'name': self.name,
             'id': self.id,
             "in_use": self.in_use,
             'host': self.host}
        return pretty_json(d)

    def delete(self):
        """Delete the target on the server and detach the client."""
        logging.debug(f"Deletion target {self.name}")
        res = self.client.delete_target(self.id)
        # The object is not meant to be used after deletion
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Target {self} DELETED")
        else:
            logging.error(f"ERROR during the target deletion {status}: {status_text}")

class Task:
    """
    This class helps the managing of the GVM task object.

    On construction the server is queried with ``name`` as filter: if
    something is returned it is adopted, otherwise a new task is created.

    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of task object
        target: Target = target scanned by the task
        id: str = id returned after the creation (None while unknown)
        in_use: str = state if the task object is in use
        status: str = last status read from the server
        progress: int = last progress read by update_status (None before)
        report_id: str = report id once task is completed

    Methods:
        start = starts the task
        stop = stops the task
        delete = deletes the tasks
        update_status = refreshes name, status, progress, in_use and report id
        wait = waits for the task is completed
        save_report = saves report on file
        get_report = retrieves the report results
    """
    # Constants
    WAIT_SECONDS = 10
    # Statuses for which wait() keeps polling; anything else (e.g. an
    # interrupted task) makes wait() give up.
    EXPECTED_STATUSES = ("New", "Requested", "Queued", "Processing", "Running", "Done")

    def __init__(self,
                 client,
                 name: str,
                 target: Target) -> None:
        self.client = client
        self.name = name
        self.target = target
        # Defined up front so __str__ never hits a missing attribute, even
        # when the lookup after a creation returns nothing.
        self.id: Optional[str] = None
        self.in_use: Optional[str] = None
        self.status: Optional[str] = None
        self.progress: Optional[int] = None
        self.report_id: Optional[str] = None

        # Retrieve task objs by name
        res = self.__get_info(filter_string=name)
        if not res:
            # If no result retrieved, create it
            self.create()
            return

        logging.debug("Already created. Collected from server")
        self._adopt(self._select(res, "name", name))

    @staticmethod
    def _select(results: List[dict], key: str, wanted: str) -> dict:
        """Pick the entry whose ``key`` equals ``wanted``, else the first one.

        NOTE(review): the bare name/id is passed as GMP filter string, which
        presumably is a keyword search that may also return other objects;
        preferring the exact match avoids adopting a differently named one.
        """
        exact = [r for r in results if r[key] == wanted]
        pool = exact or results
        if not exact:
            logging.warning(f"No task with {key} exactly '{wanted}' among the results")
        if len(pool) > 1:
            # Multiple objs retrieved, consider the first one
            logging.warning(f"The task {key} {wanted} retrieved {len(pool)} results")
            logging.warning("The first one will be considered")
        return pool[0]

    def _adopt(self, info: dict) -> None:
        """Copy the server-side description into this object."""
        self.name = info['name']
        self.id = info['id']
        self.in_use = info['in_use']
        self.status = info['status']
        self.report_id = info.get("report_id", None)

    def __get_info(self, filter_string: str) -> List[dict]:
        """Return the description of every task matching the filter.

        ``report_id`` is present only for tasks that expose a last report.
        """
        res = []
        tasks = self.client.get_tasks(filter_string=filter_string) \
                           .xpath('task')
        for t in tasks:
            t_dict = {"name": str(t.xpath(GVM_XPATH_NAME_TEXT)[0]),
                      "id": str(t.xpath(GVM_XPATH_ID)[0]),
                      "in_use": str(t.xpath(GVM_XPATH_INUSE_TEXT)[0]),
                      "status": str(t.xpath(GVM_XPATH_STATUS_TEXT_3)[0])}
            last_report = t.xpath(GVM_XPATH_LAST_REPORT_ID)
            if last_report:
                t_dict['report_id'] = str(last_report[0])
            res.append(t_dict)
        return res

    def create(self) -> None:
        """Create the task on the server and load its description.

        Raises:
            GvmException: the server did not answer with the "created" status.
        """
        res = self.client.create_task(
                name=self.name,
                config_id=Configs.config,
                target_id=self.target.id,
                scanner_id=Configs.scanner)

        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status != GVM_STATUS_CREATE_OK:
            msg = "ERROR during Task creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            logging.error(msg)
            raise GvmException(msg)

        t_id = str(res.xpath(GVM_XPATH_ID)[0])
        found = self.__get_info(filter_string=t_id)
        if not found:
            # No obj retrieved. Error during creation: keep at least the id
            # returned by the server so the object can still be deleted.
            self.id = t_id
            msg = f"The task id {t_id} retrieved 0 results after creation"
            logging.error(msg)
            return

        self._adopt(self._select(found, "id", t_id))
        msg = "Created task obj. "
        msg += f"Name: {self.name}, id: {self.id}"
        logging.debug(msg)

    def __str__(self):
        d = {'name': self.name,
             'id': self.id,
             'in_use': self.in_use,
             'status': self.status,
             'report_id': self.report_id}
        return pretty_json(d)

    def start(self):
        """Start the task and remember the id of the report being produced."""
        res = self.client.start_task(self.id)
        self.report_id = str(res.xpath(GVM_XPATH_REPORT_ID_TEXT)[0])
        logging.info(f"Task {self} STARTED")

    def stop(self):
        """Stop the task; the raw server response is printed."""
        res = self.client.stop_task(self.id)
        pretty_print(res)
        logging.info(f"Task {self} STOPPED")

    def delete(self):
        """Delete the task on the server and detach the client."""
        logging.debug(f"Deletion task {self.name}")
        res = self.client.delete_task(self.id)
        # The object is not meant to be used after deletion
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Task {self} DELETED")
        else:
            logging.error(f"ERROR during the task deletion {status}: {status_text}")

    def update_status(self):
        """Refresh name, status, progress, in_use and report id from the server."""
        task_info = self.client.get_tasks(filter_string=self.id) \
                               .xpath('task')[0]
        self.name = str(task_info.xpath(GVM_XPATH_NAME_TEXT)[0])
        # status:   New -> Requested -> Queued -> Running  -> Done
        # progress:  0         0           0      0 -> 100     -1
        self.status = str(task_info.xpath(GVM_XPATH_STATUS_TEXT_3)[0])
        self.progress = int(task_info.xpath(GVM_XPATH_PROGRESS_TEXT)[0])
        self.in_use = str(task_info.xpath(GVM_XPATH_INUSE_TEXT)[0])
        # A task without a last report keeps the report id already known
        # (for instance the one returned by start()).
        last_report = task_info.xpath(GVM_XPATH_LAST_REPORT_ID)
        if last_report:
            self.report_id = str(last_report[0])

    def wait(self, timeout: int = 7200) -> bool:
        """Poll the task every WAIT_SECONDS until it completes.

        Args:
            timeout: maximum number of seconds to wait.

        Returns:
            True when the task is "Done" with progress -1; False on timeout
            or when the task enters a status outside EXPECTED_STATUSES.
        """
        start_time = time()
        logging.debug("Waiting for scans ends the task")
        while True:
            self.update_status()
            if self.status not in self.EXPECTED_STATUSES:
                logging.warning(f"Task in the undesired status: '{self.status}'")
                return False
            # NOTE(review): completion requires progress == -1 as well;
            # confirm the server always reports -1 for a "Done" task.
            if self.status == "Done" and self.progress == -1:
                logging.info("Task completed")
                return True
            if time() - start_time > timeout:
                logging.error("TIMEOUT during waiting for task ending")
                return False
            logging.debug(f"Waiting for the task ends. Now {int(time() - start_time)}s from start. Status: {self.status}")
            sleep(self.WAIT_SECONDS)

    def save_report(self, format: str, filename: str):
        """Download the report in the given format and write it to ``filename``.

        Args:
            format: report format id (see ReportFormats); the name shadows the
                builtin but is kept because callers may pass it by keyword.
            filename: destination path, written in binary mode.
        """
        res = self.client.get_report(self.report_id,
                                     report_format_id=format,
                                     ignore_pagination=True,
                                     details=True)
        # The report body is delivered base64-encoded inside the XML response
        code = str(res.xpath(GVM_XPATH_REPORT_TEXT)[0])
        with open(filename, "wb") as fh:
            fh.write(base64.b64decode(code))

    def get_report(self) -> List[ResultReport]:
        """Return the results of the task report as ResultReport objects.

        Every ``result`` element is read on its own, so a result lacking one
        of the fields is skipped instead of shifting the values of the others.
        """
        res = self.client.get_report(self.report_id,
                                     report_format_id=ReportFormats.anonymous_xml,
                                     ignore_pagination=True,
                                     details="1")
        reports = []
        for result in res.xpath('report/report/results/result'):
            oid = result.xpath('nvt/@oid')
            score = result.xpath('nvt/severities/@score')
            threat = result.xpath('threat/text()')
            port = result.xpath('port/text()')
            if not (oid and score and threat and port):
                logging.warning("Skipped a report result without oid, severity, threat or port")
                continue
            reports.append(ResultReport(str(oid[0]),
                                        float(score[0]),
                                        str(threat[0]),
                                        str(port[0])))
        return reports
    
class GVMClient():
    """
    This class provides API to interact with GVM in order to
    get, create and delete port_lists, targets, tasks and reports.

    The GMP client is created and authenticated on construction.
    """

    CONNECTION_RETRIES = 5
    LOCAL_IP = "127.0.0.1"

    def __init__(self,
                 auth_n: str,
                 auth_p: str,
                 host_ip: str = LOCAL_IP):
        self.auth_name = auth_n
        self.auth_passwd = auth_p
        self.host_ip = host_ip
        self.client = None
        self.create_client()

    def create_client(self):
        """Create the GMP client (with retries) and authenticate it.

        Raises:
            GvmException: the client could not be created within
                CONNECTION_RETRIES attempts, or the server answered the
                authentication with a status other than OK.
        """
        last_error = None
        for remaining in range(self.CONNECTION_RETRIES, 0, -1):
            try:
                logging.debug('Creation of the GMP Client')
                logging.debug(f'host_ip: {self.host_ip}')
                self.client = Gmp(TLSConnection(hostname=self.host_ip),
                                  transform=EtreeTransform())
                break
            except Exception as err:
                last_error = err
                logging.error("Connection error with the gmp endpoint. "
                              f"Remaining {remaining - 1} retries: {err}")
                if remaining > 1:
                    sleep(0.5)
        else:
            # Reached only when no attempt succeeded
            msg = "Impossible connect to the gmp endpoint "
            msg += f"even after {self.CONNECTION_RETRIES} retries"
            raise GvmException(msg) from last_error
        logging.debug('GMP Client Created')

        res = self.client.authenticate(self.auth_name, self.auth_passwd)
        # NOTE(review): assumes the response exposes the same "status"
        # attribute as the other responses handled in this file; when no
        # status is found the check is skipped.
        status = res.xpath(GVM_XPATH_STATUS) if res is not None else []
        if status and status[0] != GVM_STATUS_OK:
            msg = f"GMP authentication failed. Status code: {status[0]}"
            logging.error(msg)
            raise GvmException(msg)
        logging.debug('GMP Client Authenticated')

    def get_client(self):
        """Return the underlying GMP client."""
        return self.client

    def get_version(self) -> str:
        """Return the version text from the server's version response."""
        res = self.client.get_version()
        return str(res.xpath('version/text()')[0])

class ReportManager():
    """
    Classifies the results of the scan reports and writes the outcome.

    On construction two git repositories are cloned: the one holding the
    oids already known per operating system and the one holding the
    accepted/known oid queues. Reports are then registered with
    import_report, classified with classify_reports and saved with write_data.
    """

    # CLASSIFICATION configuration
    SEVERITY_THR = 4
    MSG_OK = "OK"
    MSG_NOK = "NOK"
    DEFAULT_SEVERITY = -1
    MAX_SEVERITY = 9
    DEFAULT_THREAT = "None"

    # REPORT Keywords
    REPORT_DEPLOYMENT = "deployment"
    REPORT_GLOBAL = "global"
    REPORT_SEVERITY = "severity"
    REPORT_THREAT = "threat"

    # OIDS Classes
    OID_ACCEPTED = 'accepted-oids'
    OID_NEW = 'new-oids'
    OID_DROPPED = 'dropped-oids'
    OID_OS = 'os-related-oids'

    # OS security repository configuration
    OS_GIT_REPO = "baltig.infn.it/infn-cloud/os_security_checks.git"
    OS_SEC_BRANCH = "new-oids"
    OS_SEC_USER = "GIT_OS_SEC_USER"
    OS_SEC_TOKEN = "GIT_OS_SEC_TOKEN"
    OS_SEC_DEST_DIR = "os-sc-repo"
    OS_SEC_FILENAME = "os-oids.yaml"
    OS_COMMIT_MESSAGE = 'Added oid(s)'

    # Security scans repository configuration
    SS_GIT_REPO = "baltig.infn.it/infn-cloud/security-scans.git"
    SS_SEC_USER = "GIT_SEC_USER"
    SS_SEC_TOKEN = "GIT_SEC_TOKEN"
    SS_SEC_DEST_DIR = "ss-repo"
    SS_SEC_CHILD_DIR = "queues"
    SS_SEC_ACCEPTED_FILES = ['accepted.txt']
    SS_SEC_KNOWN_FILES = ['held.txt', 'new.txt', 'overridden.txt']

    def __init__(self, os_name: str, is_os: bool) -> None:
        """
        Args:
            os_name: key under which new oids are stored in the OS oid file.
            is_os: when True, write_data also rewrites and pushes the OS oid
                file, and no os-related class is used during classification.
        """
        self.os_name = os_name
        self.is_os = is_os
        self.imported_oids: Dict[str, List[ResultReport]] = dict()
        self.import_os_sec_repo()
        self.import_security_oids()

    @staticmethod
    def _redact(error, secret: Optional[str]) -> str:
        """Return str(error) with ``secret`` masked, if a secret is given."""
        text = str(error)
        return text.replace(secret, "***") if secret else text

    @staticmethod
    def _read_oid_files(files_dir: str, filenames: List[str]) -> List[str]:
        """Collect the stripped, non-empty, non-comment lines of the files.

        A file that cannot be read is reported with a warning and skipped.
        """
        oids: List[str] = []
        for name in filenames:
            path = os.path.join(files_dir, name)
            try:
                with open(path, 'r') as file:
                    for line in file:
                        entry = line.strip()
                        if entry and not entry.startswith('#'):
                            oids.append(entry)
            except OSError as err:
                logging.warning(f"Impossible read the oid file {path}, {err}")
        return oids

    def _default_entry(self) -> dict:
        """Return a fresh severity/threat entry holding the default values."""
        return {self.REPORT_SEVERITY: self.DEFAULT_SEVERITY,
                self.REPORT_THREAT: self.DEFAULT_THREAT}

    def import_yaml_file(self) -> dict:
        """Load the OS oid file; an empty dict is returned if it is missing.

        The parsed content is returned as is, so it is whatever the YAML
        document holds (the caller checks that it is a dict).
        """
        if not os.path.isfile(self.os_file):
            return dict()
        with open(self.os_file, 'r') as ifile:
            # NOTE(review): the content comes from a cloned repository;
            # consider yaml.safe_load if only plain data is expected.
            return yaml.load(ifile, Loader=yaml.FullLoader)

    def import_os_sec_repo(self):
        """Clone the OS security repository and load the oids it stores.

        Sets os_file, os_oids (oids per OS) and os_all_oids (flat list).
        On any failure the oids are left empty.
        """
        self.os_file = os.path.join(self.OS_SEC_DEST_DIR,
                                    self.OS_SEC_FILENAME)
        user = os.environ.get(self.OS_SEC_USER)
        token = os.environ.get(self.OS_SEC_TOKEN)
        repo_url = f"https://{user}:{token}@{self.OS_GIT_REPO}"

        self.os_oids = dict()
        self.os_all_oids = []
        # NOTE(review): cloning fails when the destination directory is left
        # over from a previous run; the oids then stay empty and a later
        # write_new_oids would store only the current OS — confirm intended.
        try:
            git.Repo.clone_from(repo_url,
                                self.OS_SEC_DEST_DIR,
                                branch=self.OS_SEC_BRANCH)
        except Exception as e:
            # The URL embeds the token: mask it in case the error echoes it
            reason = self._redact(e, token)
            logging.error(f"Impossible clone the os scans repository, {reason}")
            return

        os_oids = self.import_yaml_file()
        logging.debug("Imported host os security oids")
        logging.debug(pretty_json(os_oids))

        if not isinstance(os_oids, dict):
            logging.warning("Impossible parse the oids yaml file")
            return

        self.os_oids = os_oids
        try:
            # Flatten the per-OS lists (an empty mapping gives an empty list)
            self.os_all_oids = [oid
                                for os_list in os_oids.values()
                                for oid in os_list]
        except TypeError:
            logging.warning("Impossible extract oids from imported yaml")
            self.os_all_oids = []
        else:
            logging.debug("Imported os security oids")
            logging.debug(pretty_json(self.os_all_oids))

    def import_security_oids(self) -> None:
        """Clone the security scans repository and load its oid queues.

        Sets accepted_oids and known_oids; both are empty if the clone fails.
        """
        user = os.environ.get(self.SS_SEC_USER)
        token = os.environ.get(self.SS_SEC_TOKEN)
        repo_url = f"https://{user}:{token}@{self.SS_GIT_REPO}"
        files_dir = os.path.join(self.SS_SEC_DEST_DIR, self.SS_SEC_CHILD_DIR)
        try:
            git.Repo.clone_from(repo_url, self.SS_SEC_DEST_DIR)
        except Exception as e:
            # The URL embeds the token: mask it in case the error echoes it
            reason = self._redact(e, token)
            logging.warning(f"Impossible clone the ss scans repository, {reason}")
            self.accepted_oids = []
            self.known_oids = []
            return

        self.accepted_oids = self._read_oid_files(files_dir,
                                                  self.SS_SEC_ACCEPTED_FILES)
        self.known_oids = self._read_oid_files(files_dir,
                                               self.SS_SEC_KNOWN_FILES)
        logging.debug("accepted oids")
        logging.debug(pretty_json(self.accepted_oids))
        logging.debug("known oids")
        logging.debug(pretty_json(self.known_oids))

    def import_report(self, host: str, report: List[ResultReport]):
        """Register the results of one host for the next classification."""
        self.imported_oids[host] = report

    def init_glob_vars(self):
        """Reset the summary and the classified oids."""
        self.report = dict()
        self.report[self.REPORT_DEPLOYMENT] = self._default_entry()
        self.oids = dict()

    def init_host_vars(self, host: str, r: Optional[ResultReport] = None):
        """Make sure the containers of ``host`` (and of the port of ``r``) exist.

        Existing data is never overwritten, so the method can be called for
        every result of a host without losing what was already classified.
        """
        if host not in self.oids:
            classes = {self.OID_ACCEPTED: [],
                       self.OID_DROPPED: [],
                       self.OID_NEW: []}
            if not self.is_os:
                classes[self.OID_OS] = []
            self.oids[host] = classes

        host_summary = self.report.setdefault(host, dict())
        host_summary.setdefault(self.REPORT_GLOBAL, self._default_entry())
        if r is not None:
            host_summary.setdefault(r.port, self._default_entry())

    def update_summary(self, host, r: ResultReport) -> None:
        """Raise the per-port, per-host and deployment maxima with ``r``."""
        # Evaluate max port severity per host
        if r.port not in self.report[host] or \
                r.severity > self.report[host][r.port][self.REPORT_SEVERITY]:
            self.report[host][r.port] = {self.REPORT_SEVERITY: r.severity,
                                         self.REPORT_THREAT: r.threat}

        # Evaluate max global severity per host
        if r.severity > self.report[host][self.REPORT_GLOBAL][self.REPORT_SEVERITY]:
            self.report[host][self.REPORT_GLOBAL] = {self.REPORT_SEVERITY: r.severity,
                                                     self.REPORT_THREAT: r.threat}

        # Evaluate Global max severity
        if r.severity > self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY]:
            self.report[self.REPORT_DEPLOYMENT] = {self.REPORT_SEVERITY: r.severity,
                                                   self.REPORT_THREAT: r.threat}

    def classify_reports(self) -> None:
        """Classify every imported result and build the severity summary.

        Results below SEVERITY_THR are ignored. Only accepted and new oids
        contribute to the summary; os-related and dropped oids do not.
        """
        # Init global aggregated variables
        self.init_glob_vars()

        for host, host_report in self.imported_oids.items():
            # Init aggregated variables per host (once, also for hosts
            # without any result)
            self.init_host_vars(host)

            for res_report in host_report:
                # Make sure the port of this result has a summary entry
                self.init_host_vars(host, res_report)

                # Skip if oid is not relevant
                if res_report.severity < self.SEVERITY_THR:
                    continue

                # Classify oid
                if not self.is_os and res_report.oid in self.os_all_oids:
                    self.oids[host][self.OID_OS].append(res_report)
                elif res_report.oid in self.accepted_oids:
                    self.oids[host][self.OID_ACCEPTED].append(res_report)
                    self.update_summary(host, res_report)
                elif res_report.oid in self.known_oids:
                    self.oids[host][self.OID_DROPPED].append(res_report)
                else:
                    self.oids[host][self.OID_NEW].append(res_report)
                    self.update_summary(host, res_report)

        # Extract global estimation
        if self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY] < self.SEVERITY_THR:
            self.report[self.REPORT_GLOBAL] = self.MSG_OK
        else:
            self.report[self.REPORT_GLOBAL] = self.MSG_NOK

    def get_summary(self) -> str:
        """Return the summary as pretty-printed JSON."""
        return pretty_json(self.report)

    def get_classified_oids(self) -> str:
        """Return the classified oids, one string per result, as JSON."""
        json_oids = {host: {key: [str(o) for o in oids]
                            for key, oids in data.items()}
                     for host, data in self.oids.items()}
        return pretty_json(json_oids)

    def create_msg(self, r: ResultReport):
        """Return a one-line description of ``r``."""
        msg = f"    Detected oid: {r.oid}, severity: {r.severity}"
        msg += f", threat: {r.threat} and port: {r.port}\n"
        return msg

    def write_data(self,
                   summary_filename: str,
                   oids_filename: str):
        """Write the summary and the oid file; push new OS oids if is_os."""
        # Save on file report summary
        self.write_summary(summary_filename)

        # If this script scanned a single_vm deployment
        # store the detected oids under the OS name and
        # push the new os oids
        if self.is_os:
            self.write_new_oids()
            self.push_new_oids()

        # Save the oids on file
        self.write_oids(oids_filename)

    def write_new_oids(self) -> None:
        """Replace the oids of os_name with the accepted and new ones found."""
        # Overwrite the detected oids to the host oids
        detected = []
        for data in self.oids.values():
            detected += [str(a.oid) for a in data[self.OID_ACCEPTED]]
            detected += [str(n.oid) for n in data[self.OID_NEW]]
        self.os_oids[self.os_name] = detected
        with open(self.os_file, 'w') as file:
            yaml.dump(self.os_oids, file)

    def write_oids(self, oids_filename) -> None:
        """Dump the per-OS oids to ``oids_filename``."""
        # NOTE(review): this dumps os_oids, not the classified self.oids;
        # confirm which of the two the oids file is meant to contain.
        with open(oids_filename, 'w') as file:
            yaml.dump(self.os_oids, file)

    def push_new_oids(self):
        """Commit the OS oid file in the cloned repository and push it."""
        repo = git.Repo(self.OS_SEC_DEST_DIR)
        repo.git.add(self.OS_SEC_FILENAME)
        repo.index.commit(self.OS_COMMIT_MESSAGE)
        origin = repo.remote('origin')
        origin.push()
        logging.info("New oid file successfully pushed")

    def write_summary(self, summary_filename) -> None:
        """Dump the summary as YAML to ``summary_filename``."""
        with open(summary_filename, 'w') as file:
            yaml.dump(self.report, file)