Skip to content
Snippets Groups Projects
Commit c3203cad authored by qweqweasdasd's avatar qweqweasdasd
Browse files

Added python script

parent 94a6bca2
No related branches found
No related tags found
No related merge requests found
#gvm-script --gmp-username backbone --gmp-password krowfodwas3olrab tls --hostname "172.17.0.2" script.py
#auth_name = 'backbone'
#auth_passwd = 'krowfodwas3olrab'
from gvm.connections import TLSConnection
from gvm.protocols.gmpv208 import Gmp, AliveTest
from gvm.transforms import EtreeTransform
from gvm.xml import pretty_print
from time import time, sleep
import logging
from datetime import datetime
import json
import base64
from sys import argv, exit
def get_version():
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
pretty_print(gmp.get_version())
def get_or_create_port_list(port_list_name, ports):
pl = get_port_lists(port_list_name)
if len(pl) == 0:
return create_port_list(port_list_name, ports)
elif len(pl) == 1:
return pl[0]
else:
print(f"Found {len(pl)} results. Return None")
return None
def create_port_list(port_list_name, ports):
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
o = dict()
res = gmp.create_port_list(port_list_name, ','.join(ports))
o['status'] = res.xpath('@status')[0]
o['status_text'] = res.xpath('@status_text')[0]
o['name'] = port_list_name
if o['status'] == "201":
o['id'] = res.xpath('@id')[0]
o['created'] = True
print("Post list created")
else:
o['created'] = False
print(f"ERROR during Port list creation. Status code: {o['status']}, msg: {o['status_text']}")
return o
def get_port_lists(filter_str="rows=-1"):
res = []
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
pls = gmp.get_port_lists(filter_string=filter_str)
for pl in pls.xpath('port_list'):
o = dict()
o['name'] = pl.xpath('name/text()')[0]
o['id'] = pl.xpath('@id')[0]
o['in_use'] = pl.xpath('in_use/text()')
res.append(o)
return res
def delete_port_list(port_list):
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
res = gmp.delete_port_list(port_list['id'])
status = res.xpath('@status')[0]
status_text = res.xpath('@status_text')[0]
if status == "200":
print(f"Port_list with id: {port_list['id']} and name: {port_list['name']} DELETED")
else:
print(f"ERROR {status}: {status_text}")
def search_targets(filter_str):
res = []
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
targets = gmp.get_targets(filter_string=filter_str)
for target in targets.xpath('target'):
o = dict()
o['name'] = target.xpath('name/text()')[0]
o['hosts'] = target.xpath('hosts/text()')[0]
o['id'] = target.xpath('@id')[0]
o['in_use'] = target.xpath('in_use/text()')[0]
res.append(o)
return res
def delete_target(target):
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
res = gmp.delete_target(target['id'])
status = res.xpath('@status')[0]
status_text = res.xpath('@status_text')[0]
print(f"Target with id: {target['id']} and name: {target['name']} DELETED")
def search_and_delete_target(target_name):
targets = search_targets(target_name)
if len(targets) == 1:
delete_target(targets[0]['id'])
else:
raise("Multiple results for search")
def search_and_delete_all_targets(target_name):
targets = search_targets(target_name)
for target in targets:
delete_target(target)
def create_target(name,ip,port_list,ovs_ssh_credential_id):
o = dict()
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
res = gmp.create_target(
name=name,
comment = "",
hosts=[ip],
port_list_id = port_list['id'],
ssh_credential_id = ovs_ssh_credential_id,
alive_test=AliveTest('Consider Alive'))
o['status'] = res.xpath('@status')[0]
o['status_text'] = res.xpath('@status_text')[0]
if o['status'] == "201":
o['id'] = res.xpath('@id')[0]
o['name'] = name
o['created'] = True
else:
o['created'] = False
print("ERROR during Target creation")
return o
def create_task(name, config_id, target_id, scanner_id):
o = dict()
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
res = gmp.create_task(
name=name,
config_id=config_id,
target_id=target_id,
scanner_id=scanner_id)
o['status'] = res.xpath('@status')[0]
o['status_text'] = res.xpath('@status_text')[0]
if o['status'] == "201":
o['id'] = res.xpath('@id')[0]
o['name'] = name
o['created'] = True
print("Task created")
else:
o['created'] = False
print("ERROR during Task creation")
return o
def search_tasks(filter_str):
res = []
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
tasks = gmp.get_tasks(filter_string=filter_str)
for task in tasks.xpath('task'):
o = dict()
o['name'] = task.xpath('name/text()')[0]
o['id'] = task.xpath('@id')[0]
o['progress'] = task.xpath('progress/text()')[0]
o['in_use'] = task.xpath('in_use/text()')[0]
o['status'] = task.xpath('status/text()')[0]
o['target_id'] = task.xpath('target/@id')[0]
res.append(o)
return res
def get_all_tasks():
res = []
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
tasks = gmp.get_tasks(filter_string="rows=-1")
for task in tasks.xpath('task'):
o = dict()
o['name'] = task.xpath('name/text()')[0]
o['id'] = task.xpath('@id')[0]
o['progress'] = task.xpath('progress/text()')[0]
o['in_use'] = task.xpath('in_use/text()')[0]
o['status'] = task.xpath('status/text()')[0]
o['target_id'] = task.xpath('target/@id')[0]
res.append(o)
return res
def search_and_delete_all_tasks(filter_str):
tasks = search_tasks(filter_str)
for task in tasks:
delete_task(task)
def start_task(task):
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
res = gmp.start_task(task['id'])
o = dict()
o['status'] = res.xpath('@status')[0]
o['status_text'] = res.xpath('@status_text')[0]
o['id'] = res.xpath('report_id/text()')[0]
return o
def stop_task(task):
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
print(task)
pretty_print(gmp.stop_task(task['id']))
def delete_task(task):
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
res = gmp.delete_task(task['id'])
status = res.xpath('@status')[0]
status_text = res.xpath('@status_text')[0]
if status == "200":
print(f"Target with id: {task['id']} and name: {task['name']} DELETED")
else:
print(f"ERROR {status}: {status_text}")
def get_report_formats():
# |------------- ID -----------------| |--- NAME ---|
# 5057e5cc-b825-11e4-9d0e-28d24461215b Anonymous XML
# c1645568-627a-11e3-a660-406186ea4fc5 CSV Results
# 77bd6c4a-1f62-11e1-abf0-406186ea4fc5 ITG
# c402cc3e-b531-11e1-9163-406186ea4fc5 PDF
# a3810a62-1f62-11e1-9219-406186ea4fc5 TXT
# a994b278-1f62-11e1-96ac-406186ea4fc5 XML
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
res = gmp.get_report_formats()
for f in res.xpath('report_format'):
name = f.xpath('name/text()')[0]
id = f.xpath('@id')[0]
print(id,name)
def get_report_format(id):
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
res = gmp.get_report_formats()
pretty_print(res)
def get_progress(report):
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
res = gmp.get_report(report['id'])
return int(res.xpath('report/report/task/progress/text()')[0])
def wait_job_completition(report):
timeout = 3600
old_progress = 0
start_time = time()
while True:
progress = get_progress(report)
if progress > old_progress:
start_time = time()
old_progress = progress
else:
if time() - start_time > timeout:
# Se il task e' fermo ad una data percentuale per piu' di 1h
# si piu' reputare bloccato il task
print("TIMEOUT during waiting for task completition")
return False
now = datetime.now()
print(f"{now}\tProgress: {progress}%")
if progress > 99:
return True
else:
sleep(10)
def save_report(report,report_format_id, report_filename ):
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
res = gmp.get_report(report['id'],
report_format_id=report_format_id,
ignore_pagination=True,
details="1")
code = str(res.xpath('report/text()')[0])
with open(txt_report_filename, "wb") as fh:
fh.write(base64.b64decode(code))
def save_severity_report(report, severity_filename):
dict_severity = {"Log": 0, "Low": 1, "Medium": 2, "High": 3}
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
res = gmp.get_report(task_info['id'],
report_format_id="5057e5cc-b825-11e4-9d0e-28d24461215b",
ignore_pagination=True,
details="1")
severities = res.xpath('report/report/ports/port/threat/text()')
old_num_severity = 0
severity = "Log"
for sev in severities:
if dict_severity[sev] > old_num_severity:
old_num_severity = dict_severity[sev]
severity = sev
with open(severity_filename, "w") as f:
f.write(severity)
def get_reports(filter_str="rows=-1"):
lo = []
with Gmp(connection, transform=transform) as gmp:
gmp.authenticate(auth_name, auth_passwd)
reports = gmp.get_reports(filter_string = filter_str)
for report in reports.xpath('report'):
o = dict()
o['task_name'] = report.xpath('task/name/text()')[0]
o['id'] = report.xpath('@id')[0]
lo.append(o)
return lo
###########################################################
auth_name = "jenkins"
auth_passwd = "bleyrauvHecsUbDy"
logging.basicConfig(filename='debug.log', level=logging.DEBUG)
connection = TLSConnection(hostname='172.17.0.2')
transform = EtreeTransform()
config_id = "9866edc1-8869-4e80-acac-d15d5647b4d9"
scanner_id = "08b69003-5fc2-4037-a479-93b440211c73"
ovs_ssh_credential_id = "a89d5ebf-01bf-4836-ae72-a65bc633219a"
txt_report_format_id = "a3810a62-1f62-11e1-9219-406186ea4fc5"
csv_report_format_id = "c1645568-627a-11e3-a660-406186ea4fc5"
xml_report_format_id = "5057e5cc-b825-11e4-9d0e-28d24461215b"
print("len_argv:",len(argv))
for i in range(len(argv)):
print(i,argv[i])
dep_json = argv[1]
report_filename = argv[2]
severity_filename = argv[3]
print("dep_json", dep_json)
print("report_filename", report_filename)
print("severity_filename", severity_filename)
with open(dep_json) as f:
data = json.load(f)
endpoints = dict()
for key in data['outputs'].keys():
if "endpoint" in key:
endpoint = str(data['outputs'][key]).split("://")[1]
print("endpoint",endpoint)
host,port = endpoint.split(':')
if host not in endpoints:
endpoints[host] = ["22"]
endpoints[host].append(port)
print(json.dumps(endpoints,sort_keys=True,indent=4))
"""
for host,ports in endpoints.items():
print(host,ports)
target_name = f"{auth_name}_target_{host}"
task_name = f"{auth_name}_task_{host}"
port_list_name = f"{auth_name}_pl_{host}"
pl = get_or_create_port_list(port_list_name,ports)
if 'id' in pl:
target = create_target(target_name,host,pl,ovs_ssh_credential_id)
if target['created']:
print("Target created")
task = create_task(task_name, config_id, target['id'],scanner_id)
if task['created']:
print("Task created")
task_info = start_task(task)
done = wait_job_completition(task_info)
if done:
print("Saving report and severity")
save_report(task_info,txt_report_format_id, report_filename)
save_severity_report(task_info,severity_filename)
print("Done")
delete_task(task)
delete_target(target)
delete_port_list(pl)
"""
#stop_task(task)
#delete_task(task)
#delete_target(target)
#delete_port_list(pl)
#stop_task(task)
#search_and_delete_all_tasks(auth_name)
#search_and_delete_all_targets(auth_name)
#delete_port_list(pl)
#for r in get_reports():
# print(r)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment