Skip to content
Snippets Groups Projects
Commit e4316c4a authored by Gioacchino Vino's avatar Gioacchino Vino
Browse files

Reviewed scan python code

parent d72f33ab
No related branches found
No related tags found
1 merge request!108Updated Python scan code
......@@ -294,14 +294,20 @@ class GVMClient():
res.append(o)
return res
def create_port_list(self, name: str, ports: List[str]) -> Optional[str]:
def create_port_list(self, name: str, ports: List[str]) -> Optional[PortList]:
res = self.client.create_port_list(name, ','.join(ports))
status = res.xpath('@status')[0]
status_text = res.xpath('@status_text')[0]
if status == "201":
pl_id = str(res.xpath('@id')[0])
client_res = self.client.get_port_lists(filter_string = pl_id)
pl_info = client_res.xpath('port_list')[0]
o = PortList()
o.client = self.client
o.name = pl_info.xpath('name/text()')[0]
o.id = pl_info.xpath('@id')[0]
o.in_use = pl_info.xpath('in_use/text()')[0]
logging.debug(f'Created port list obj. Name: {name}, id: {id}, ports: {ports}')
return pl_id
else:
logging.error(f"ERROR during Port list creation. Status code: {status}, msg: {status_text}")
msg = f"ERROR during Port list creation. Status code: {status}, msg: {status_text}"
......@@ -319,15 +325,14 @@ class GVMClient():
def get_or_create_port_list(self, pl_name: str, ports: List[str]) -> PortList:
res = self.get_port_lists(pl_name)
if len(res) == 0:
pl_id = self.create_port_list(pl_name, ports)
return self.get_port_lists(pl_id)[0]
return self.create_port_list(pl_name, ports)
elif len(res) == 1:
return res[0]
else:
logging.warning(f"Found {len(res)} results.")
return res
def create_target(self, name: str, ip: str, pl: PortList) -> Optional[str]:
def create_target(self, name: str, ip: str, pl: PortList) -> Optional[Target]:
res = self.client.create_target(
name = name,
comment = "",
......@@ -339,14 +344,22 @@ class GVMClient():
status_text = res.xpath('@status_text')[0]
if status == "201":
target_id = str(res.xpath('@id')[0])
return target_id
client_res = self.client.get_targets(filter_string = target_id)
target_info = client_res.xpath('target')[0]:
t = Target()
t.client = self.client
t.name = target_info.xpath('name/text()')[0]
t.id = target_info.xpath('@id')[0]
t.in_use = target_info.xpath('in_use/text()')[0]
t.port_list = pl
return t
else:
msg = f"ERROR during Target creation. Status code: {status}, msg: {status_text}"
raise Exception(msg)
raise Exception(msg)
def get_targets(self, filter: str) -> List[Target]:
res = []
targets = self.client.get_targets(filter_string=filter)
targets = self.client.get_targets(filter_string = filter)
for target in targets.xpath('target'):
t = Target()
t.client = self.client
......@@ -354,7 +367,7 @@ class GVMClient():
t.id = target.xpath('@id')[0]
t.in_use = target.xpath('in_use/text()')[0]
res.append(t)
return res
return res
def delete_target(self, target: Target):
res = self.client.delete_target(target.id)
......@@ -368,8 +381,7 @@ class GVMClient():
def get_or_create_target(self, name: str, ip: str, port_list: PortList) -> Target:
res = self.get_targets(name)
if len(res) == 0:
t_id = self.create_target(name, ip, port_list)
return self.get_targets(t_id)[0]
return self.create_target(name, ip, port_list)
elif len(res) == 1:
res[0].port_list = port_list
return res[0]
......@@ -389,7 +401,7 @@ class GVMClient():
for target in targets:
self.delete_target(target)
def create_task(self, name: str, target: Target) -> Optional[str]:
def create_task(self, name: str, target: Target) -> Optional[Task]:
res = self.client.create_task(
name = name,
config_id = Configs.config,
......@@ -399,7 +411,20 @@ class GVMClient():
status_text = res.xpath('@status_text')[0]
if status == "201":
task_id = str(res.xpath('@id')[0])
return task_id
client_res = self.client.get_tasks(filter_string = task_id)
task_info = client_res.xpath('task')[0]
t = Task()
t.client = self.client
t.name = task_info.xpath('name/text()')[0]
t.id = task_info.xpath('@id')[0]
t.in_use = task_info.xpath('in_use/text()')[0]
t.status = task_info.xpath('status/text()')[0]
try:
t.report_id = task_info.xpath('last_report/report/@id')[0]
except:
pass
t.target = target
return t
else:
msg = f"ERROR during Task creation. Status code: {status}, msg: {status_text}"
raise Exception(msg)
......@@ -424,8 +449,7 @@ class GVMClient():
def get_or_create_task(self, task_name: str, target: Target) -> Task:
res = self.get_tasks(task_name)
if len(res) == 0:
t_id = self.create_task(task_name, target)
return self.get_tasks(t_id)[0]
return self.create_task(task_name, target)
elif len(res) == 1:
res[0].target = target
return res[0]
......
......@@ -80,11 +80,11 @@ for host,ports in endpoints.items():
summary_filename = f"{output_dir}/summary-report.json"
print("prima di creare PL")
port_list = gvm_client.get_or_create_port_list(port_list_name,ports)
port_list = gvm_client.get_or_create_port_list(port_list_name, ports)
logging.info(f"Port list:\n {port_list}")
print(f"Port list:\n {port_list}")
target = gvm_client.get_or_create_target(target_name,host,port_list)
target = gvm_client.get_or_create_target(target_name, host, port_list)
logging.info(f"Target:\n {target}")
print(f"Target:\n {target}")
......@@ -100,7 +100,7 @@ for host,ports in endpoints.items():
#else:
# reports[host] = f"ERROR Task: {task.id}"
del task
#del task
#del target
#del port_list
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment