diff --git a/files/gvm_library.py b/files/gvm_library.py index 772b812ac9136204d41d2fd68025ae9ff87fe094..95b5e60146bdd2642b884c0f9682e3167c3d0a50 100644 --- a/files/gvm_library.py +++ b/files/gvm_library.py @@ -9,11 +9,13 @@ import json from typing import Optional, Dict, List, Tuple # GVM Xpath Constants +GVM_XPATH_ID = '@id' GVM_XPATH_NAME_TEXT = 'name/text()' GVM_XPATH_REPORT_ID_TEXT = "report_id/text()" GVM_XPATH_STATUS = '@status' GVM_XPATH_STATUS_TEXT = '@status_text' GVM_XPATH_STATUS_TEXT_2 = '@status/text' +GVM_XPATH_STATUS_TEXT_3 = 'status/text()' GVM_XPATH_PROGRESS_TEXT = 'progress/text()' GVM_XPATH_INUSE_TEXT = 'in_use/text()' GVM_XPATH_LAST_REPORT_ID = 'last_report/report/@id' @@ -57,16 +59,81 @@ class PortList: in_use: str = state if the port_list object is in use """ - def __init__(self, - name: str = "", - client = None, - id: str = None, - in_use: str = None): + def __init__(self, + client, + name: str, + ports: List[str]): self.client = client, self.name = name - self.id = id - self.in_use = in_use + self.ports = ','.join(ports) + + # Retrieve port_list objs by name + res = self.__get_info(filter = name) + if len(res) == 0: + # If no result retrieved, create it + self.create() + + else: + if len(res) > 1: + # If one result has been collected, consider the first one + msg = f"The port_list name {name} retrieved {len(res)} results" + logging.warning(msg) + logging.warning("The first one will be considered") + + # If one result has been collected, consider it + self.name = res[0]['name'] + self.id = res[0]['id'] + self.in_use = res[0]['in_use'] + + # Search port_lists by id/name + def __get_info(self, filter: str = "rows=-1") -> List[Dict[str, str]]: + res = [] + pls = self.client.get_port_lists(filter_string = filter) \ + .xpath('port_list') + + for pl in pls: + pl_name = str(pl.xpath(GVM_XPATH_NAME_TEXT)[0]) + pl_id = str(pl.xpath(GVM_XPATH_ID)[0]) + pl_in_use = str(pl.xpath(GVM_XPATH_INUSE_TEXT)[0]) + res.append({"name": pl_name, + "id": pl_id, + "in_use": pl_in_use}) + return res + + def create(self) -> None: + res = self.client.create_port_list(self.name, self.ports) + status = res.xpath(GVM_XPATH_STATUS)[0] + status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] + if status == GVM_STATUS_CREATE_OK: + pl_id = str(res.xpath(GVM_XPATH_ID)[0]) + res = self.__get_info(filter = pl_id) + + if len(res) > 0: + if len(res) > 1: + # Multiple objs retrieved, consider the first one + msg = f"The port_list name {self.name}" + msg += f" retrieved {len(res)} results" + logging.warning(msg) + logging.warning("The first one will be considered") + self.name = res[0]['name'] + self.id = res[0]['id'] + self.in_use = res[0]['in_use'] + msg = "Created port list obj. " + msg += f"Name: {self.name}, id: {self.id}, ports: {self.ports}" + logging.debug(msg) + + else: + # No obj retrieved. Error during creation + msg = f"The port_list name {self.name} retrieved 0 results after creations" + logging.error(msg) + + else: + msg = "ERROR during Port list creation. " + msg += f"Status code: {status}, msg: {status_text}" + logging.error(msg) + raise GvmException(msg) + def __str__(self): d = {'name': self.name, 'id': self.id, @@ -97,18 +164,83 @@ class Target: """ def __init__(self, - name: str = "", - id: str = "", - in_use: str = "", - hosts: str = "", - client = None, - port_list: PortList = None): + client, + name: str, + hosts: str, + port_list: PortList): self.client = client self.name = name - self.id = id - self.in_use = in_use self.hosts = hosts - self.port_list = port_list + self.pl = port_list + + # Retrieve targets objs by name + res = self.__get_info(filter = name) + + if len(res) == 0: + # If no result retrieved, create it + self.create() + + else: + if len(res) > 1: + # If one result has been collected, consider the first one + msg = f"The target name {name} retrieved {len(res)} results" + logging.warning(msg) + logging.warning("The first one will be considered") + + # If one result has been collected, consider it + self.name = res[0]['name'] + self.id = res[0]['id'] + self.in_use = res[0]['in_use'] + + def __get_info(self, filter: str) -> List[Dict[str, str]]: + res = [] + targets = self.client.get_targets(filter_string = filter) \ + .xpath('target') + for target in targets: + t_name = str(target.xpath(GVM_XPATH_NAME_TEXT)[0]) + t_id = str(target.xpath(GVM_XPATH_ID)[0]) + t_in_use = str(target.xpath(GVM_XPATH_INUSE_TEXT)[0]) + res.append({"name": t_name, + "id": t_id, + "in_use": t_in_use}) + return res + + def create(self) -> None: + res = self.client.create_target( + name = self.name, + comment = "", + hosts = [self.host], + port_list_id = self.pl.id, + ssh_credential_id = Configs.ovs_ssh_credential, + alive_test = AliveTest('Consider Alive')) + + status = res.xpath(GVM_XPATH_STATUS)[0] + status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] + if status == GVM_STATUS_CREATE_OK: + t_id = str(res.xpath(GVM_XPATH_ID)[0]) + res = self.__get_info(filter = t_id) + + if len(res) == 0: + # No obj retrieved. Error during creation + msg = f"The target name {self.name} retrieved 0 results after creation" + logging.error(msg) + else: + if len(res) > 1: + # Multiple objs retrieved, consider the first one + msg = f"The target id {t_id} retrieved {len(res)} results" + logging.warning(msg) + logging.warning("The first one will be considered") + + self.name = res[0]['name'] + self.id = res[0]['id'] + self.in_use = res[0]['in_use'] + msg = "Created target obj. " + msg += f"Name: {self.name}, id: {self.id}, ports: {self.ports}" + logging.debug(msg) + else: + msg = "ERROR during Target creation. " + msg += f"Status code: {status}, msg: {status_text}" + raise GvmException(msg) def __str__(self): d = {'name': self.name, @@ -154,22 +286,88 @@ class Task: COMMUNICATION_RETRIES = 3 def __init__(self, - name: str = "", - id: str = "", - client = None, - in_use: str = "", - status: str = "", - progress: str = "", - report_id: str = None, - target: Target = None): + client, + name: str, + target: Target) -> None: self.client = client self.name = name - self.id = id - self.in_use = in_use - self.status = status - self.progress = progress - self.report_id = report_id self.target = target + + # Retrieve task objs by name + res = self.__get_info(filter = name) + + if len(res) == 0: + # If no result retrieved, create it + self.create() + else: + if len(res) > 1: + # If one result has been collected, consider the first one + msg = f"The port_list name {name} retrieved {len(res)} results" + logging.warning(msg) + logging.warning("The first one will be considered") + + self.name = res[0]['name'] + self.id = res[0]['id'] + self.in_use = res[0]['in_use'] + self.status = res[0]['status'] + self.report_id = res[0].get("report_id", None) + + def __get_info(self, filter: str) -> List[dict]: + res = [] + tasks = self.client.get_tasks(filter_string = filter) \ + .xpath('task') + for t in tasks: + t_name = str(t.xpath(GVM_XPATH_NAME_TEXT)[0]) + t_id = str(t.xpath(GVM_XPATH_ID)[0]) + t_in_use = str(t.xpath(GVM_XPATH_INUSE_TEXT)[0]) + t_status = str(t.xpath(GVM_XPATH_STATUS_TEXT_3)[0]) + t_dict = {"name": t_name, + "id": t_id, + "in_use": t_in_use, + "status": t_status} + try: + t_report_id = t.xpath(GVM_XPATH_LAST_REPORT_ID)[0] + except Exception: + pass + else: + t_dict['report_id'] = t_report_id + + res.append(t_dict) + return res + + def create(self) -> None: + res = self.client.create_task( + name = self.name, + config_id = Configs.config, + target_id = self.target.id, + scanner_id = Configs.scanner) + + status = res.xpath(GVM_XPATH_STATUS)[0] + status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] + + if status == GVM_STATUS_CREATE_OK: + t_id = str(res.xpath(GVM_XPATH_ID)[0]) + res = self.__get_info(filter = t_id) + if len(res) == 0: + # No obj retrieved. Error during creation + msg = f"The task id {t_id} retrieved 0 results after creation" + logging.error(msg) + else: + if len(res) > 1: + # Multiple objs retrieved, consider the first one + msg = f"The task id {t_id} retrieved {len(res)} results" + logging.warning(msg) + logging.warning("The first one will be considered") + + self.name = res[0].xpath(GVM_XPATH_NAME_TEXT)[0] + self.id = res[0].xpath(GVM_XPATH_ID)[0] + self.in_use = res[0].xpath(GVM_XPATH_INUSE_TEXT)[0] + self.status = res[0].xpath('status/text()')[0] + self.report_id = res[0].get("report_id", None) + else: + msg = "ERROR during Task creation. " + msg += f"Status code: {status}, msg: {status_text}" + raise GvmException(msg) def __str__(self): d = {'name': self.name, @@ -321,94 +519,6 @@ class GVMClient(): res = self.client.get_version() return str(res.xpath('version/text()')[0]) - def get_port_lists(self, filter: str = "rows=-1") -> List[PortList]: - res = [] - client_res = self.client.get_port_lists(filter_string = filter) - for pl in client_res.xpath('port_list'): - o = PortList() - o.client = self.client - o.name = pl.xpath(GVM_XPATH_NAME_TEXT)[0] - o.id = pl.xpath('@id')[0] - o.in_use = pl.xpath(GVM_XPATH_INUSE_TEXT)[0] - res.append(o) - return res - - def create_port_list(self, name: str, ports: List[str]) -> Optional[PortList]: - res = self.client.create_port_list(name, ','.join(ports)) - status = res.xpath(GVM_XPATH_STATUS)[0] - status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] - if status == GVM_STATUS_CREATE_OK: - pl_id = str(res.xpath('@id')[0]) - client_res = self.client.get_port_lists(filter_string = pl_id) - pl_info = client_res.xpath('port_list')[0] - o = PortList() - o.client = self.client - o.name = pl_info.xpath(GVM_XPATH_NAME_TEXT)[0] - o.id = pl_info.xpath('@id')[0] - o.in_use = pl_info.xpath(GVM_XPATH_INUSE_TEXT)[0] - logging.debug(f'Created port list obj. Name: {name}, id: {id}, ports: {ports}') - return o - else: - logging.error(f"ERROR during Port list creation. Status code: {status}, msg: {status_text}") - msg = f"ERROR during Port list creation. Status code: {status}, msg: {status_text}" - raise GvmException(msg) - - def delete_port_list(self, pl: PortList): - res = self.client.delete_port_list(pl.id) - status = res.xpath(GVM_XPATH_STATUS)[0] - status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] - if status == GVM_STATUS_OK: - logging.info(f"Port_list {pl} DELETED") - else: - logging.error(f"ERROR {status}: {status_text}") - - def get_or_create_port_list(self, pl_name: str, ports: List[str]) -> PortList: - res = self.get_port_lists(pl_name) - if len(res) == 0: - return self.create_port_list(pl_name, ports) - elif len(res) == 1: - return res[0] - else: - logging.warning(f"Found {len(res)} results.") - return res - - def create_target(self, name: str, ip: str, pl: PortList) -> Optional[Target]: - res = self.client.create_target( - name = name, - comment = "", - hosts = [ip], - port_list_id = pl.id, - ssh_credential_id = Configs.ovs_ssh_credential, - alive_test = AliveTest('Consider Alive')) - status = res.xpath(GVM_XPATH_STATUS)[0] - status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] - if status == GVM_STATUS_CREATE_OK: - target_id = str(res.xpath('@id')[0]) - client_res = self.client.get_targets(filter_string = target_id) - target_info = client_res.xpath('target')[0] - t = Target() - t.client = self.client - t.name = target_info.xpath(GVM_XPATH_NAME_TEXT)[0] - t.id = target_info.xpath('@id')[0] - t.in_use = target_info.xpath(GVM_XPATH_INUSE_TEXT)[0] - t.port_list = pl - return t - else: - msg = f"ERROR during Target creation. Status code: {status}, msg: {status_text}" - raise GvmException(msg) - - def get_targets(self, filter: str) -> List[Target]: - res = [] - targets = self.client.get_targets(filter_string = filter) - for target in targets.xpath('target'): - t = Target() - t.client = self.client - t.name = target.xpath(GVM_XPATH_NAME_TEXT)[0] - t.id = target.xpath('@id')[0] - t.in_use = target.xpath(GVM_XPATH_INUSE_TEXT)[0] - res.append(t) - return res - def delete_target(self, target: Target): res = self.client.delete_target(target.id) status = res.xpath(GVM_XPATH_STATUS)[0] @@ -418,17 +528,6 @@ class GVMClient(): else: logging.error(f"ERROR {status}: {status_text}") - def get_or_create_target(self, name: str, ip: str, port_list: PortList) -> Target: - res = self.get_targets(name) - if len(res) == 0: - return self.create_target(name, ip, port_list) - elif len(res) == 1: - res[0].port_list = port_list - return res[0] - else: - logging.warning(f"Found {len(res)} targets. Return First one") - return res[0] - def search_and_delete_target(self, target_name: str): targets = self.get_targets(target_name) if len(targets) == 1: @@ -441,35 +540,36 @@ class GVMClient(): for target in targets: self.delete_target(target) - def create_task(self, name: str, target: Target) -> Optional[Task]: - res = self.client.create_task( - name = name, - config_id = Configs.config, - target_id = target.id, - scanner_id = Configs.scanner) - status = res.xpath(GVM_XPATH_STATUS)[0] - status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0] - if status == GVM_STATUS_CREATE_OK: - task_id = str(res.xpath('@id')[0]) - client_res = self.client.get_tasks(filter_string = task_id) - task_info = client_res.xpath('task')[0] - t = Task() + def delete_all_tasks(self, filter: str): + tasks = self.get_tasks(filter) + for task in tasks: + self.delete_task(task) + + def get_port_lists_qwe(self, filter: str = "rows=-1") -> List[PortList]: + res = [] + client_res = self.client.get_port_lists(filter_string = filter) + for pl in client_res.xpath('port_list'): + o = PortList() + o.client = self.client + o.name = pl.xpath(GVM_XPATH_NAME_TEXT)[0] + o.id = pl.xpath('@id')[0] + o.in_use = pl.xpath(GVM_XPATH_INUSE_TEXT)[0] + res.append(o) + return res + + def get_targets_qwe(self, filter: str) -> List[Target]: + res = [] + targets = self.client.get_targets(filter_string = filter) + for target in targets.xpath('target'): + t = Target() t.client = self.client - t.name = task_info.xpath(GVM_XPATH_NAME_TEXT)[0] - t.id = task_info.xpath('@id')[0] - t.in_use = task_info.xpath(GVM_XPATH_INUSE_TEXT)[0] - t.status = task_info.xpath('status/text()')[0] - try: - t.report_id = task_info.xpath(GVM_XPATH_LAST_REPORT_ID)[0] - except Exception: - pass - t.target = target - return t - else: - msg = f"ERROR during Task creation. Status code: {status}, msg: {status_text}" - raise GvmException(msg) + t.name = target.xpath(GVM_XPATH_NAME_TEXT)[0] + t.id = target.xpath('@id')[0] + t.in_use = target.xpath(GVM_XPATH_INUSE_TEXT)[0] + res.append(t) + return res - def get_tasks(self, filter: str) -> List[Task]: + def get_tasks_qwe(self, filter: str) -> List[Task]: list_of_tasks = [] tasks = self.client.get_tasks(filter_string = filter) for task in tasks.xpath('task'): @@ -485,26 +585,3 @@ class GVMClient(): pass list_of_tasks.append(t) return list_of_tasks - - def get_or_create_task(self, task_name: str, target: Target) -> Task: - res = self.get_tasks(task_name) - if len(res) == 0: - return self.create_task(task_name, target) - elif len(res) == 1: - res[0].target = target - return res[0] - else: - print(f"WARNING: Returned {len(res)} tasks. Returned None") - return res[0] - - def delete_all_tasks(self, filter: str): - tasks = self.get_tasks(filter) - for task in tasks: - self.delete_task(task) - - def get_report_formats(self): - res = self.client.get_report_formats() - for f in res.xpath('report_format'): - rf_name = f.xpath(GVM_XPATH_NAME_TEXT)[0] - rf_id = f.xpath('@id')[0] - print(f"Report format id: '{rf_id}' and name: '{rf_name}'") \ No newline at end of file diff --git a/files/scan.py b/files/scan.py index 9ac169f62f88bf3b9ded7cd93155ce302d0c3e7d..493f51ff444ae88d2b4aea4b4a0a000cee21cf79 100644 --- a/files/scan.py +++ b/files/scan.py @@ -4,6 +4,7 @@ import logging import json import os from gvm_library import GVMClient, ReportFormats, GvmException, pretty_json +from gvm_library import PortList, Task, Target from utilities import import_dep_info, process_global_reports_info, read_not_relevant_issues import argparse @@ -94,13 +95,22 @@ for host,ports in endpoints.items(): report_filename = f"{output_dir}/{host}-report" summary_filename = f"{output_dir}/summary-report.json" - port_list = gvm_client.get_or_create_port_list(port_list_name, ports) + # Create PortList obj related to endpoint + port_list = PortList(client = gvm_client, + name = port_list_name, + ports = ports) logging.info(f"Port list:\n {port_list}") - target = gvm_client.get_or_create_target(target_name, host, port_list) + # Create Target obj related to endpoint + target = Target(client = gvm_client, + name = target_name, + host = host, + port_list =port_list) logging.info(f"Target:\n {target}") - task = gvm_client.get_or_create_task(task_name, target) + task = Task(client= gvm_client, + name = task_name, + target = target) logging.info(f"Task:\n {task}") tasks.append(task) diff --git a/files/utilities.py b/files/utilities.py index 5162fb890744ea4b546948712e9dcc91008a46b0..a6c7d6e8c38cc61a8de821dfbb2b84a4839ac60b 100644 --- a/files/utilities.py +++ b/files/utilities.py @@ -54,10 +54,12 @@ def process_global_reports_info(reports: Dict) -> Dict: glob_threat = reports[host]['global']['threat'] reports['deployment'] = {'severity': glob_severity, 'threat': glob_threat} + if reports['deployment']['severity'] < 4: reports['global'] = "OK" else: reports['global'] = "NOK" + return reports def read_not_relevant_issues() -> List[str]: diff --git a/utils/Dockerfile b/utils/Dockerfile index 6e378619c9a5438156843660a4f2d773207a1cde..619df41f978db04e06d79ae799ad1080528a7411 100644 --- a/utils/Dockerfile +++ b/utils/Dockerfile @@ -9,6 +9,7 @@ RUN DEBIAN_FRONTEND=noninteractive apt-get update \ && rm orchent_${ORCHENT_VERSION}_amd64.deb \ && apt-get clean && rm -rf /var/lib/apt/lists/* -RUN pip install gvm-tools jq yq GitPython +# python3 -m pip install pyopenssl==24.0.0 +RUN python3 -m pip install gvm-tools jq yq GitPython ENTRYPOINT ["/usr/local/bin/setup-sshd"]