diff --git a/files_old/gvm_library.py b/files_old/gvm_library.py deleted file mode 100644 index 7044b90d094f157cc63d76b0c2552e71aab4afeb..0000000000000000000000000000000000000000 --- a/files_old/gvm_library.py +++ /dev/null @@ -1,413 +0,0 @@ -from gvm.connections import TLSConnection -from gvm.protocols.gmpv208 import Gmp, AliveTest -from gvm.transforms import EtreeTransform -from gvm.xml import pretty_print -from time import time, sleep -import logging -import base64 -import json -from typing import Optional - -def pretty_json(j): - return json.dumps(j,sort_keys=True,indent=4) - -class Configs: - config = "9866edc1-8869-4e80-acac-d15d5647b4d9" - scanner = "08b69003-5fc2-4037-a479-93b440211c73" - ovs_ssh_credential = "b9af5845-8b87-4378-bca4-cee39a894c17" - -class ReportFormats: - anonymous_xml = "5057e5cc-b825-11e4-9d0e-28d24461215b" - csv_results = "c1645568-627a-11e3-a660-406186ea4fc5" - itg = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5" - pdf = "c402cc3e-b531-11e1-9163-406186ea4fc5" - txt = "a3810a62-1f62-11e1-9219-406186ea4fc5" - xml = "a994b278-1f62-11e1-96ac-406186ea4fc5" - -class PortList: - """ - This class helps the managing of the GVM port_list object - - Attributes: - client = client used to interact with gvm server (created from GVMClient class) - name: str = name of port list object - id: str = id returned after the creation - in_use: str = state if the port_list object is in use - """ - - def __init__(self, - client, - name: str = "", - id: str = None, - in_use: str = None): - self.client = client, - self.name = name - self.id = id - self.in_use = in_use - - def __str__(self): - d = {'name': self.name, - 'id': self.id, - 'in_use': self.in_use} - return pretty_json(d) - - def __del__(self): - res = self.client.delete_port_list(self.id) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": - logging.info(f"Port_list {self} DELETED") - else: - logging.error(f"ERROR during the port_list deletion {status}: {status_text}") - -class Target: - """ - This class helps the managing of the GVM target object - - Attributes: - client = client used to interact with gvm server (created from GVMClient class) - name: str = name of target object - id: str = id returned after the creation - in_use: str = state if the target object is in use - """ - - def __init__(self, - client, - name: str, - id: str, - in_use: str, - hosts: str): - self.client = client - self.name = name - self.id = id - self.in_use = in_use - self.hosts = hosts - - def __str__(self): - d = {'name': self.name, - 'id': self.id, - "in_use": self.in_use, - 'hosts': self.hosts} - return pretty_json(d) - - def __del__(self): - res = self.client.delete_target(self.id) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": - logging.info(f"Target {self} DELETED") - else: - logging.error(f"ERROR during the target deletion {status}: {status_text}") - -class Task: - """ - This class helps the managing of the GVM task object - - Attributes: - client = client used to interact with gvm server (created from GVMClient class) - name: str = name of task object - id: str = id returned after the creation - in_use: str = state if the task object is in use - report_id: str = report id once task is completed - - Methods: - start = starts the task - stop = stops the task - delete = deletes the tasks - get_progress = retrieves current task status - wait = waits for the task is completed - save_report = saves report on file - get_report_info = retrieves report information - """ - - def __init__(self, - client, - name: str, - id: str, - in_use: str = None, - report_id: str = None): - self.client = client - self.name = name - self.id = id - self.in_use = in_use - self.report_id = report_id - - def __str__(self): - d = {'name': self.name, - 'id': self.id, - "in_use": self.in_use, - 'report_id': self.report_id} - return pretty_json(d) - - def start(self): - res = self.client.start_task(self.id) - self.report_id = res.xpath('report_id/text()')[0] - - def stop(self): - res = self.client.stop_task(self.id) - pretty_print(res) - - def delete(self): - res = self.client.delete_task(self.id) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": - logging.info(f"Task {self} DELETED") - else: - logging.error(f"ERROR during the task deletion {status}: {status_text}") - - def __del__(self): - self.delete() - - def get_progress(self) -> tuple[str,int]: - res = self.client.get_tasks(self.id)[0] - status = res['status'] # New -> Requested -> Queued -> Running -> Done - progress = int(res['progress'])# 0 0 0 0 -> 100 -1 - return (status, progress) - - def wait(self, timeout: int = 3600) -> bool: - start_time = time() - logging.debug("Waiting for scans ends the task") - while True: - status, progress = self.get_progress() - if status not in ["New","Requested","Queued","Running","Done"]: # ["Interrupted", ...] - logging.warning(f"Task in the undesired status: '{status}'") - return False - if status == "Done" and progress == -1: - logging.info(f"Task completed") - return True - if time() - start_time > timeout: - logging.error("TIMEOUT during waiting for task ending") - return False - logging.debug(f"Waiting for the task ends. Now {int(time() - start_time)}s from start. Status: {status}") - sleep(10) - - def save_report(self, report_format: str, report_filename: str): - res = self.client.get_report(self.report_id, - report_format_id=report_format, - ignore_pagination=True, - details="1") - code = str(res.xpath('report/text()')[0]) - with open(report_filename, "wb") as fh: - fh.write(base64.b64decode(code)) - - def get_report_info(self) -> dict: - report = dict() - res = self.client.get_report(self.report_id, - report_format_id=ReportFormats.anonymous_xml, - ignore_pagination=True, - details="1") - threats = res.xpath('report/report/ports/port/threat/text()') - ports = res.xpath('report/report/ports/port/text()') - severities = res.xpath('report/report/ports/port/severity/text()') - severities = list(map(lambda a : float(a), severities)) - for p,t,s in zip(ports, threats, severities): - report[p] = {'severity': s, 'threat': t} - glob_severity = -1 # returned severities are null or positive - glob_threat = 'Log' - for threat,severity in zip(threats,severities): - if severity > glob_severity: - glob_severity = severity - glob_threat = threat - glob_severity = severity - - report['global'] = {'threat': glob_threat, 'severity': glob_severity} - return report - -class GVMClient(): - """ - This class provides API to interact with GVM in order to - get, create and delete port_lists, targets, tasks and reports - """ - - CONNECTION_RETRIES = 5 - LOCAL_IP = "127.0.0.1" - - def __init__(self, - auth_n: str, - auth_p: str, - host_ip: str = LOCAL_IP): - self.auth_name = auth_n - self.auth_passwd = auth_p - self.host_ip = host_ip - self.client = self.create_client() - - def create_client(self): - retry = self.CONNECTION_RETRIES - while(retry > 0): - try: - self.client = Gmp(TLSConnection(hostname=self.host_ip), - transform=EtreeTransform()) - break - except: - logging.error(f"Connection error with the gmp endpoint. Remaining {retry} retries") - retry -= 1 - sleep(0.5) - - if retry == 0: - raise Exception("Impossible connect to the gmp endpoint even after 5 retries") - - self.client.authenticate(self.auth_name, self.auth_passwd) - - def get_version(self) -> str: - res = self.client.get_version() - return str(res.xpath('version/text()')[0]) - - def get_port_lists(self, filter: str = "rows=-1") -> list[PortList]: - res = [] - client_res = self.client.get_port_lists(filter_string = filter) - for pl in client_res.xpath('port_list'): - o = PortList() - o.client = self.client - o.name = pl.xpath('name/text()')[0] - o.id = pl.xpath('@id')[0] - o.in_use = pl.xpath('in_use/text()')[0] - res.append(o) - return res - - def create_port_list(self, name: str, ports: list[str]) -> PortList: - res = self.client.create_port_list(name, ','.join(ports)) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "201": - id = res.xpath('@id')[0] - logging.debug(f'Created port list obj. Name: {name}, id: {id}, ports: {ports}') - return PortList(name = name, id = id) - else: - logging.error(f"ERROR during Port list creation. Status code: {status}, msg: {status_text}") - msg = f"ERROR during Port list creation. Status code: {status}, msg: {status_text}" - raise Exception(msg) - - def delete_port_list(self, pl: PortList): - res = self.client.delete_port_list(pl.id) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": - logging.info(f"Port_list {pl} DELETED") - else: - logging.error(f"ERROR {status}: {status_text}") - - def get_or_create_port_list(self, pl_name: str, ports: list[str]) -> PortList: - res = self.get_port_lists(pl_name) - if len(res) == 0: - pl = self.create_port_list(pl_name, ports) - return self.get_port_lists(pl.id)[0] - elif len(res) == 1: - return res[0] - else: - logging.warning(f"Found {len(res)} results.") - return res - - def create_target(self, name: str, ip: str, pl: PortList) -> Optional[Target]: - res = self.client.create_target( - name = name, - comment = "", - hosts = [ip], - port_list_id = pl.id, - ssh_credential_id = Configs.ovs_ssh_credential, - alive_test = AliveTest('Consider Alive')) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "201": - id = res.xpath('@id')[0] - return Target(name = name, id = id) - else: - msg = f"ERROR during Target creation. Status code: {status}, msg: {status_text}" - raise Exception(msg) - - def get_targets(self, filter: str) -> list[Target]: - res = [] - targets = self.client.get_targets(filter_string=filter) - for target in targets.xpath('target'): - t = Target() - t.client = self.client - t.name = target.xpath('name/text()')[0] - t.id = target.xpath('@id')[0] - t.in_use = target.xpath('in_use/text()')[0] - res.append(t) - return res - - def delete_target(self, target: Target): - res = self.client.delete_target(target.id) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": - logging.info(f"Target {target} DELETED") - else: - logging.error(f"ERROR {status}: {status_text}") - - def get_or_create_target(self, name: str, ip: str, port_list: list[str]) -> Target: - res = self.get_targets(name) - if len(res) == 0: - t = self.create_target(name,ip,port_list) - return self.get_targets(t.id)[0] - elif len(res) == 1: - return res[0] - else: - print(f"Found {len(res)} results. Return None") - return res - - def search_and_delete_target(self, target_name: str): - targets = self.get_targets(target_name) - if len(targets) == 1: - self.delete_target(targets[0]['id']) - else: - raise("Multiple results for search") - - def search_and_delete_all_targets(self, target_name: str): - targets = self.get_targets(target_name) - for target in targets: - self.delete_target(target) - - def create_task(self, name: str, target: Target) -> Optional[Task]: - res = self.client.create_task( - name = name, - config_id = Configs.config, - target_id = target.id, - scanner_id = Configs.scanner) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "201": - id = res.xpath('@id')[0] - return Task(name = name, id = id) - else: - msg = f"ERROR during Task creation. Status code: {status}, msg: {status_text}" - raise Exception(msg) - - def get_tasks(self, filter: str) -> list[Task]: - list_of_tasks = [] - tasks = self.client.get_tasks(filter_string=filter) - for task in tasks.xpath('task'): - t = Task() - t.name = task.xpath('name/text()')[0] - t.id = task.xpath('@id')[0] - t.in_use = task.xpath('in_use/text()')[0] - try: - t.report_id = task.xpath('last_report/report/@id')[0] - except: - pass - list_of_tasks.append(t) - return list_of_tasks - - def get_or_create_task(self, task_name: str, target: Target) -> Task: - res = self.get_tasks(task_name) - if len(res) == 0: - t = self.create_task(task_name, target) - return self.get_tasks(t.id)[0] - elif len(res) == 1: - return res[0] - else: - print(f"Found {len(res)} results. Return None") - return res - - def delete_all_tasks(self, filter: str): - tasks = self.get_tasks(filter) - for task in tasks: - self.delete_task(task) - - def get_report_formats(self): - res = self.client.get_report_formats() - for f in res.xpath('report_format'): - name = f.xpath('name/text()')[0] - id = f.xpath('@id')[0] - print(f"Report format id: '{id}' and name: '{name}'") \ No newline at end of file diff --git a/files_old/scan.py b/files_old/scan.py deleted file mode 100644 index 23231819eb35bc80840debb6042edbf2439f06c6..0000000000000000000000000000000000000000 --- a/files_old/scan.py +++ /dev/null @@ -1,107 +0,0 @@ -#!/usr/bin/env python3 - -import logging -import json -from sys import argv, exit -import os -import scan_gvm_library as gvm_library -import scan_utilities as utilities -import argparse - -### GVM Options ### -WAIT_TIMEOUT = 3600 #1h - -parser = argparse.ArgumentParser( - description='Scan endpoints and machines') - -parser.add_argument( - "--endpoint-keys", - help="Orchestrator output endpoints to scan (endpoints1,endpoints2)", - default="None" - ) -parser.add_argument( - "--dep-json", - default="./dep.json" - ) -parser.add_argument( - "--output-dir", - default="." - ) - -args = parser.parse_args() - -logging.basicConfig( - filename='scan.log', - level=logging.INFO, - format='%(asctime)s %(levelname)-8s %(message)s', - datefmt='%Y-%m-%d %H:%M:%S', - filemode='w') -logging.info("\n\nStart scan application") - -if os.environ.get('GMP_USER') is not None and \ - os.environ.get('GMP_USER') != '': - auth_name = os.getenv('GMP_USER') -else: - logging.error("GMP_USER env var is not defined\nexit") - raise Exception("GMP_USER env var is not defined") - -if os.environ.get('GMP_PASSWORD') is not None and \ - os.environ.get('GMP_PASSWORD') != '': - auth_passwd = os.getenv('GMP_PASSWORD') -else: - logging.error("GMP_PASSWORD env var is not defined\nexit") - raise Exception("GMP_PASSWORD env var is not defined") - -dep_json = args.dep_json -output_dir = args.output_dir -logging.info(f"endpoint_keys: {args.endpoint_keys}") -logging.info(f"dep_json: {dep_json}") -logging.info(f"output_dir: {output_dir}") - -endpoints = utilities.import_dep_info( - dep_json, - args.endpoint_keys) - -logging.info(f"endpoints: {endpoints}") - -# test gmp connection -gvm_library.set_auth(auth_name, auth_passwd) -logging.info(f"gvm version: {gvm_library.get_version()}") - -reports = dict() -for host,ports in endpoints.items(): - logging.info(f"endpoint: {host}:{ports}") - - target_name = f"{auth_name}_target_{host}" - task_name = f"{auth_name}_task_{host}" - port_list_name = f"{auth_name}_pl_{host}" - report_filename = f"{output_dir}/{host}-report.txt" - summary_filename = f"{output_dir}/summary-report.json" - - port_list = gvm_library.get_or_create_port_list(port_list_name,ports) - logging.info(f"Port list:\n {utilities.pretty_json(port_list)}") - - target = gvm_library.get_or_create_target(target_name,host,port_list) - logging.info(f"Target:\n {utilities.pretty_json(target)}") - - task = gvm_library.get_or_create_task(task_name, target) - logging.info(f"Task:\n {utilities.pretty_json(task)}") - - if task['status'] == 'New': - task = gvm_library.start_task(task) - if gvm_library.wait_for_task_ending(task, WAIT_TIMEOUT): - gvm_library.save_report(task,gvm_library.report_formats.txt, report_filename) - reports[host] = gvm_library.get_report_info(task) - else: - reports[host] = f"ERROR Task: {task['id']}" - - gvm_library.delete_task(task) - gvm_library.delete_target(target) - gvm_library.delete_port_list(port_list) - -reports = gvm_library.process_global_reports_info(reports) - -logging.info(utilities.pretty_json(reports)) - -with open(summary_filename, "w") as f: - f.write(json.dumps(reports)) diff --git a/files_old/scan_gvm_library.py b/files_old/scan_gvm_library.py deleted file mode 100644 index 04cc16cb8e25afef54e22ebe2c57afcd6ad61efd..0000000000000000000000000000000000000000 --- a/files_old/scan_gvm_library.py +++ /dev/null @@ -1,406 +0,0 @@ -from gvm.connections import TLSConnection -from gvm.protocols.gmpv208 import Gmp, AliveTest -from gvm.transforms import EtreeTransform -from gvm.xml import pretty_print -from time import time, sleep -import logging -import base64 - -local_ip = "127.0.0.1" -connection = TLSConnection(hostname=local_ip) -transform = EtreeTransform() -config = {'id':"9866edc1-8869-4e80-acac-d15d5647b4d9"} -scanner = {'id': "08b69003-5fc2-4037-a479-93b440211c73"} -ovs_ssh_credential = {'id': "b9af5845-8b87-4378-bca4-cee39a894c17"} -auth_name, auth_passwd = "", "" - - -def set_auth(auth_n, auth_p): - global auth_name - global auth_passwd - auth_name = auth_n - auth_passwd = auth_p - -def get_version_old(): - with Gmp(connection, transform=transform) as gmp: - gmp.authenticate(auth_name, auth_passwd) - pretty_print(gmp.get_version()) - -def create_connection(): - connection_retries = 5 - retry = connection_retries - while(retry > 0): - try: - gmp = Gmp(connection, transform=transform) - gmp.authenticate(auth_name, auth_passwd) - return gmp - except: - logging.warning(f"Connection error with the gmp endpoint. Remaining {retry} retries") - retry -= 1 - sleep(0.5) - raise Exception("Impossible connect to the gmp endpoint even after 5 retries") - -def get_version(): - gmp = create_connection() - res = gmp.get_version() - return res.xpath('version/text()')[0] - -########## PORT LIST ################################## - -def create_port_list(port_list_name, ports): - gmp = create_connection() - res = gmp.create_port_list(port_list_name, ','.join(ports)) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "201": - id = res.xpath('@id')[0] - logging.debug(f'Created port list obj. Name: {port_list_name}, id: {id}, ports: {ports}') - return {'name': port_list_name, 'id': id} - else: - logging.error(f"ERROR during Port list creation. Status code: {status}, msg: {status_text}") - msg = f"ERROR during Port list creation. Status code: {status}, msg: {status_text}" - raise Exception(msg) - -def get_port_lists(filter_str="rows=-1"): - l_o = [] - gmp = create_connection() - res = gmp.get_port_lists(filter_string=filter_str) - for pl in res.xpath('port_list'): - o = dict() - o['name'] = pl.xpath('name/text()')[0] - o['id'] = pl.xpath('@id')[0] - o['in_use'] = pl.xpath('in_use/text()')[0] - l_o.append(o) - return l_o - -def delete_port_list(port_list): - gmp = create_connection() - res = gmp.delete_port_list(port_list['id']) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": - logging.info(f"Port_list with id: {port_list['id']} and name: {port_list['name']} DELETED") - else: - logging.error(f"ERROR {status}: {status_text}") - -def get_or_create_port_list(port_list_name, ports): - res = get_port_lists(port_list_name) - if len(res) == 0: - port_list = create_port_list(port_list_name, ports) - return get_port_lists(port_list['id'])[0] - elif len(res) == 1: - return res[0] - else: - logging.warning(f"Found {len(res)} results.") - return res - -############## TARGET ################################## - -def create_target(name,ip,port_list): - o = dict() - gmp = create_connection() - res = gmp.create_target( - name=name, - comment = "", - hosts=[ip], - port_list_id = port_list['id'], - ssh_credential_id = ovs_ssh_credential['id'], - alive_test=AliveTest('Consider Alive')) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "201": - id = res.xpath('@id')[0] - return {'name': name, 'id': id} - else: - msg = f"ERROR during Target creation. Status code: {status}, msg: {status_text}" - raise Exception(msg) - -def get_targets(filter_str): - res = [] - gmp = create_connection() - targets = gmp.get_targets(filter_string=filter_str) - for target in targets.xpath('target'): - o = dict() - o['name'] = target.xpath('name/text()')[0] - o['hosts'] = target.xpath('hosts/text()')[0] - o['id'] = target.xpath('@id')[0] - o['in_use'] = target.xpath('in_use/text()')[0] - res.append(o) - return res - -def delete_target(target): - gmp = create_connection() - res = gmp.delete_target(target['id']) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": - logging.info(f"Port_list with id: {target['id']} and name: {target['name']} DELETED") - else: - logging.error(f"ERROR {status}: {status_text}") - -def get_or_create_target(target_name,ip,port_list): - res = get_targets(target_name) - if len(res) == 0: - t = create_target(target_name,ip,port_list) - return get_targets(t['id'])[0] - elif len(res) == 1: - return res[0] - else: - print(f"Found {len(res)} results. Return None") - return res - -def search_and_delete_target(target_name): - targets = get_targets(target_name) - if len(targets) == 1: - delete_target(targets[0]['id']) - else: - raise("Multiple results for search") - -def search_and_delete_all_targets(target_name): - targets = get_targets(target_name) - for target in targets: - delete_target(target) - -############## TASK ################################## - -def create_task(name, target): - o = dict() - gmp = create_connection() - res = gmp.create_task( - name=name, - config_id=config['id'], - target_id=target['id'], - scanner_id=scanner['id']) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "201": - id = res.xpath('@id')[0] - return {'name': name, 'id': id} - else: - msg = f"ERROR during Task creation. Status code: {status}, msg: {status_text}" - raise Exception(msg) - -def get_tasks(filter_str): - res = [] - gmp = create_connection() - tasks = gmp.get_tasks(filter_string=filter_str) - for task in tasks.xpath('task'): - o = dict() - o['name'] = task.xpath('name/text()')[0] - o['id'] = task.xpath('@id')[0] - o['progress'] = task.xpath('progress/text()')[0] - o['in_use'] = task.xpath('in_use/text()')[0] - o['status'] = task.xpath('status/text()')[0] - o['target_id'] = task.xpath('target/@id')[0] - try: - o['report_id'] = task.xpath('last_report/report/@id')[0] - except: - pass - res.append(o) - return res - -def get_or_create_task(task_name, target): - res = get_tasks(task_name) - if len(res) == 0: - t = create_task(task_name, target) - return get_tasks(t['id'])[0] - elif len(res) == 1: - return res[0] - else: - print(f"Found {len(res)} results. Return None") - return res - -def get_all_tasks(): - res = [] - gmp = create_connection() - tasks = gmp.get_tasks(filter_string="rows=-1") - for task in tasks.xpath('task'): - o = dict() - o['name'] = task.xpath('name/text()')[0] - o['id'] = task.xpath('@id')[0] - o['progress'] = task.xpath('progress/text()')[0] - o['in_use'] = task.xpath('in_use/text()')[0] - o['status'] = task.xpath('status/text()')[0] - o['target_id'] = task.xpath('target/@id')[0] - try: - o['report_id'] = task.xpath('last_report/report/@id')[0] - except: - pass - res.append(o) - return res - -def search_and_delete_all_tasks(filter_str): - tasks = get_tasks(filter_str) - for task in tasks: - delete_task(task) - -def start_task(task): - gmp = create_connection() - res = gmp.start_task(task['id']) - task['report_id'] = res.xpath('report_id/text()')[0] - return task - -def stop_task(task): - gmp = create_connection() - res = gmp.stop_task(task['id']) - pretty_print(res) - -def delete_task(task): - gmp = create_connection() - res = gmp.delete_task(task['id']) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": - logging.info(f"Target with id: {task['id']} and name: {task['name']} DELETED") - else: - logging.error(f"ERROR {status}: {status_text}") - -############## REPORTS #####################################3 - -class report_formats: - anonymous_xml = "5057e5cc-b825-11e4-9d0e-28d24461215b" - csv_results = "c1645568-627a-11e3-a660-406186ea4fc5" - itg = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5" - pdf = "c402cc3e-b531-11e1-9163-406186ea4fc5" - txt = "a3810a62-1f62-11e1-9219-406186ea4fc5" - xml = "a994b278-1f62-11e1-96ac-406186ea4fc5" - -def get_report_formats(): - gmp = create_connection() - res = gmp.get_report_formats() - for f in res.xpath('report_format'): - name = f.xpath('name/text()')[0] - id = f.xpath('@id')[0] - print(id,name) - -def get_report_format(id): - gmp = create_connection() - res = gmp.get_report_formats() - pretty_print(res) - -def get_progress(task): - task_info = get_tasks(task['id'])[0] - status = task_info['status'] # New -> Requested -> Queued -> Running -> Done - progress = int(task_info['progress'])# 0 0 0 0 -> 100 -1 - return status, progress - -def wait_for_task_ending(task, timeout=3600): - start_time = time() - logging.info("Waiting for scans ends the task") - while True: - status, progress = get_progress(task) - if status not in ["New","Requested","Queued","Running","Done"]: # ["Interrupted", ...] - logging.warning(f"Waiting for scans ends the task. Status: {status}") - return False - if status == "Done" and progress == -1: - logging.info(f"Waiting for scans ends the task. Status: {status}") - return True - if time() - start_time > timeout: - logging.error("TIMEOUT during waiting for task ending") - return False - logging.debug(f"Waiting for the task ends. Now {int(time() - start_time)}s from start. Status: {status}") - sleep(10) - -def save_report(task,report_format_id, report_filename ): - gmp = create_connection() - res = gmp.get_report(task['report_id'], - report_format_id=report_format_id, - ignore_pagination=True, - details="1") - code = str(res.xpath('report/text()')[0]) - with open(report_filename, "wb") as fh: - fh.write(base64.b64decode(code)) - -def save_severity_report(task, severity_filename): - dict_severity = {"Log": 0, "Low": 1, "Medium": 2, "High": 3} - gmp = create_connection() - res = gmp.get_report(task['report_id'], - report_format_id=report_formats.anonymous_xml, - ignore_pagination=True, - details="1") - severities = res.xpath('report/report/ports/port/threat/text()') - old_num_severity = 0 - severity = "Log" - for sev in severities: - if dict_severity[sev] > old_num_severity: - old_num_severity = dict_severity[sev] - severity = sev - with open(severity_filename, "w") as f: - f.write(severity) - -def get_report_info(task): - report = dict() - gmp = create_connection() - res = gmp.get_report(task['report_id'], - report_format_id=report_formats.anonymous_xml, - ignore_pagination=True, - details="1") - threats = res.xpath('report/report/ports/port/threat/text()') - ports = res.xpath('report/report/ports/port/text()') - severities = res.xpath('report/report/ports/port/severity/text()') - severities = list(map(lambda a : float(a), severities)) - for p,t,s in zip(ports, threats, severities): - report[p] = {'severity': s, 'threat': t} - glob_severity = -1 # returned severities are null or positive - glob_threat = 'Log' - for threat,severity in zip(threats,severities): - if severity > glob_severity: - glob_severity = severity - glob_threat = threat - glob_severity = severity - - report['global'] = {'threat': glob_threat, 'severity': glob_severity} - return report - - -def get_reports(filter_str="rows=-1"): - lo = [] - gmp = create_connection() - reports = gmp.get_reports(filter_string = filter_str) - for report in reports.xpath('report'): - o = dict() - o['task_name'] = report.xpath('task/name/text()')[0] - o['id'] = report.xpath('@id')[0] - lo.append(o) - return lo - -def get_numeric_severity(severity): - if severity == "Log": - return 0 - elif severity == "Low": - return 1 - elif severity == "Medium": - return 2 - elif severity == "High": - return 3 - else: - return 4 - -def get_severity_from_number(num): - if num == 0: - return "Low" - elif num == 1: - return "Low" - elif num == 2: - return "Medium" - elif num == 3: - return "High" - else: - return "Undefined" - -def process_global_reports_info(reports): - glob_severity = -1 - glob_threat = 'Log' - for host in reports: - host_glob_severity = reports[host]['global']['severity'] - if host_glob_severity > glob_severity: - glob_severity = host_glob_severity - glob_threat = reports[host]['global']['threat'] - reports['deployment'] = {'severity': glob_severity, - 'threat': glob_threat} - if reports['deployment']['severity'] < 4: - reports['global'] = "OK" - else: - reports['global'] = "NOK" - return reports - diff --git a/files_old/scan_utilities.py b/files_old/scan_utilities.py deleted file mode 100644 index 149112c2df479cd35f1006fefecf09ddde675efc..0000000000000000000000000000000000000000 --- a/files_old/scan_utilities.py +++ /dev/null @@ -1,59 +0,0 @@ -import json -import logging - -SSH_PORT = '22' -HTTP_PORT = '80' -HTTPS_PORT = '443' - -def import_dep_info(file_path: str, endpoint_keys: str) -> dict: - with open(file_path) as f: - data = json.load(f) - - endpoints = {} - for key,value in data['outputs'].items(): - if "_ip" in key: - if isinstance(value, str): - logging.info(f"Endpoint: {value}:{SSH_PORT}") - endpoints[value] = {SSH_PORT} - - if endpoint_keys != "None": - list_endpoints = endpoint_keys.split(',') - for key in data['outputs'].keys(): - if key in list_endpoints: - endpoint = str(data['outputs'][key]) - prefix,url = endpoint.split("://") - if ":" in url: - host,port = url.split(":") - else: - host = url - if prefix == "https": - port = HTTPS_PORT - elif prefix == 'http': - port = HTTP_PORT - else: - raise Exception(f"Impossible to parse the endpoint port. Endpoint: {endpoint}") - logging.info(f"Endpoint: {host}:{port}") - if host not in endpoints: - endpoints[host] = {port} - else: - endpoints[host].add(port) - - for host,ports in endpoints.items(): - endpoints[host] = sorted(list(ports)) - return endpoints - -def process_global_reports_info(reports: dict) -> dict: - glob_severity = -1 - glob_threat = 'Log' - for host in reports: - host_glob_severity = reports[host]['global']['severity'] - if host_glob_severity > glob_severity: - glob_severity = host_glob_severity - glob_threat = reports[host]['global']['threat'] - reports['deployment'] = {'severity': glob_severity, - 'threat': glob_threat} - if reports['deployment']['severity'] < 4: - reports['global'] = "OK" - else: - reports['global'] = "NOK" - return reports \ No newline at end of file