# gvm_library.py
# Author: Gioacchino Vino
from gvm.connections import TLSConnection
from gvm.protocols.gmpv208 import Gmp, AliveTest
from gvm.transforms import EtreeTransform
from gvm.xml import pretty_print
from time import time, sleep
import logging
import base64
import json
from typing import Optional, Dict, List, Tuple
def pretty_json(j):
    """Serialize *j* to JSON text with sorted keys and a 4-space indent."""
    dump_options = {"sort_keys": True, "indent": 4}
    return json.dumps(j, **dump_options)
class Configs:
    """Fixed GVM object ids used by GVMClient when creating targets and tasks."""
    # Passed as config_id to create_task.
    config = "9866edc1-8869-4e80-acac-d15d5647b4d9"
    # Passed as scanner_id to create_task.
    scanner = "08b69003-5fc2-4037-a479-93b440211c73"
    # Passed as ssh_credential_id to create_target.
    # NOTE(review): presumably specific to one GVM installation - confirm before reuse.
    ovs_ssh_credential = "b9af5845-8b87-4378-bca4-cee39a894c17"
class ReportFormats:
    """
    Report format ids, usable as the report_format_id of a report request.
    NOTE(review): presumably the ids of GVM's predefined report formats - confirm
    against the target server (GVMClient.get_report_formats prints the live list).
    """
    # Used by Task.get_report_info to read per-port threats and severities.
    anonymous_xml = "5057e5cc-b825-11e4-9d0e-28d24461215b"
    csv_results = "c1645568-627a-11e3-a660-406186ea4fc5"
    itg = "77bd6c4a-1f62-11e1-abf0-406186ea4fc5"
    pdf = "c402cc3e-b531-11e1-9163-406186ea4fc5"
    txt = "a3810a62-1f62-11e1-9219-406186ea4fc5"
    xml = "a994b278-1f62-11e1-96ac-406186ea4fc5"
class PortList:
    """
    This class helps the managing of the GVM port_list object
    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of port list object
        id: str = id returned after the creation
        in_use: str = state if the port_list object is in use
    Note:
        the port list is deleted on the server when this object is garbage
        collected (__del__ calls delete).
    """

    def __init__(self,
                 name: str = "",
                 client=None,
                 id: Optional[str] = None,
                 in_use: Optional[str] = None):
        # A trailing comma here used to store a 1-tuple instead of the client.
        self.client = client
        self.name = name
        self.id = id
        self.in_use = in_use
        # Set once the server confirmed the deletion, so it is not requested twice.
        self._deleted = False

    def __str__(self):
        d = {'name': self.name,
             'id': self.id,
             'in_use': self.in_use}
        return pretty_json(d)

    def __del__(self):
        self.delete()

    def delete(self):
        """
        Delete the port list on the server and log the outcome.

        Does nothing when the object has no client or id, or when a previous
        call already succeeded. getattr is used because __del__ also runs on
        instances whose __init__ did not complete.
        """
        client = getattr(self, "client", None)
        if client is None or not getattr(self, "id", None):
            return
        if getattr(self, "_deleted", False):
            return
        res = client.delete_port_list(self.id)
        status = res.xpath('@status')[0]
        status_text = res.xpath('@status_text')[0]
        if status == "200":
            self._deleted = True
            logging.info(f"Port_list {self} DELETED")
        else:
            logging.error(f"ERROR during the port_list deletion {status}: {status_text}")
class Target:
    """
    This class helps the managing of the GVM target object
    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of target object
        id: str = id returned after the creation
        in_use: str = state if the target object is in use
        hosts: str = hosts of the target (only shown by __str__)
        port_list: PortList = port list object associated with the target
    Note:
        the target is deleted on the server when this object is garbage
        collected (__del__ calls delete).
    """

    def __init__(self,
                 name: str = "",
                 id: str = "",
                 in_use: str = "",
                 hosts: str = "",
                 client=None,
                 port_list: Optional["PortList"] = None):
        self.client = client
        self.name = name
        self.id = id
        self.in_use = in_use
        self.hosts = hosts
        self.port_list = port_list
        # Set once the server confirmed the deletion, so it is not requested twice.
        self._deleted = False

    def __str__(self):
        d = {'name': self.name,
             'id': self.id,
             "in_use": self.in_use,
             'hosts': self.hosts}
        return pretty_json(d)

    def __del__(self):
        self.delete()

    def delete(self):
        """
        Delete the target on the server and log the outcome.

        Does nothing when the object has no client or id (the constructor
        defaults), or when a previous call already succeeded. getattr is used
        because __del__ also runs on instances whose __init__ did not complete.
        """
        client = getattr(self, "client", None)
        if client is None or not getattr(self, "id", None):
            return
        if getattr(self, "_deleted", False):
            return
        res = client.delete_target(self.id)
        status = res.xpath('@status')[0]
        status_text = res.xpath('@status_text')[0]
        if status == "200":
            self._deleted = True
            logging.info(f"Target {self} DELETED")
        else:
            logging.error(f"ERROR during the target deletion {status}: {status_text}")
class Task:
    """
    This class helps the managing of the GVM task object
    Attributes:
        client = client used to interact with gvm server (created from GVMClient class)
        name: str = name of task object
        id: str = id returned after the creation
        in_use: str = state if the task object is in use
        status: str = last status read from the server
        progress = last progress value read from the server
        report_id: str = report id once task is completed
        target: Target = target object associated with the task
    Methods:
        start = starts the task
        stop = stops the task
        delete = deletes the tasks
        update_status = refreshes name, status, progress, in_use and report_id
        wait = waits for the task is completed
        save_report = saves report on file
        get_report_info = retrieves report information
    Note:
        the task is deleted on the server when this object is garbage
        collected (__del__ calls delete).
    """

    def __init__(self,
                 name: str = "",
                 id: str = "",
                 client=None,
                 in_use: str = "",
                 status: str = "",
                 progress: str = "",
                 report_id: Optional[str] = None,
                 target: Optional["Target"] = None):
        self.client = client
        self.name = name
        self.id = id
        self.in_use = in_use
        self.status = status
        self.progress = progress
        self.report_id = report_id
        self.target = target
        # Set once the server confirmed the deletion, so it is not requested twice.
        self._deleted = False

    def __str__(self):
        d = {'name': self.name,
             'id': self.id,
             'in_use': self.in_use,
             'status': self.status,
             'report_id': self.report_id}
        return pretty_json(d)

    def start(self):
        """Start the task and remember the report id returned by the server."""
        res = self.client.start_task(self.id)
        self.report_id = res.xpath('report_id/text()')[0]
        logging.info(f"Task {self} STARTED")

    def stop(self):
        """Stop the task and print the server response."""
        res = self.client.stop_task(self.id)
        pretty_print(res)
        logging.info(f"Task {self} STOPPED")

    def delete(self):
        """
        Delete the task on the server and log the outcome.

        Does nothing when the object has no client or id (the constructor
        defaults), or when a previous call already succeeded. getattr is used
        because __del__ also runs on instances whose __init__ did not complete.
        """
        client = getattr(self, "client", None)
        if client is None or not getattr(self, "id", None):
            return
        if getattr(self, "_deleted", False):
            return
        res = client.delete_task(self.id)
        status = res.xpath('@status')[0]
        status_text = res.xpath('@status_text')[0]
        if status == "200":
            self._deleted = True
            logging.info(f"Task {self} DELETED")
        else:
            logging.error(f"ERROR during the task deletion {status}: {status_text}")

    def __del__(self):
        self.delete()

    def update_status(self):
        """Refresh name, status, progress, in_use and report_id from the server."""
        tasks_info = self.client.get_tasks(filter_string=self.id)
        task_info = tasks_info.xpath('task')[0]
        self.name = task_info.xpath('name/text()')[0]
        # New -> Requested -> Queued -> Running -> Done
        self.status = task_info.xpath('status/text()')[0]
        # 0      0            0         0 -> 100   -1
        self.progress = int(task_info.xpath('progress/text()')[0])
        self.in_use = task_info.xpath('in_use/text()')[0]
        # Without a last report the current report_id is kept as it is.
        report_ids = task_info.xpath('last_report/report/@id')
        if report_ids:
            self.report_id = report_ids[0]

    def wait(self, timeout: int = 3600) -> bool:
        """
        Poll the task every 10 seconds until it is completed.

        Returns True when the task is "Done" with progress -1, False when the
        task enters a status outside the expected ones or when more than
        `timeout` seconds have passed since the call started.
        """
        expected_statuses = ("New", "Requested", "Queued", "Running", "Done")
        start_time = time()
        logging.debug("Waiting for scans ends the task")
        while True:
            self.update_status()
            if self.status not in expected_statuses:  # ["Interrupted", ...]
                logging.warning(f"Task in the undesired status: '{self.status}'")
                return False
            if self.status == "Done" and self.progress == -1:
                logging.info("Task completed")
                return True
            if time() - start_time > timeout:
                logging.error("TIMEOUT during waiting for task ending")
                return False
            logging.debug(f"Waiting for the task ends. Now {int(time() - start_time)}s from start. Status: {self.status}")
            sleep(10)

    def save_report(self, format: str, filename: str):
        """
        Download the report in the given format and write it to `filename`.

        The text of the returned report element is base64-decoded before
        being written in binary mode.
        """
        res = self.client.get_report(self.report_id,
                                     report_format_id=format,
                                     ignore_pagination=True,
                                     details=True)
        code = str(res.xpath('report/text()')[0])
        with open(filename, "wb") as fh:
            fh.write(base64.b64decode(code))

    def get_report_info(self) -> Dict:
        """
        Summarise the report of the task.

        Returns a dict that maps every port text found in the report to
        {'severity': float, 'threat': str}, plus a 'global' entry holding the
        threat and severity of the most severe port. Without any port the
        global entry is {'threat': 'Log', 'severity': -1}.
        """
        res = self.client.get_report(self.report_id,
                                     report_format_id=ReportFormats.anonymous_xml,
                                     ignore_pagination=True,
                                     details=True)
        threats = res.xpath('report/report/ports/port/threat/text()')
        ports = res.xpath('report/report/ports/port/text()')
        severities = [float(s) for s in
                      res.xpath('report/report/ports/port/severity/text()')]
        report = {p: {'severity': s, 'threat': t}
                  for p, t, s in zip(ports, threats, severities)}
        glob_severity = -1  # returned severities are null or positive
        glob_threat = 'Log'
        for threat, severity in zip(threats, severities):
            if severity > glob_severity:
                glob_severity = severity
                glob_threat = threat
        report['global'] = {'threat': glob_threat, 'severity': glob_severity}
        return report
class GVMClient():
    """
    This class provides API to interact with GVM in order to
    get, create and delete port_lists, targets, tasks and reports

    NOTE(review): PortList, Target and Task call delete() from __del__, so the
    wrappers returned by the get_* methods remove their server-side object
    once they are garbage collected - confirm this is intended for lookups.
    """
    CONNECTION_RETRIES = 5
    LOCAL_IP = "127.0.0.1"

    def __init__(self,
                 auth_n: str,
                 auth_p: str,
                 host_ip: str = LOCAL_IP):
        self.auth_name = auth_n
        self.auth_passwd = auth_p
        self.host_ip = host_ip
        self.client = None
        self.create_client()

    def create_client(self):
        """
        Build the Gmp client over TLS and authenticate.

        The construction is attempted CONNECTION_RETRIES times with a 0.5s
        pause after every failure; an Exception chained to the last error is
        raised when all attempts fail.
        """
        last_error = None
        for remaining in range(self.CONNECTION_RETRIES, 0, -1):
            try:
                self.client = Gmp(TLSConnection(hostname=self.host_ip),
                                  transform=EtreeTransform())
                break
            except Exception as err:
                last_error = err
                logging.error(f"Connection error with the gmp endpoint. Remaining {remaining} retries")
                sleep(0.5)
        else:
            # Reached only when no attempt hit the break above.
            raise Exception(
                f"Impossible connect to the gmp endpoint even after {self.CONNECTION_RETRIES} retries"
            ) from last_error
        self.client.authenticate(self.auth_name, self.auth_passwd)

    def get_version(self) -> str:
        """Return the version text reported by the server."""
        res = self.client.get_version()
        return str(res.xpath('version/text()')[0])

    # ---- private helpers -------------------------------------------------

    def _port_list_from_xml(self, node) -> "PortList":
        """Build a PortList wrapper from a port_list response element."""
        o = PortList()
        o.client = self.client
        o.name = node.xpath('name/text()')[0]
        o.id = node.xpath('@id')[0]
        o.in_use = node.xpath('in_use/text()')[0]
        return o

    def _target_from_xml(self, node) -> "Target":
        """Build a Target wrapper from a target response element."""
        t = Target()
        t.client = self.client
        t.name = node.xpath('name/text()')[0]
        t.id = node.xpath('@id')[0]
        t.in_use = node.xpath('in_use/text()')[0]
        return t

    def _task_from_xml(self, node) -> "Task":
        """Build a Task wrapper from a task response element."""
        t = Task()
        t.client = self.client
        t.name = node.xpath('name/text()')[0]
        t.id = node.xpath('@id')[0]
        t.in_use = node.xpath('in_use/text()')[0]
        t.status = node.xpath('status/text()')[0]
        # A task without a last report keeps the default report_id.
        report_ids = node.xpath('last_report/report/@id')
        if report_ids:
            t.report_id = report_ids[0]
        return t

    @staticmethod
    def _log_deletion(res, kind: str, obj):
        """Log the outcome of a delete request for `obj`."""
        status = res.xpath('@status')[0]
        status_text = res.xpath('@status_text')[0]
        if status == "200":
            logging.info(f"{kind} {obj} DELETED")
        else:
            logging.error(f"ERROR {status}: {status_text}")

    # ---- port lists ------------------------------------------------------

    def get_port_lists(self, filter: str = "rows=-1") -> List["PortList"]:
        """Return a PortList wrapper for every port list matching `filter`."""
        client_res = self.client.get_port_lists(filter_string=filter)
        return [self._port_list_from_xml(pl) for pl in client_res.xpath('port_list')]

    def create_port_list(self, name: str, ports: List[str]) -> Optional["PortList"]:
        """
        Create a port list from `ports` (joined with commas) and return it.

        Raises Exception when the server does not answer with status 201.
        """
        res = self.client.create_port_list(name, ','.join(ports))
        status = res.xpath('@status')[0]
        status_text = res.xpath('@status_text')[0]
        if status != "201":
            msg = f"ERROR during Port list creation. Status code: {status}, msg: {status_text}"
            logging.error(msg)
            raise Exception(msg)
        pl_id = str(res.xpath('@id')[0])
        client_res = self.client.get_port_lists(filter_string=pl_id)
        o = self._port_list_from_xml(client_res.xpath('port_list')[0])
        # Log the id of the new object (the builtin `id` was logged before).
        logging.debug(f'Created port list obj. Name: {name}, id: {pl_id}, ports: {ports}')
        return o

    def delete_port_list(self, pl: "PortList"):
        """Delete the given port list on the server and log the outcome."""
        res = self.client.delete_port_list(pl.id)
        self._log_deletion(res, "Port_list", pl)

    def get_or_create_port_list(self, pl_name: str, ports: List[str]) -> "PortList":
        """
        Return the port list matching `pl_name`, creating it when none exists.

        When several port lists match, a warning is logged and the whole list
        of matches is returned instead of a single object.
        """
        res = self.get_port_lists(pl_name)
        if not res:
            return self.create_port_list(pl_name, ports)
        if len(res) == 1:
            return res[0]
        logging.warning(f"Found {len(res)} results.")
        return res

    # ---- targets ---------------------------------------------------------

    def create_target(self, name: str, ip: str, pl: "PortList") -> Optional["Target"]:
        """
        Create a target for `ip` using the port list `pl` and return it.

        Raises Exception when the server does not answer with status 201.
        """
        res = self.client.create_target(
            name=name,
            comment="",
            hosts=[ip],
            port_list_id=pl.id,
            ssh_credential_id=Configs.ovs_ssh_credential,
            alive_test=AliveTest('Consider Alive'))
        status = res.xpath('@status')[0]
        status_text = res.xpath('@status_text')[0]
        if status != "201":
            msg = f"ERROR during Target creation. Status code: {status}, msg: {status_text}"
            raise Exception(msg)
        target_id = str(res.xpath('@id')[0])
        client_res = self.client.get_targets(filter_string=target_id)
        t = self._target_from_xml(client_res.xpath('target')[0])
        t.port_list = pl
        return t

    def get_targets(self, filter: str) -> List["Target"]:
        """Return a Target wrapper for every target matching `filter`."""
        targets = self.client.get_targets(filter_string=filter)
        return [self._target_from_xml(target) for target in targets.xpath('target')]

    def delete_target(self, target: "Target"):
        """Delete the given target on the server and log the outcome."""
        res = self.client.delete_target(target.id)
        self._log_deletion(res, "Target", target)

    def get_or_create_target(self, name: str, ip: str, port_list: "PortList") -> "Target":
        """
        Return the target matching `name`, creating it when none exists.

        When several targets match, a warning is logged and None is returned.
        """
        res = self.get_targets(name)
        if not res:
            return self.create_target(name, ip, port_list)
        if len(res) == 1:
            res[0].port_list = port_list
            return res[0]
        logging.warning(f"Found {len(res)} targets. Return None")
        return None

    def search_and_delete_target(self, target_name: str):
        """
        Delete the single target matching `target_name`.

        Raises Exception when the search does not return exactly one target.
        """
        targets = self.get_targets(target_name)
        if len(targets) != 1:
            raise Exception(
                f"Expected exactly one target for '{target_name}', found {len(targets)}")
        # delete_target expects the Target object itself, not its id.
        self.delete_target(targets[0])

    def search_and_delete_all_targets(self, target_name: str):
        """Delete every target matching `target_name`."""
        for target in self.get_targets(target_name):
            self.delete_target(target)

    # ---- tasks -----------------------------------------------------------

    def create_task(self, name: str, target: "Target") -> Optional["Task"]:
        """
        Create a task scanning `target` and return it.

        Raises Exception when the server does not answer with status 201.
        """
        res = self.client.create_task(
            name=name,
            config_id=Configs.config,
            target_id=target.id,
            scanner_id=Configs.scanner)
        status = res.xpath('@status')[0]
        status_text = res.xpath('@status_text')[0]
        if status != "201":
            msg = f"ERROR during Task creation. Status code: {status}, msg: {status_text}"
            raise Exception(msg)
        task_id = str(res.xpath('@id')[0])
        client_res = self.client.get_tasks(filter_string=task_id)
        t = self._task_from_xml(client_res.xpath('task')[0])
        t.target = target
        return t

    def get_tasks(self, filter: str) -> List["Task"]:
        """Return a Task wrapper for every task matching `filter`."""
        tasks = self.client.get_tasks(filter_string=filter)
        return [self._task_from_xml(task) for task in tasks.xpath('task')]

    def delete_task(self, task: "Task"):
        """Delete the given task on the server and log the outcome."""
        res = self.client.delete_task(task.id)
        self._log_deletion(res, "Task", task)

    def get_or_create_task(self, task_name: str, target: "Target") -> "Task":
        """
        Return the task matching `task_name`, creating it when none exists.

        When several tasks match, a warning is logged and the whole list of
        matches is returned instead of a single object.
        """
        res = self.get_tasks(task_name)
        if not res:
            return self.create_task(task_name, target)
        if len(res) == 1:
            res[0].target = target
            return res[0]
        logging.warning(f"Found {len(res)} results. Returning all of them")
        return res

    def delete_all_tasks(self, filter: str):
        """Delete every task matching `filter`."""
        for task in self.get_tasks(filter):
            self.delete_task(task)

    # ---- report formats --------------------------------------------------

    def get_report_formats(self) -> List[Tuple[str, str]]:
        """Print the report formats known to the server and return them as (id, name) pairs."""
        res = self.client.get_report_formats()
        formats = []
        for fmt in res.xpath('report_format'):
            name = fmt.xpath('name/text()')[0]
            format_id = fmt.xpath('@id')[0]
            print(f"Report format id: '{format_id}' and name: '{name}'")
            formats.append((format_id, name))
        return formats