diff --git a/files/gvm_library.py b/files/gvm_library.py index 27087ae9240e9cb9aa903001feb15fc9f50140fd..437c6294397d65c577155220fe2e039ac2baef44 100644 --- a/files/gvm_library.py +++ b/files/gvm_library.py @@ -8,6 +8,21 @@ import base64 import json from typing import Optional, Dict, List, Tuple +# GVM Xpath Constants +GVM_XPATH_NAME_TEXT = 'name/text()' +GVM_XPATH_REPORT_ID_TEXT = "report_id/text()" +GVM_XPATH_STATUS = '@status' +GVM_XPATH_STATUS_TEXT = '@status_text' +GVM_XPATH_STATUS_TEXT_2 = '@status/text' +GVM_XPATH_PROGRESS_TEXT = 'progress/text()' +GVM_XPATH_INUSE_TEXT = 'in_use/text()' +GVM_XPATH_LAST_REPORT_ID = 'last_report/report/@id' +GVM_XPATH_REPORT_TEXT = 'report/text()' + +# GVM Status Constants +GVM_STATUS_OK = "200" +GVM_STATUS_CREATE_OK = "201" + # Custom Exceptions class GvmException(Exception): pass @@ -63,9 +78,9 @@ class PortList: def delete(self): res = self.client.delete_port_list(self.id) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": + status = res.xpath(GVM_XPATH_STATUS)[0] + status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] + if status == GVM_STATUS_OK: logging.info(f"Port_list {self} DELETED") else: logging.error(f"ERROR during the port_list deletion {status}: {status_text}") @@ -107,9 +122,9 @@ class Target: def delete(self): res = self.client.delete_target(self.id) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": + status = res.xpath(GVM_XPATH_STATUS)[0] + status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] + if status == GVM_STATUS_OK: logging.info(f"Target {self} DELETED") else: logging.error(f"ERROR during the target deletion {status}: {status_text}") @@ -163,7 +178,7 @@ class Task: def start(self): res = self.client.start_task(self.id) - self.report_id = res.xpath('report_id/text()')[0] + self.report_id = res.xpath(GVM_XPATH_REPORT_ID_TEXT)[0] logging.info(f"Task {self} STARTED") def stop(self): @@ -173,9 +188,9 @@ class Task: def delete(self): res = self.client.delete_task(self.id) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": + status = res.xpath(GVM_XPATH_STATUS)[0] + status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] + if status == GVM_STATUS_OK: logging.info(f"Task {self} DELETED") else: logging.error(f"ERROR during the task deletion {status}: {status_text}") @@ -184,18 +199,22 @@ class Task: self.delete() def update_status(self): - tasks_info = self.client.get_tasks(filter_string = self.id) - task_info = tasks_info.xpath('task')[0] - self.name = task_info.xpath('name/text()')[0] - self.status = task_info.xpath('status/text()')[0] # New -> Requested -> Queued -> Running -> Done - self.progress = int(task_info.xpath('progress/text()')[0])# 0 0 0 0 -> 100 -1 - self.in_use = task_info.xpath('in_use/text()')[0] try: - self.report_id = task_info.xpath('last_report/report/@id')[0] - except: + tasks_info = self.client.get_tasks(filter_string = self.id) + except Exception: pass - - def wait(self, timeout: int = 3600) -> bool: + else: + task_info = tasks_info.xpath('task')[0] + self.name = task_info.xpath(GVM_XPATH_NAME_TEXT)[0] + self.status = task_info.xpath(GVM_XPATH_STATUS_TEXT_2)[0] # New -> Requested -> Queued -> Running -> Done + self.progress = int(task_info.xpath(GVM_XPATH_PROGRESS_TEXT)[0])# 0 0 0 0 -> 100 -1 + self.in_use = task_info.xpath(GVM_XPATH_INUSE_TEXT)[0] + try: + self.report_id = task_info.xpath(GVM_XPATH_LAST_REPORT_ID)[0] + except Exception: + pass + + def wait(self, timeout: int = 7200) -> bool: start_time = time() logging.debug("Waiting for scans ends the task") while True: @@ -204,7 +223,7 @@ class Task: logging.warning(f"Task in the undesired status: '{self.status}'") return False if self.status == "Done" and self.progress == -1: - logging.info(f"Task completed") + logging.info("Task completed") return True if time() - start_time > timeout: logging.error("TIMEOUT during waiting for task ending") @@ -217,7 +236,7 @@ class Task: report_format_id=format, ignore_pagination=True, details=True) - code = str(res.xpath('report/text()')[0]) + code = str(res.xpath(GVM_XPATH_REPORT_TEXT)[0]) with open(filename, "wb") as fh: fh.write(base64.b64decode(code)) @@ -296,37 +315,37 @@ class GVMClient(): for pl in client_res.xpath('port_list'): o = PortList() o.client = self.client - o.name = pl.xpath('name/text()')[0] + o.name = pl.xpath(GVM_XPATH_NAME_TEXT)[0] o.id = pl.xpath('@id')[0] - o.in_use = pl.xpath('in_use/text()')[0] + o.in_use = pl.xpath(GVM_XPATH_INUSE_TEXT)[0] res.append(o) return res def create_port_list(self, name: str, ports: List[str]) -> Optional[PortList]: res = self.client.create_port_list(name, ','.join(ports)) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "201": + status = res.xpath(GVM_XPATH_STATUS)[0] + status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] + if status == GVM_STATUS_CREATE_OK: pl_id = str(res.xpath('@id')[0]) client_res = self.client.get_port_lists(filter_string = pl_id) pl_info = client_res.xpath('port_list')[0] o = PortList() o.client = self.client - o.name = pl_info.xpath('name/text()')[0] + o.name = pl_info.xpath(GVM_XPATH_NAME_TEXT)[0] o.id = pl_info.xpath('@id')[0] - o.in_use = pl_info.xpath('in_use/text()')[0] + o.in_use = pl_info.xpath(GVM_XPATH_INUSE_TEXT)[0] logging.debug(f'Created port list obj. Name: {name}, id: {id}, ports: {ports}') return o else: logging.error(f"ERROR during Port list creation. Status code: {status}, msg: {status_text}") msg = f"ERROR during Port list creation. Status code: {status}, msg: {status_text}" - raise Exception(msg) + raise GvmException(msg) def delete_port_list(self, pl: PortList): res = self.client.delete_port_list(pl.id) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": + status = res.xpath(GVM_XPATH_STATUS)[0] + status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] + if status == GVM_STATUS_OK: logging.info(f"Port_list {pl} DELETED") else: logging.error(f"ERROR {status}: {status_text}") @@ -349,22 +368,22 @@ class GVMClient(): port_list_id = pl.id, ssh_credential_id = Configs.ovs_ssh_credential, alive_test = AliveTest('Consider Alive')) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "201": + status = res.xpath(GVM_XPATH_STATUS)[0] + status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] + if status == GVM_STATUS_CREATE_OK: target_id = str(res.xpath('@id')[0]) client_res = self.client.get_targets(filter_string = target_id) target_info = client_res.xpath('target')[0] t = Target() t.client = self.client - t.name = target_info.xpath('name/text()')[0] + t.name = target_info.xpath(GVM_XPATH_NAME_TEXT)[0] t.id = target_info.xpath('@id')[0] - t.in_use = target_info.xpath('in_use/text()')[0] + t.in_use = target_info.xpath(GVM_XPATH_INUSE_TEXT)[0] t.port_list = pl return t else: msg = f"ERROR during Target creation. Status code: {status}, msg: {status_text}" - raise Exception(msg) + raise GvmException(msg) def get_targets(self, filter: str) -> List[Target]: res = [] @@ -372,17 +391,17 @@ class GVMClient(): for target in targets.xpath('target'): t = Target() t.client = self.client - t.name = target.xpath('name/text()')[0] + t.name = target.xpath(GVM_XPATH_NAME_TEXT)[0] t.id = target.xpath('@id')[0] - t.in_use = target.xpath('in_use/text()')[0] + t.in_use = target.xpath(GVM_XPATH_INUSE_TEXT)[0] res.append(t) return res def delete_target(self, target: Target): res = self.client.delete_target(target.id) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "200": + status = res.xpath(GVM_XPATH_STATUS)[0] + status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] + if status == GVM_STATUS_OK: logging.info(f"Target {target} DELETED") else: logging.error(f"ERROR {status}: {status_text}") @@ -395,15 +414,15 @@ class GVMClient(): res[0].port_list = port_list return res[0] else: - logging.warning(f"Found {len(res)} targets. Return None") - return None + logging.warning(f"Found {len(res)} targets. Return First one") + return res[0] def search_and_delete_target(self, target_name: str): targets = self.get_targets(target_name) if len(targets) == 1: self.delete_target(targets[0]['id']) else: - raise("Multiple results for search") + raise GvmException("Multiple results for search") def search_and_delete_all_targets(self, target_name: str): targets = self.get_targets(target_name) @@ -416,27 +435,27 @@ class GVMClient(): config_id = Configs.config, target_id = target.id, scanner_id = Configs.scanner) - status = res.xpath('@status')[0] - status_text = res.xpath('@status_text')[0] - if status == "201": + status = res.xpath(GVM_XPATH_STATUS)[0] + status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] + if status == GVM_STATUS_CREATE_OK: task_id = str(res.xpath('@id')[0]) client_res = self.client.get_tasks(filter_string = task_id) task_info = client_res.xpath('task')[0] t = Task() t.client = self.client - t.name = task_info.xpath('name/text()')[0] + t.name = task_info.xpath(GVM_XPATH_NAME_TEXT)[0] t.id = task_info.xpath('@id')[0] - t.in_use = task_info.xpath('in_use/text()')[0] + t.in_use = task_info.xpath(GVM_XPATH_INUSE_TEXT)[0] t.status = task_info.xpath('status/text()')[0] try: - t.report_id = task_info.xpath('last_report/report/@id')[0] - except: + t.report_id = task_info.xpath(GVM_XPATH_LAST_REPORT_ID)[0] + except Exception: pass t.target = target return t else: msg = f"ERROR during Task creation. Status code: {status}, msg: {status_text}" - raise Exception(msg) + raise GvmException(msg) def get_tasks(self, filter: str) -> List[Task]: list_of_tasks = [] @@ -444,12 +463,12 @@ class GVMClient(): for task in tasks.xpath('task'): t = Task() t.client = self.client - t.name = task.xpath('name/text()')[0] + t.name = task.xpath(GVM_XPATH_NAME_TEXT)[0] t.id = task.xpath('@id')[0] - t.in_use = task.xpath('in_use/text()')[0] + t.in_use = task.xpath(GVM_XPATH_INUSE_TEXT)[0] t.status = task.xpath('status/text()')[0] try: - t.report_id = task.xpath('last_report/report/@id')[0] + t.report_id = task.xpath(GVM_XPATH_LAST_REPORT_ID)[0] except Exception: pass list_of_tasks.append(t) @@ -474,6 +493,6 @@ class GVMClient(): def get_report_formats(self): res = self.client.get_report_formats() for f in res.xpath('report_format'): - name = f.xpath('name/text()')[0] - id = f.xpath('@id')[0] - print(f"Report format id: '{id}' and name: '{name}'") \ No newline at end of file + rf_name = f.xpath(GVM_XPATH_NAME_TEXT)[0] + rf_id = f.xpath('@id')[0] + print(f"Report format id: '{rf_id}' and name: '{rf_name}'") \ No newline at end of file diff --git a/files/scan.py b/files/scan.py index 155b909b5c4c2cfa56b49a8e655c874a40a030b9..dd75e69d1aa98d0154dcd15693469537a4a13390 100644 --- a/files/scan.py +++ b/files/scan.py @@ -8,7 +8,7 @@ from utilities import import_dep_info, process_global_reports_info, read_not_rel import argparse ### GVM Options ### -WAIT_TIMEOUT = 7200 #1h +WAIT_TIMEOUT = 7200 #2h parser = argparse.ArgumentParser( description='Scan endpoints and machines')