# gvm_library.py
# Author: Gioacchino Vino
from gvm.connections import TLSConnection
from gvm.protocols.gmpv208 import Gmp, AliveTest
from gvm.transforms import EtreeTransform
from gvm.xml import pretty_print
from time import time, sleep
import logging
import base64
import json
from typing import Optional, Dict, List, Tuple
# XPath expressions evaluated on the XML responses returned by the GVM server
GVM_XPATH_ID = "@id"                                # "id" attribute of the element
GVM_XPATH_NAME_TEXT = "name/text()"                 # text of the <name> child
GVM_XPATH_REPORT_ID_TEXT = "report_id/text()"       # text of the <report_id> child
GVM_XPATH_STATUS = "@status"                        # "status" attribute (response code)
GVM_XPATH_STATUS_TEXT = "@status_text"              # "status_text" attribute (response message)
# NOTE(review): not referenced in the visible part of this file and it does
# not look like a useful expression - confirm before relying on it
GVM_XPATH_STATUS_TEXT_2 = "@status/text"
GVM_XPATH_STATUS_TEXT_3 = "status/text()"           # text of the <status> child
GVM_XPATH_PROGRESS_TEXT = "progress/text()"         # text of the <progress> child
GVM_XPATH_INUSE_TEXT = "in_use/text()"              # text of the <in_use> child
GVM_XPATH_LAST_REPORT_ID = "last_report/report/@id" # "id" of <last_report>/<report>
GVM_XPATH_REPORT_TEXT = "report/text()"             # text of the <report> child

# Values of the "status" attribute compared against GVM responses
GVM_STATUS_OK = "200"         # expected after a deletion
GVM_STATUS_CREATE_OK = "201"  # expected after a creation
# Custom Exceptions
class GvmException(Exception):
    """Error raised by this module when a GVM operation fails.

    It is raised when the server answers a creation request with an
    unexpected status code and when the GMP client cannot be created.
    """
# Human-readable JSON serialization helper
def pretty_json(j):
    """Return *j* serialized as JSON with sorted keys and 4-space indentation.

    Args:
        j: any object accepted by ``json.dumps``.

    Returns:
        The formatted JSON string.

    Raises:
        TypeError: if *j* contains an object that is not JSON serializable.
    """
    return json.dumps(j, sort_keys=True, indent=4)
# Class containing config ids
class Configs:
    """Ids of the server-side objects referenced when creating targets and tasks."""

    # Passed as config_id when a task is created
    config = "9866edc1-8869-4e80-acac-d15d5647b4d9"
    # Passed as scanner_id when a task is created
    scanner = "08b69003-5fc2-4037-a479-93b440211c73"
    # Passed as ssh_credential_id when a target is created
    ovs_ssh_credential = "b9af5845-8b87-4378-bca4-cee39a894c17"
# Class containing report format ids
class ReportFormats:
    """Ids of the report formats, to be passed as report_format_id to get_report."""

    anonymous_xml = "5057e5cc-b825-11e4-9d0e-28d24461215b"
    csv_results = "c1645568-627a-11e3-a660-406186ea4fc5"
    itg = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5"
    pdf = "c402cc3e-b531-11e1-9163-406186ea4fc5"
    txt = "a3810a62-1f62-11e1-9219-406186ea4fc5"
    xml = "a994b278-1f62-11e1-96ac-406186ea4fc5"
class PortList:
    """
    This class helps the managing of the GVM port_list object.

    On instantiation the server is searched using the given name as filter:
    if nothing is returned a new port_list is created, otherwise the first
    returned object is adopted.

    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of port list object
        ports: str = comma-joined ports sent to the server at creation
        id: str = id returned after the creation (None while unknown)
        in_use: str = state if the port_list object is in use (None while unknown)
    """

    def __init__(self,
                 client,
                 name: str,
                 ports: List[str]):
        """Collect the port_list called `name` from the server or create it.

        Raises:
            GvmException: if the server refuses the creation.
        """
        self.client = client
        self.name = name
        self.ports = ','.join(ports)
        # Defined up-front: they stay None when a creation cannot be confirmed,
        # instead of being missing and raising AttributeError later on
        self.id = None
        self.in_use = None

        # Retrieve port_list objs by name
        res = self.__get_info(filter=name)
        if len(res) == 0:
            # If no result retrieved, create it
            self.create()
            return

        logging.debug("Already created. Collected from server")
        if len(res) > 1:
            # If more than one result has been collected, consider the first one
            msg = f"The port_list name {name} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.__adopt(res[0])

    # Copy the values collected from the server into this object
    def __adopt(self, info: Dict[str, str]) -> None:
        self.name = info['name']
        self.id = info['id']
        self.in_use = info['in_use']

    # Search port_lists by id/name
    def __get_info(self, filter: str = "rows=-1") -> List[Dict[str, str]]:
        """Return name, id and in_use of every port_list matching `filter`."""
        pls = self.client.get_port_lists(filter_string=filter) \
                         .xpath('port_list')
        return [{"name": str(pl.xpath(GVM_XPATH_NAME_TEXT)[0]),
                 "id": str(pl.xpath(GVM_XPATH_ID)[0]),
                 "in_use": str(pl.xpath(GVM_XPATH_INUSE_TEXT)[0])}
                for pl in pls]

    def create(self) -> None:
        """Create the port_list on the server and read it back.

        Raises:
            GvmException: if the status of the response is not GVM_STATUS_CREATE_OK.
        """
        res = self.client.create_port_list(self.name, self.ports)
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status != GVM_STATUS_CREATE_OK:
            msg = "ERROR during Port list creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            logging.error(msg)
            raise GvmException(msg)

        pl_id = str(res.xpath(GVM_XPATH_ID)[0])
        # The creation succeeded: keep the returned id even if the
        # read-back below does not find the object
        self.id = pl_id
        res = self.__get_info(filter=pl_id)
        if len(res) == 0:
            # No obj retrieved. Error during creation
            msg = f"The port_list name {self.name} retrieved 0 results after creations"
            logging.error(msg)
            return

        if len(res) > 1:
            # Multiple objs retrieved, consider the first one
            msg = f"The port_list name {self.name}"
            msg += f" retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.__adopt(res[0])
        msg = "Created port list obj. "
        msg += f"Name: {self.name}, id: {self.id}, ports: {self.ports}"
        logging.debug(msg)

    def __str__(self):
        d = {'name': self.name,
             'id': self.id,
             'in_use': self.in_use}
        return pretty_json(d)

    def delete(self):
        """Delete the port_list from the server and log the outcome.

        The reference to the client is dropped whatever the outcome is.
        """
        logging.debug(f"Deletion port_list {self.name}")
        res = self.client.delete_port_list(self.id)
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Port_list {self} DELETED")
        else:
            logging.error(f"ERROR during the port_list deletion {status}: {status_text}")
class Target:
    """
    This class helps the managing of the GVM target object.

    On instantiation the server is searched using the given name as filter:
    if nothing is returned a new target is created, otherwise the first
    returned object is adopted.

    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of target object
        host: str = host sent to the server at creation
        pl: PortList = port_list whose id is sent to the server at creation
        id: str = id returned after the creation (None while unknown)
        in_use: str = state if the target object is in use (None while unknown)
    """

    def __init__(self,
                 client,
                 name: str,
                 host: str,
                 port_list: PortList):
        """Collect the target called `name` from the server or create it.

        Raises:
            GvmException: if the server refuses the creation.
        """
        self.client = client
        self.name = name
        self.host = host
        self.pl = port_list
        # Defined up-front: they stay None when a creation cannot be confirmed,
        # instead of being missing and raising AttributeError later on
        self.id = None
        self.in_use = None

        # Retrieve targets objs by name
        res = self.__get_info(filter=name)
        if len(res) == 0:
            # If no result retrieved, create it
            self.create()
            return

        logging.debug("Already created. Collected from server")
        if len(res) > 1:
            # If more than one result has been collected, consider the first one
            msg = f"The target name {name} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.__adopt(res[0])

    # Copy the values collected from the server into this object
    def __adopt(self, info: Dict[str, str]) -> None:
        self.name = info['name']
        self.id = info['id']
        self.in_use = info['in_use']

    # Search targets by id/name
    def __get_info(self, filter: str) -> List[Dict[str, str]]:
        """Return name, id and in_use of every target matching `filter`."""
        targets = self.client.get_targets(filter_string=filter) \
                             .xpath('target')
        return [{"name": str(target.xpath(GVM_XPATH_NAME_TEXT)[0]),
                 "id": str(target.xpath(GVM_XPATH_ID)[0]),
                 "in_use": str(target.xpath(GVM_XPATH_INUSE_TEXT)[0])}
                for target in targets]

    def create(self) -> None:
        """Create the target on the server and read it back.

        Raises:
            GvmException: if the status of the response is not GVM_STATUS_CREATE_OK.
        """
        res = self.client.create_target(
            name=self.name,
            comment="",
            hosts=[self.host],
            port_list_id=self.pl.id,
            ssh_credential_id=Configs.ovs_ssh_credential,
            alive_test=AliveTest('Consider Alive'))
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status != GVM_STATUS_CREATE_OK:
            msg = "ERROR during Target creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            # Logged before raising, as done for the port_list creation
            logging.error(msg)
            raise GvmException(msg)

        t_id = str(res.xpath(GVM_XPATH_ID)[0])
        # The creation succeeded: keep the returned id even if the
        # read-back below does not find the object
        self.id = t_id
        res = self.__get_info(filter=t_id)
        if len(res) == 0:
            # No obj retrieved. Error during creation
            msg = f"The target name {self.name} retrieved 0 results after creation"
            logging.error(msg)
            return

        if len(res) > 1:
            # Multiple objs retrieved, consider the first one
            msg = f"The target id {t_id} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.__adopt(res[0])
        msg = "Created target obj. "
        msg += f"Name: {self.name}, id: {self.id}, host: {self.host}"
        logging.debug(msg)

    def __str__(self):
        d = {'name': self.name,
             'id': self.id,
             "in_use": self.in_use,
             'host': self.host}
        return pretty_json(d)

    def delete(self):
        """Delete the target from the server and log the outcome.

        The reference to the client is dropped whatever the outcome is.
        """
        logging.debug(f"Deletion target {self.name}")
        res = self.client.delete_target(self.id)
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Target {self} DELETED")
        else:
            logging.error(f"ERROR during the target deletion {status}: {status_text}")
class Task:
    """
    This class helps the managing of the GVM task object.

    On instantiation the server is searched using the given name as filter:
    if nothing is returned a new task is created, otherwise the first
    returned object is adopted.

    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of task object
        target: Target = target whose id is sent to the server at creation
        id: str = id returned after the creation (None while unknown)
        in_use: str = state if the task object is in use (None while unknown)
        status: str = last status collected from the server
        progress: int = last progress collected by update_status (None before)
        report_id: str = report id once task is completed
    Methods:
        start = starts the task
        stop = stops the task
        delete = deletes the tasks
        update_status = retrieves current task status
        wait = waits for the task is completed
        save_report = saves report on file
        get_report_info = retrieves report information
    """
    # Constants
    WAIT_SECONDS = 10
    # Statuses tolerated by wait(); any other one (e.g. "Interrupted") stops the wait
    EXPECTED_STATUSES = ("New", "Requested", "Queued", "Running", "Done")
    # Results with a severity below this value are considered negligible
    MIN_RELEVANT_SEVERITY = 4

    def __init__(self,
                 client,
                 name: str,
                 target: Target) -> None:
        """Collect the task called `name` from the server or create it.

        Raises:
            GvmException: if the server refuses the creation.
        """
        self.client = client
        self.name = name
        self.target = target
        # Defined up-front: they stay None when a creation cannot be confirmed,
        # instead of being missing and raising AttributeError later on
        self.id = None
        self.in_use = None
        self.status = None
        self.progress = None
        self.report_id = None

        # Retrieve task objs by name
        res = self.__get_info(filter=name)
        if len(res) == 0:
            # If no result retrieved, create it
            self.create()
            return

        logging.debug("Already created. Collected from server")
        if len(res) > 1:
            # If more than one result has been collected, consider the first one
            msg = f"The task name {name} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.__adopt(res[0])

    # Copy the values collected from the server into this object
    def __adopt(self, info: dict) -> None:
        self.name = info['name']
        self.id = info['id']
        self.in_use = info['in_use']
        self.status = info['status']
        self.report_id = info.get("report_id", None)

    # Search tasks by id/name
    def __get_info(self, filter: str) -> List[dict]:
        """Return name, id, in_use and status of every task matching `filter`.

        The "report_id" key is present only for tasks having a last report.
        """
        res = []
        tasks = self.client.get_tasks(filter_string=filter) \
                           .xpath('task')
        for t in tasks:
            t_dict = {"name": str(t.xpath(GVM_XPATH_NAME_TEXT)[0]),
                      "id": str(t.xpath(GVM_XPATH_ID)[0]),
                      "in_use": str(t.xpath(GVM_XPATH_INUSE_TEXT)[0]),
                      "status": str(t.xpath(GVM_XPATH_STATUS_TEXT_3)[0])}
            # A task without a last report yields an empty list here
            report_ids = t.xpath(GVM_XPATH_LAST_REPORT_ID)
            if report_ids:
                t_dict['report_id'] = str(report_ids[0])
            res.append(t_dict)
        return res

    def create(self) -> None:
        """Create the task on the server and read it back.

        Raises:
            GvmException: if the status of the response is not GVM_STATUS_CREATE_OK.
        """
        res = self.client.create_task(
            name=self.name,
            config_id=Configs.config,
            target_id=self.target.id,
            scanner_id=Configs.scanner)
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status != GVM_STATUS_CREATE_OK:
            msg = "ERROR during Task creation. "
            msg += f"Status code: {status}, msg: {status_text}"
            # Logged before raising, as done for the port_list creation
            logging.error(msg)
            raise GvmException(msg)

        t_id = str(res.xpath(GVM_XPATH_ID)[0])
        # The creation succeeded: keep the returned id even if the
        # read-back below does not find the object
        self.id = t_id
        res = self.__get_info(filter=t_id)
        if len(res) == 0:
            # No obj retrieved. Error during creation
            msg = f"The task id {t_id} retrieved 0 results after creation"
            logging.error(msg)
            return

        if len(res) > 1:
            # Multiple objs retrieved, consider the first one
            msg = f"The task id {t_id} retrieved {len(res)} results"
            logging.warning(msg)
            logging.warning("The first one will be considered")
        self.__adopt(res[0])
        msg = "Created task obj. "
        msg += f"Name: {self.name}, id: {self.id}"
        logging.debug(msg)

    def __str__(self):
        d = {'name': self.name,
             'id': self.id,
             'in_use': self.in_use,
             'status': self.status,
             'report_id': self.report_id}
        return pretty_json(d)

    def start(self):
        """Start the task and store the report id found in the response.

        Raises:
            IndexError: if the response carries no report id.
        """
        res = self.client.start_task(self.id)
        self.report_id = str(res.xpath(GVM_XPATH_REPORT_ID_TEXT)[0])
        logging.info(f"Task {self} STARTED")

    def stop(self):
        """Stop the task and print the response of the server."""
        res = self.client.stop_task(self.id)
        pretty_print(res)
        logging.info(f"Task {self} STOPPED")

    def delete(self):
        """Delete the task from the server and log the outcome.

        The reference to the client is dropped whatever the outcome is.
        """
        logging.debug(f"Deletion task {self.name}")
        res = self.client.delete_task(self.id)
        self.client = None
        status = res.xpath(GVM_XPATH_STATUS)[0]
        status_text = res.xpath(GVM_XPATH_STATUS_TEXT)[0]
        if status == GVM_STATUS_OK:
            logging.info(f"Task {self} DELETED")
        else:
            logging.error(f"ERROR during the task deletion {status}: {status_text}")

    def update_status(self):
        """Refresh name, status, progress, in_use and report_id from the server.

        Raises:
            IndexError: if the server returns no task for this id.
        """
        task_info = self.client.get_tasks(filter_string=self.id) \
                               .xpath('task')[0]
        self.name = str(task_info.xpath(GVM_XPATH_NAME_TEXT)[0])
        # status:   New -> Requested -> Queued -> Running  -> Done
        # progress: 0      0            0         0 -> 100    -1
        self.status = str(task_info.xpath(GVM_XPATH_STATUS_TEXT_3)[0])
        self.progress = int(task_info.xpath(GVM_XPATH_PROGRESS_TEXT)[0])
        self.in_use = str(task_info.xpath(GVM_XPATH_INUSE_TEXT)[0])
        # Keep the current report_id when the task has no last report yet
        report_ids = task_info.xpath(GVM_XPATH_LAST_REPORT_ID)
        if report_ids:
            self.report_id = str(report_ids[0])

    def wait(self, timeout: int = 7200) -> bool:
        """Poll the task every WAIT_SECONDS until it is completed.

        Args:
            timeout: maximum number of seconds to wait.

        Returns:
            True when the task is "Done" with progress -1, False if it
            reaches a status outside EXPECTED_STATUSES or the timeout expires.
        """
        start_time = time()
        logging.debug("Waiting for scans ends the task")
        while True:
            self.update_status()
            if self.status not in self.EXPECTED_STATUSES:
                logging.warning(f"Task in the undesired status: '{self.status}'")
                return False
            if self.status == "Done" and self.progress == -1:
                logging.info("Task completed")
                return True
            if time() - start_time > timeout:
                logging.error("TIMEOUT during waiting for task ending")
                return False
            logging.debug(f"Waiting for the task ends. Now {int(time() - start_time)}s from start. Status: {self.status}")
            sleep(self.WAIT_SECONDS)

    def save_report(self, format: str, filename: str):
        """Download the report in the given format and write it to `filename`.

        Args:
            format: report format id (see ReportFormats).
            filename: path of the file to write; the content of the
                response is base64-decoded before being written.
        """
        res = self.client.get_report(self.report_id,
                                     report_format_id=format,
                                     ignore_pagination=True,
                                     details=True)
        code = str(res.xpath(GVM_XPATH_REPORT_TEXT)[0])
        with open(filename, "wb") as fh:
            fh.write(base64.b64decode(code))

    def get_report_info(self,
                        accepted_issues: List[str],
                        known_issues: List[str]) -> Dict:
        """Summarize the report as the highest severity found on each port.

        A result whose severity reaches MIN_RELEVANT_SEVERITY and whose oid
        is not in `accepted_issues` but is in `known_issues` is dropped;
        every other result contributes to the summary.

        Args:
            accepted_issues: oids labelled as ACCEPTED.
            known_issues: oids dropped when they are not accepted.

        Returns:
            A dict mapping each port to {'severity', 'threat'} of its worst
            result, plus the 'global' key holding the worst result overall
            ({'threat': 'None', 'severity': -1} if nothing was considered).
        """
        res = self.client.get_report(self.report_id,
                                     report_format_id=ReportFormats.anonymous_xml,
                                     ignore_pagination=True,
                                     details=True)
        report = {}
        glob_severity = -1  # severities are not negative
        glob_threat = 'None'
        # The fields are read result by result: collecting them in four
        # independent lists pairs values of different results as soon as
        # one result lacks a field
        for result in res.xpath('report/report/results/result'):
            o_ids = result.xpath('nvt/@oid')
            severities = result.xpath('nvt/severities/@score')
            threats = result.xpath('threat/text()')
            ports = result.xpath('port/text()')
            if not (o_ids and severities and threats and ports):
                logging.warning("Skipped a report result lacking oid, severity, threat or port")
                continue
            o = str(o_ids[0])
            s = float(severities[0])
            t = str(threats[0])
            p = str(ports[0])

            msg = f"Detected oid: {o}, severity: {s}, threat: {t} and port: {p}"
            if s >= self.MIN_RELEVANT_SEVERITY:  # If severity is not negligible
                if o in accepted_issues:
                    msg += "\t=> ACCEPTED"
                elif o in known_issues:
                    msg += "\t=> DROPPED (not accepted but known)"
                    # Logged here, otherwise the drop would leave no trace
                    logging.debug(msg)
                    continue
                else:
                    msg += "\t=> NEW (not accepted and not known)"
            logging.debug(msg)

            # Keep the worst result of each port
            if p not in report or s > report[p]['severity']:
                report[p] = {'severity': s, 'threat': t}
            if s > glob_severity:
                glob_severity = s
                glob_threat = t
        report['global'] = {'threat': glob_threat, 'severity': glob_severity}
        return report
class GVMClient():
    """
    This class provides API to interact with GVM in order to
    get, create and delete port_lists, targets, tasks and reports

    Attributes:
        auth_name: str = name used for the authentication
        auth_passwd: str = password used for the authentication
        host_ip: str = address of the gmp endpoint
        client = authenticated GMP client (None until created)
    """
    CONNECTION_RETRIES = 5
    LOCAL_IP = "127.0.0.1"
    # Seconds to wait between two attempts
    RETRY_DELAY_SECONDS = 0.5

    def __init__(self,
                 auth_n: str,
                 auth_p: str,
                 host_ip: str = LOCAL_IP):
        """Store the credentials and create the authenticated client.

        Raises:
            GvmException: if the client cannot be created.
        """
        self.auth_name = auth_n
        self.auth_passwd = auth_p
        self.host_ip = host_ip
        self.client = None
        self.create_client()

    def create_client(self):
        """Create the GMP client, retrying on errors, and authenticate it.

        Raises:
            GvmException: if all the CONNECTION_RETRIES attempts fail.
        """
        last_error = None
        for attempt in range(1, self.CONNECTION_RETRIES + 1):
            try:
                logging.debug('Creation of the GMP Client')
                logging.debug(f'host_ip: {self.host_ip}')
                self.client = Gmp(TLSConnection(hostname=self.host_ip),
                                  transform=EtreeTransform())
                break
            except Exception as err:
                last_error = err
                remaining = self.CONNECTION_RETRIES - attempt
                logging.error(f"Connection error with the gmp endpoint ({err}). Remaining {remaining} retries")
                # No need to wait when no further attempt follows
                if remaining > 0:
                    sleep(self.RETRY_DELAY_SECONDS)
        else:
            # Reached only when no attempt succeeded (the loop never hit break)
            msg = f"Impossible connect to the gmp endpoint even after {self.CONNECTION_RETRIES} retries"
            raise GvmException(msg) from last_error
        logging.debug('GMP Client Created')
        self.client.authenticate(self.auth_name, self.auth_passwd)
        logging.debug('GMP Client Authenticated')

    def get_client(self):
        """Return the GMP client created by create_client."""
        return self.client

    def get_version(self) -> str:
        """Return the text of the <version> element of the get_version response."""
        res = self.client.get_version()
        return str(res.xpath('version/text()')[0])