Skip to content
Snippets Groups Projects
utilities.py 3.69 KiB
Newer Older
  • Learn to ignore specific revisions
    import json
    import logging
    
    from typing import Dict, List, Tuple
    
    import git
    import os
    
    # Default ports used when an endpoint does not carry an explicit one.
    # They are kept as strings because ports are collected, sorted and logged
    # as text alongside the ports parsed out of endpoint URLs.
    SSH_PORT: str = '22'
    HTTP_PORT: str = '80'
    HTTPS_PORT: str = '443'
    
    def _split_endpoint(endpoint: str) -> Tuple[str, str, str]:
        """Split ``scheme://[user@]host[:port][/path]`` into its components.

        Args:
            endpoint: Endpoint URL as found in the deployment outputs.

        Returns:
            A ``(scheme, host, port)`` tuple. ``port`` is an empty string when
            the endpoint does not specify one. The host keeps its original
            letter case.

        Raises:
            ValueError: If the endpoint has no ``://`` separator or no host.
        """
        scheme, separator, remainder = endpoint.partition("://")
        if not separator:
            raise ValueError(f"Impossible to parse the endpoint. Endpoint: {endpoint}")

        # Keep only the authority part: anything from the first '/', '?' or '#'
        # onwards is path/query/fragment and must not leak into host or port.
        authority = remainder
        for delimiter in "/?#":
            authority = authority.split(delimiter, 1)[0]
        # Drop optional "user:password@" credentials.
        authority = authority.rpartition("@")[2]

        # A bracketed IPv6 literal without a port ends with ']' and its colons
        # are part of the address, not a port separator.
        if ":" in authority and not authority.endswith("]"):
            host, _, port = authority.rpartition(":")
        else:
            host, port = authority, ""

        if not host:
            raise ValueError(f"Impossible to parse the endpoint host. Endpoint: {endpoint}")
        return scheme, host, port


    def import_dep_info(file_path: str, endpoint_keys: str) -> Dict[str,List[str]]:
        """Collect the hosts and ports to scan from a deployment description.

        The JSON file at ``file_path`` must contain an ``outputs`` mapping.

        * When ``endpoint_keys`` is a comma-separated list of output names,
          each selected output is parsed as an endpoint URL. A missing port
          falls back to ``HTTPS_PORT``/``HTTP_PORT`` according to the scheme.
        * When ``endpoint_keys`` is the literal string ``"None"``, every
          string output whose key contains ``_ip`` is used as a host, paired
          with ``SSH_PORT``.

        Args:
            file_path: Path of the JSON file to read.
            endpoint_keys: Comma-separated output keys (surrounding whitespace
                is ignored), or the string ``"None"``.

        Returns:
            Mapping of host to its sorted list of ports (ports are strings, so
            the order is lexicographic).

        Raises:
            ValueError: If a selected endpoint cannot be split into
                scheme and host.
            Exception: If a selected endpoint has no port and its scheme is
                neither ``http`` nor ``https``.
        """
        with open(file_path) as f:
            data = json.load(f)

        outputs = data['outputs']
        endpoints: Dict[str, set] = {}

        if endpoint_keys != "None":
            wanted_keys = {key.strip() for key in endpoint_keys.split(',')}
            for key, value in outputs.items():
                if key not in wanted_keys:
                    continue
                endpoint = str(value)
                prefix, host, port = _split_endpoint(endpoint)
                if not port:
                    # No explicit port: derive it from the scheme.
                    if prefix == "https":
                        port = HTTPS_PORT
                    elif prefix == 'http':
                        port = HTTP_PORT
                    else:
                        raise Exception(f"Impossible to parse the endpoint port. Endpoint: {endpoint}")
                logging.info(f"Endpoint: {host}:{port}")
                endpoints.setdefault(host, set()).add(port)
        else:
            for key, value in outputs.items():
                if "_ip" in key and isinstance(value, str):
                    logging.info(f"endpoint: {value}:{SSH_PORT}")
                    endpoints[value] = {SSH_PORT}

        return {host: sorted(ports) for host, ports in endpoints.items()}
    
    
    def process_global_reports_info(reports: Dict, nok_threshold: float = 4) -> Dict:
        """Add a deployment-wide summary to a mapping of per-host reports.

        Every value in ``reports`` must expose ``['global']['severity']`` and
        ``['global']['threat']``. The entry with the highest severity (the
        first one on ties) determines the summary.

        Two keys are written into ``reports`` itself:

        * ``'deployment'``: ``{'severity': ..., 'threat': ...}`` of the worst
          host, or ``-1`` / ``'None'`` when no host exceeds ``-1``.
        * ``'global'``: ``"OK"`` when the deployment severity is below
          ``nok_threshold``, otherwise ``"NOK"``.

        Args:
            reports: Per-host reports; modified in place.
            nok_threshold: Severity at or above which the verdict is ``"NOK"``.

        Returns:
            The same ``reports`` object, with the two summary keys added.
        """
        glob_severity = -1
        glob_threat = 'None'

        for host_report in reports.values():
            host_summary = host_report['global']
            if host_summary['severity'] > glob_severity:
                glob_severity = host_summary['severity']
                glob_threat = host_summary['threat']

        # The summary keys are written only after the loop so they are never
        # mistaken for host entries during this call.
        reports['deployment'] = {'severity': glob_severity,
                                 'threat': glob_threat}
        reports['global'] = "OK" if glob_severity < nok_threshold else "NOK"

        return reports
    
    
    def _ensure_security_scans_repo(destination_folder: str = 'repo') -> str:
        """Clone the security-scans repository unless a checkout already exists.

        Cloning into an existing, non-empty folder fails, so an existing
        checkout (detected by its ``.git`` directory) is reused as is. This lets
        ``read_not_relevant_issues`` and ``read_issues`` be called in the same
        run, in any order.

        Credentials are read from the ``GIT_SEC_USER`` and ``GIT_SEC_TOKEN``
        environment variables and embedded in the clone URL.

        Returns:
            The path of the local checkout.
        """
        if os.path.isdir(os.path.join(destination_folder, '.git')):
            return destination_folder

        git_sec_user = os.environ.get("GIT_SEC_USER")
        git_sec_token = os.environ.get("GIT_SEC_TOKEN")
        git_repo = "baltig.infn.it/infn-cloud/security-scans.git"
        # NOTE(review): the token is part of the remote URL and will presumably
        # end up in the checkout's git configuration - confirm this is acceptable.
        repo_url = f"https://{git_sec_user}:{git_sec_token}@{git_repo}"
        git.Repo.clone_from(repo_url, destination_folder)
        return destination_folder


    def _read_issue_files(file_paths: List[str]) -> List[str]:
        """Return the stripped, non-empty, non-comment lines of the given files."""
        issues = []
        for path in file_paths:
            with open(path, 'r') as file:
                for line in file:
                    entry = line.strip()
                    # Skip blank lines and '#' comments (even when indented).
                    if entry and not entry.startswith('#'):
                        issues.append(entry)
        return issues


    def read_not_relevant_issues() -> List[str]:
        """Read the overridden (not relevant) issues from the security-scans repo.

        Returns:
            The entries of ``queues/overridden.txt``, without blank lines and
            ``#`` comments.
        """
        repo_dir = _ensure_security_scans_repo()
        return _read_issue_files([os.path.join(repo_dir, 'queues', 'overridden.txt')])


    def read_issues() -> Tuple[List[str],List[str]]:
        """Read the accepted and the known issues from the security-scans repo.

        Returns:
            A tuple ``(accepted_issues, known_issues)``: the first comes from
            ``queues/accepted.txt``, the second is the concatenation of
            ``queues/held.txt``, ``queues/new.txt`` and
            ``queues/overridden.txt`` (in that order). Blank lines and ``#``
            comments are skipped.
        """
        repo_dir = _ensure_security_scans_repo()
        queues_dir = os.path.join(repo_dir, 'queues')

        accepted_file_paths = [os.path.join(queues_dir, 'accepted.txt')]
        known_file_paths = [os.path.join(queues_dir, 'held.txt'),
                            os.path.join(queues_dir, 'new.txt'),
                            os.path.join(queues_dir, 'overridden.txt')]

        accepted_issues = _read_issue_files(accepted_file_paths)
        known_issues = _read_issue_files(known_file_paths)
        return accepted_issues, known_issues