"""Helpers to read deployment outputs, summarise scan reports and load overrides."""

import json
import logging
import os
from typing import Dict, List, Set, Tuple

import git

SSH_PORT: str = '22'
HTTP_PORT: str = '80'
HTTPS_PORT: str = '443'

# Port implied by a URL scheme when the endpoint does not name one explicitly.
_DEFAULT_PORTS: Dict[str, str] = {'https': HTTPS_PORT, 'http': HTTP_PORT}

# A deployment whose highest severity is below this value is flagged "OK".
_NOK_SEVERITY_THRESHOLD = 4

# Keys that process_global_reports_info itself writes into the reports mapping.
_SUMMARY_KEYS = ('deployment', 'global')


def _split_endpoint(endpoint: str) -> Tuple[str, str]:
    """Split a ``scheme://host[:port][/path]`` endpoint into ``(host, port)``.

    Only the authority part of the endpoint is considered: any path, query,
    fragment or ``user:password@`` prefix is discarded. When no port is given,
    the default for ``http``/``https`` is used.

    Raises:
        ValueError: if the endpoint has no ``://`` separator, or if it has no
            explicit port and its scheme is neither ``http`` nor ``https``.
    """
    scheme, separator, remainder = endpoint.partition("://")
    if not separator:
        raise ValueError(f"Impossible to parse the endpoint. Endpoint: {endpoint}")

    # Keep only the authority component (RFC 3986): cut at the first path,
    # query or fragment delimiter so they cannot leak into the host or port.
    authority = remainder
    for delimiter in "/?#":
        authority = authority.partition(delimiter)[0]
    # Drop optional userinfo; its ':' must not be mistaken for the port colon.
    authority = authority.rpartition("@")[2]

    host, colon, port = authority.rpartition(":")
    if not colon or "]" in port:
        # Either no port was given, or the last colon sits inside a bracketed
        # IPv6 literal such as "[::1]".
        host, port = authority, ""

    if not port:
        try:
            port = _DEFAULT_PORTS[scheme.lower()]
        except KeyError:
            raise ValueError(
                f"Impossible to parse the endpoint port. Endpoint: {endpoint}"
            ) from None
    return host, port


def import_dep_info(file_path: str, endpoint_keys: str) -> Dict[str, List[str]]:
    """Collect the hosts and ports to check from a deployment outputs file.

    The JSON document at ``file_path`` must contain an ``outputs`` object.

    * If ``endpoint_keys`` is not the literal string ``"None"``, it is read as
      a comma-separated list of output names; each matching output value is
      parsed as a ``scheme://host[:port]`` endpoint.
    * Otherwise every output whose name contains ``"_ip"`` and whose value is
      a string is taken as a host reachable on ``SSH_PORT``.

    Args:
        file_path: Path of the JSON file to read.
        endpoint_keys: Comma-separated output names, or ``"None"``.

    Returns:
        A mapping from host to its list of port strings, sorted as strings.

    Raises:
        ValueError: if a selected endpoint cannot be parsed.
    """
    with open(file_path, encoding="utf-8") as f:
        outputs = json.load(f)['outputs']

    ports_by_host: Dict[str, Set[str]] = {}
    if endpoint_keys != "None":
        # Tolerate spaces around the commas ("a, b").
        wanted_keys = {key.strip() for key in endpoint_keys.split(',')}
        for key, value in outputs.items():
            if key not in wanted_keys:
                continue
            host, port = _split_endpoint(str(value))
            logging.info("Endpoint: %s:%s", host, port)
            ports_by_host.setdefault(host, set()).add(port)
    else:
        for key, value in outputs.items():
            if "_ip" in key and isinstance(value, str):
                logging.info("endpoint: %s:%s", value, SSH_PORT)
                ports_by_host[value] = {SSH_PORT}

    return {host: sorted(ports) for host, ports in ports_by_host.items()}


def process_global_reports_info(reports: Dict) -> Dict:
    """Add deployment-wide summary entries to a per-host reports mapping.

    Each host entry is expected to provide ``['global']['severity']`` and
    ``['global']['threat']``. The highest severity found, together with the
    threat of the host that carries it, is stored under ``'deployment'``;
    ``'global'`` is set to ``"OK"`` when that severity is below the NOK
    threshold and to ``"NOK"`` otherwise.

    The mapping is updated in place and also returned. Summary keys left by a
    previous call are ignored, so the function can be applied repeatedly.

    Args:
        reports: Mapping from host to that host's report.

    Returns:
        The same ``reports`` object, with the two summary keys set.
    """
    worst_severity = -1
    worst_threat = 'None'
    for host, report in reports.items():
        if host in _SUMMARY_KEYS:
            # Written by an earlier call; these entries are not host reports.
            continue
        host_summary = report['global']
        if host_summary['severity'] > worst_severity:
            worst_severity = host_summary['severity']
            worst_threat = host_summary['threat']

    reports['deployment'] = {'severity': worst_severity, 'threat': worst_threat}
    reports['global'] = "OK" if worst_severity < _NOK_SEVERITY_THRESHOLD else "NOK"
    return reports


def read_not_relevant_issues() -> List[str]:
    """Clone the security-scans repository and read its overridden entries.

    The repository is cloned over HTTPS into the local ``repo`` folder using
    the credentials found in the ``GIT_SEC_USER`` and ``GIT_SEC_TOKEN``
    environment variables, then ``queues/overridden.txt`` is read from it.

    Returns:
        The stripped, non-empty lines of the file that are not ``#`` comments.
    """
    git_sec_user = os.environ.get("GIT_SEC_USER")
    git_sec_token = os.environ.get("GIT_SEC_TOKEN")
    git_repo = "baltig.infn.it/infn-cloud/security-scans.git"
    repo_url = f"https://{git_sec_user}:{git_sec_token}@{git_repo}"
    destination_folder = 'repo'
    # NOTE(review): the clone presumably fails when 'repo' already exists and
    # is not empty (e.g. on a second call from the same working directory) -
    # confirm whether callers need that case handled.
    git.Repo.clone_from(repo_url, destination_folder)

    file_path = os.path.join(destination_folder, 'queues', 'overridden.txt')
    with open(file_path, 'r', encoding="utf-8") as file:
        entries = (line.strip() for line in file)
        # Skip blank lines and comments, including comments that are indented.
        return [entry for entry in entries if entry and not entry.startswith('#')]