# (Diff-viewer labels "Newer" / "Older" were captured here; they are not part of the module.)
from typing import Dict, List, Tuple
# Well-known TCP port numbers, stored as strings (not ints).
# SSH_PORT is the port recorded by import_dep_info for every "_ip" output.
# HTTP_PORT / HTTPS_PORT are presumably the defaults for http:// and https://
# endpoints that carry no explicit port -- confirm against import_dep_info.
SSH_PORT: str = '22'
HTTP_PORT: str = '80'
HTTPS_PORT: str = '443'
def import_dep_info(file_path: str, endpoint_keys: str) -> Dict[str,List[str]]:
    """Collect the hosts and ports to scan from a deployment description file.

    The JSON document at ``file_path`` must contain an ``outputs`` mapping.

    * If ``endpoint_keys`` is a comma-separated list of output names, every
      output whose key is in that list is parsed as
      ``<scheme>://<host>[:<port>]``. When the port is omitted it defaults to
      ``HTTPS_PORT`` for ``https`` and ``HTTP_PORT`` for ``http``.
    * If ``endpoint_keys`` is the literal string ``"None"``, every output
      whose key contains ``_ip`` and whose value is a string is taken as a
      host to be checked on ``SSH_PORT``.

    Args:
        file_path: Path of the JSON file to read.
        endpoint_keys: Comma-separated output names, or the string ``"None"``.

    Returns:
        A dict mapping each host to the list of its ports. Ports are strings
        and are sorted as strings (so ``'443'`` sorts before ``'80'``).

    Raises:
        Exception: If an endpoint has no explicit port and its scheme is
            neither ``http`` nor ``https``.
        ValueError: If a selected endpoint does not contain exactly one
            ``"://"``, or has more than one ``":"`` after the scheme.
    """
    with open(file_path) as f:
        data = json.load(f)
    outputs = data['outputs']

    # host -> set of ports; sets de-duplicate ports seen more than once.
    endpoints = {}
    if endpoint_keys != "None":
        wanted = set(endpoint_keys.split(','))
        for key, value in outputs.items():
            if key not in wanted:
                continue
            endpoint = str(value)
            prefix, url = endpoint.split("://")
            if ":" in url:
                host, port = url.split(":")
            else:
                # No explicit port: fall back to the scheme's default.
                host = url
                if prefix == "https":
                    port = HTTPS_PORT
                elif prefix == "http":
                    port = HTTP_PORT
                else:
                    raise Exception(f"Impossible to parse the endpoint port. Endpoint: {endpoint}")
            logging.info(f"Endpoint: {host}:{port}")
            endpoints.setdefault(host, set()).add(port)
    else:
        for key, value in outputs.items():
            if "_ip" in key and isinstance(value, str):
                logging.info(f"endpoint: {value}:{SSH_PORT}")
                endpoints[value] = {SSH_PORT}

    # Convert the port sets to sorted lists so the result is deterministic.
    return {host: sorted(ports) for host, ports in endpoints.items()}
def process_global_reports_info(reports: Dict) -> Dict:
    """Add deployment-wide severity, threat and verdict to ``reports``.

    Every entry of ``reports`` is expected to be a per-host report holding a
    ``'global'`` dict with ``'severity'`` and ``'threat'`` keys. The host with
    the highest severity determines the deployment values.

    The function mutates ``reports`` in place, adding two keys:

    * ``'deployment'``: ``{'severity': <max severity>, 'threat': <its threat>}``
    * ``'global'``: ``"OK"`` when the deployment severity is below 4,
      otherwise ``"NOK"``.

    Args:
        reports: Mapping of host to report. It must not already contain the
            ``'deployment'`` / ``'global'`` keys, because every key present
            at call time is treated as a host.

    Returns:
        The same ``reports`` object, with the two keys added.
    """
    # Start from the lowest possible result so that an empty or all-zero
    # input still yields well-defined values.
    glob_severity = 0
    # NOTE(review): "Log" is assumed to be the threat label matching
    # severity 0 -- confirm against the scanner's threat vocabulary.
    glob_threat = "Log"

    for host in reports:
        host_global = reports[host]['global']
        host_glob_severity = host_global['severity']
        # Strict comparison: on ties the first host seen keeps its threat.
        if host_glob_severity > glob_severity:
            glob_severity = host_glob_severity
            glob_threat = host_global['threat']

    reports['deployment'] = {'severity': glob_severity,
                             'threat': glob_threat}
    reports['global'] = "OK" if glob_severity < 4 else "NOK"
    return reports
def read_not_relevant_issues() -> List[str]:
    """Clone the security-scans repository and return its overridden issues.

    Credentials are read from the ``GIT_SEC_USER`` and ``GIT_SEC_TOKEN``
    environment variables and embedded in the HTTPS clone URL. The repository
    is cloned into the local ``repo`` folder, then
    ``repo/queues/overridden.txt`` is read.

    Returns:
        The stripped lines of the file, skipping lines that start with ``#``.
        The ``#`` check is made before stripping, so blank lines are returned
        as empty strings and indented ``#`` lines are kept.
    """
    git_sec_user = os.environ.get("GIT_SEC_USER")
    git_sec_token = os.environ.get("GIT_SEC_TOKEN")
    git_repo = "baltig.infn.it/infn-cloud/security-scans.git"
    repo_url = f"https://{git_sec_user}:{git_sec_token}@{git_repo}"

    clone_dir = 'repo'
    git.Repo.clone_from(repo_url, clone_dir)

    issues = []
    with open('repo/queues/overridden.txt', 'r') as handle:
        for raw_line in handle:
            if raw_line.startswith('#'):
                continue
            issues.append(raw_line.strip())
    return issues
def read_issues() -> Tuple[List[str],List[str]]:
    """Clone the security-scans repository and load its issue queues.

    Credentials are read from the ``GIT_SEC_USER`` and ``GIT_SEC_TOKEN``
    environment variables and embedded in the HTTPS clone URL. The repository
    is cloned into the local ``repo`` folder.

    Returns:
        A ``(accepted_issues, known_issues)`` tuple. ``accepted_issues`` comes
        from ``queues/accepted.txt``; ``known_issues`` is the concatenation of
        ``queues/held.txt``, ``queues/new.txt`` and ``queues/overridden.txt``,
        in that order. Lines starting with ``#`` are skipped and every other
        line is stripped (so blank lines become empty strings).
    """
    git_sec_user = os.environ.get("GIT_SEC_USER")
    git_sec_token = os.environ.get("GIT_SEC_TOKEN")
    git_repo = "baltig.infn.it/infn-cloud/security-scans.git"
    repo_url = f"https://{git_sec_user}:{git_sec_token}@{git_repo}"

    clone_dir = 'repo'
    git.Repo.clone_from(repo_url, clone_dir)

    accepted_issues = []
    known_issues = []
    # Each target list is paired with the queue files that feed it; the
    # files are read in the listed order.
    queue_sources = (
        (accepted_issues, ['repo/queues/accepted.txt']),
        (known_issues, ['repo/queues/held.txt',
                        'repo/queues/new.txt',
                        'repo/queues/overridden.txt']),
    )
    for bucket, queue_paths in queue_sources:
        for queue_path in queue_paths:
            with open(queue_path, 'r') as handle:
                for raw_line in handle:
                    if not raw_line.startswith('#'):
                        bucket.append(raw_line.strip())
    return accepted_issues, known_issues