Skip to content
Snippets Groups Projects

Pandas classifier

Merged Gioacchino Vino requested to merge pandas_classifier into main
1 file
+ 2
0
Compare changes
  • Side-by-side
  • Inline
+ 109
117
@@ -11,6 +11,7 @@ import yaml
from functools import reduce
import os
import git
import pandas as pd
# GVM Xpath Constants
GVM_XPATH_ID = '@id'
@@ -461,16 +462,17 @@ class Task:
with open(filename, "wb") as fh:
fh.write(base64.b64decode(code))
def get_report(self) -> List[Tuple[str,str,str,str]]:
def get_report(self) -> Dict[str,Tuple]:
res = self.client.get_report(self.report_id,
report_format_id=ReportFormats.anonymous_xml,
ignore_pagination=True,
details="1")
o_ids: list[str] = res.xpath('report/report/results/result/nvt/@oid')
severities: list[str] = res.xpath('report/report/results/result/nvt/severities/@score')
treats: list[str] = res.xpath('report/report/results/result/threat/text()')
ports: list[str] = res.xpath('report/report/results/result/port/text()')
return [ResultReport(o,s,t,p) for o,s,t,p in zip(o_ids, severities, treats, ports)]
oids: tuple[str] = tuple(res.xpath('report/report/results/result/nvt/@oid'))
sev: tuple[str] = tuple(res.xpath('report/report/results/result/nvt/severities/@score'))
threat: tuple[str] = tuple(res.xpath('report/report/results/result/threat/text()'))
ports: tuple[str] = tuple(res.xpath('report/report/results/result/port/text()'))
sev: tuple[float] = tuple(map(float,sev))
return {"oids":oids, "severity":sev, "threat":threat, "ports":ports}
class GVMClient():
"""
@@ -536,12 +538,7 @@ class ReportManager():
REPORT_GLOBAL = "global"
REPORT_SEVERITY = "severity"
REPORT_THREAT = "threat"
# OIDS Classes
OID_ACCEPTED = 'accepted-oids'
OID_NEW = 'new-oids'
OID_DROPPED = 'dropped-oids'
OID_OS = 'os-related-oids'
REPORT_PORTS = "ports"
# OS security repository configuration
OS_GIT_REPO = "baltig.infn.it/infn-cloud/os_security_checks.git"
@@ -558,8 +555,15 @@ class ReportManager():
SS_SEC_TOKEN = "GIT_SEC_TOKEN"
SS_SEC_DEST_DIR = "ss-repo"
SS_SEC_CHILD_DIR = "queues"
SS_SEC_ACCEPTED_FILES = ['accepted.txt']
SS_SEC_KNOWN_FILES = ['held.txt', 'new.txt', 'overridden.txt']
SS_SEC_ACKNOWLEDGED_OIDS_FILES = ['accepted.txt']
SS_SEC_REJECTED_OIDS_FILES = ['held.txt', 'new.txt', 'overridden.txt']
# Classification configuration
LABEL_COLUMN = "label"
LABEL_NEW_VULNS = "NEW"
LABEL_ACKNOWLEDGED_VULNS = "ACKNOWLEDGED"
LABEL_REJECTED_VULNS = "REJECTED"
LABEL_OS_RELATED_VULNS = "OS_RELATED"
def __init__(self, os_name: str, is_os: bool) -> None:
logging.info("Report Manager Iniziatation started...")
@@ -604,17 +608,11 @@ class ReportManager():
self.os_all_oids = []
else:
self.os_oids = os_oids
try:
self.os_all_oids = list(reduce(lambda x,y: x + y,
os_oids.values()))
logging.debug("Imported os security oids")
logging.debug(pretty_json(self.os_all_oids))
except Exception as e:
logging.warning("Impossible extract oids from imported yaml")
self.os_all_oids = []
self.os_all_oids = os_oids.get(self.os_name,[])
logging.debug("Imported os security oids")
logging.debug(pretty_json(self.os_all_oids))
def extract_oids(self, lines: List[str]) -> Set[str]:
def extract_oids(self, lines: List[str]) -> Tuple[str]:
oids: List[str] = list()
for line in lines:
line = line.strip()
@@ -622,7 +620,7 @@ class ReportManager():
if len(v_line := line.split(" ")[0]) > 0:
oids.append(v_line)
return set(oids)
return tuple(set(oids))
def import_security_oids(self) -> None:
user = os.environ.get(self.SS_SEC_USER)
@@ -633,107 +631,111 @@ class ReportManager():
git.Repo.clone_from(repo_url, self.SS_SEC_DEST_DIR)
except Exception as e:
logging.warning(f"Impossible clone the ss scans repository, {e}")
self.accepted_oids = []
self.known_oids = []
self.acknowledged_oids = []
self.rejected_oids = []
else:
accepted_oids: List[str] = []
known_oids: List[str] = []
acknowledged_oids: List[str] = []
rejected_oids: List[str] = []
for f in self.SS_SEC_ACCEPTED_FILES:
for f in self.SS_SEC_ACKNOWLEDGED_OIDS_FILES:
filename = os.path.join(files_dir,f)
with open(filename, 'r') as file:
accepted_oids += self.extract_oids(file.readlines())
acknowledged_oids += self.extract_oids(file.readlines())
for f in self.SS_SEC_KNOWN_FILES:
for f in self.SS_SEC_REJECTED_OIDS_FILES:
filename = os.path.join(files_dir,f)
with open(filename, 'r') as file:
known_oids += self.extract_oids(file.readlines())
rejected_oids += self.extract_oids(file.readlines())
self.accepted_oids = accepted_oids
self.known_oids = known_oids
logging.debug("accepted oids")
logging.debug(pretty_json(self.accepted_oids))
logging.debug("known oids")
logging.debug(pretty_json(self.known_oids))
self.acknowledged_oids = tuple(sorted(acknowledged_oids))
self.rejected_oids = tuple(sorted(rejected_oids))
logging.debug("Acknowledged Oids")
logging.debug(pretty_json(self.acknowledged_oids))
logging.debug("Rejected Oids")
logging.debug(pretty_json(self.rejected_oids))
def import_report(self, host: str, report: List[ResultReport]):
def import_report(self, host: str, report: Dict[str,Tuple]) -> None:
self.imported_oids[host] = report
def show_imported_reports(self) -> None:
logging.debug("\nIMPORTED REPORTS")
for host, report in self.imported_oids.items():
logging.debug(f"HOST: {host}")
logging.debug(f"\n{pd.DataFrame(report)}")
logging.debug("")
def classify_reports(self) -> None:
logging.debug("\n\nCLASSIFIED OIDS")
def init_glob_vars(self):
self.report = dict()
self.report[self.REPORT_DEPLOYMENT] = {self.REPORT_SEVERITY: self.DEFAULT_SEVERITY,
self.REPORT_THREAT: self.DEFAULT_THREAT}
self.oids = dict()
TO_SOLVE_VULNS = [self.LABEL_NEW_VULNS,self.LABEL_ACKNOWLEDGED_VULNS]
def init_host_vars(self,host: str):
self.oids[host] = {self.OID_ACCEPTED: [],
self.OID_DROPPED: [],
self.OID_NEW: []}
if not self.is_os:
self.oids[host][self.OID_OS] = []
to_solve = pd.Series({"oids": self.acknowledged_oids})
to_exclude = pd.Series({"oids": self.rejected_oids})
os_vulns = pd.Series({"oids": self.os_all_oids})
self.report[host] = dict()
self.report[host][self.REPORT_GLOBAL] = {self.REPORT_SEVERITY: self.DEFAULT_SEVERITY,
self.REPORT_THREAT: self.DEFAULT_THREAT}
self.report, self.oids = dict(), dict()
self.report[self.REPORT_DEPLOYMENT] = {self.REPORT_SEVERITY: self.DEFAULT_SEVERITY}
def update_summary(self,host, r: ResultReport) -> None:
for host, host_report in self.imported_oids.items():
self.oids[host], self.report[host] = dict(), dict()
# Evaluate max port severity per host
if r.port not in self.report[host] or \
r.severity > self.report[host][r.port][self.REPORT_SEVERITY]:
self.report[host][r.port] = {self.REPORT_SEVERITY: r.severity,
self.REPORT_THREAT: r.threat}
# Create Pandas DataFrame from GreenBone report
vulns = pd.DataFrame(host_report)
# Evaluate max global severity per host
if r.severity > self.report[host][self.REPORT_GLOBAL][self.REPORT_SEVERITY]:
self.report[host][self.REPORT_GLOBAL] = {self.REPORT_SEVERITY: r.severity,
self.REPORT_THREAT: r.threat}
# Add Label column
vulns[self.LABEL_COLUMN] = self.LABEL_NEW_VULNS
# Evaluate Global max severity
if r.severity > self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY]:
self.report[self.REPORT_DEPLOYMENT] = {self.REPORT_SEVERITY: r.severity,
self.REPORT_THREAT: r.threat}
# Drop vulnerabilities whose severity is below the relevance threshold
vulns = vulns[vulns.severity >= self.SEVERITY_THR]
def classify_reports(self) -> None:
# Label Acknowledged Vulnerabilities
vulns.loc[vulns.oids.isin(to_solve.oids),
self.LABEL_COLUMN] = self.LABEL_ACKNOWLEDGED_VULNS
# Init global aggregated variables
self.init_glob_vars()
# Label Excluded Vulnerabilities
vulns.loc[vulns.oids.isin(to_exclude.oids),
self.LABEL_COLUMN] = self.LABEL_REJECTED_VULNS
for host, host_report in self.imported_oids.items():
# Init aggregated variables per host
self.init_host_vars(host)
for res_report in host_report:
logging.debug(res_report)
# Skip if oid is not relevant
if res_report.severity < self.SEVERITY_THR:
self.update_summary(host, res_report)
logging.debug("LOW SEVERITY -> SKIPPED")
continue
# Classify oid
if not self.is_os and res_report.oid in self.os_all_oids:
self.oids[host][self.OID_OS] += [res_report]
logging.debug("OS RELATED")
elif res_report.oid in self.accepted_oids:
self.oids[host][self.OID_ACCEPTED].append(res_report)
self.update_summary(host, res_report)
logging.debug("ACCEPTED")
elif res_report.oid in self.known_oids:
self.oids[host][self.OID_DROPPED].append(res_report)
logging.debug("DROPPED")
else:
self.oids[host][self.OID_NEW].append(res_report)
self.update_summary(host, res_report)
logging.debug("NEW")
if not self.is_os:
# Label Os Vulnerabilities
vulns.loc[vulns.oids.isin(os_vulns.oids),
self.LABEL_COLUMN] = self.LABEL_OS_RELATED_VULNS
for host,data in self.oids.items():
for k,v_list in data.items():
for o in v_list:
logging.debug((host,k,o.oid,o.severity,o.threat,o.port))
# Collect Os Vulnerability oids
self.oids[host][self.LABEL_OS_RELATED_VULNS] = \
vulns[vulns.label == self.LABEL_OS_RELATED_VULNS].oids.to_list()
# Extract global estimation
# Collect Acknowledged Vulnerability oids
self.oids[host][self.LABEL_ACKNOWLEDGED_VULNS] = \
vulns[vulns.label == self.LABEL_ACKNOWLEDGED_VULNS].oids.to_list()
# Collect Rejected Vulnerability oids
self.oids[host][self.LABEL_REJECTED_VULNS] = \
vulns[vulns.label == self.LABEL_REJECTED_VULNS].oids.to_list()
# Collect New Vulnerability oids
self.oids[host][self.LABEL_NEW_VULNS] = \
vulns[vulns.label == self.LABEL_NEW_VULNS].oids.to_list()
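# Collect Acknowledged and New Vulnerabilities to create the To-Solve DataFrame
# NOTE(review): this rebinds `to_solve`, which also names the acknowledged-OID
# Series read by the "Label Acknowledged Vulnerabilities" step; if this line is
# inside the per-host loop, later hosts would be matched against the previous
# host's rows instead -- confirm and use a distinct name.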
to_solve = vulns[vulns[self.LABEL_COLUMN].isin(TO_SOLVE_VULNS)]
# Extract Max Severity per "ports" parameter
for ports, sev in to_solve.groupby(self.REPORT_PORTS).severity.max().items():
self.report[host][ports] = {self.REPORT_SEVERITY: sev}
# Compute Host Max Severity
max_severity = to_solve.severity.max()
self.report[host][self.REPORT_GLOBAL] = {self.REPORT_SEVERITY: max_severity}
# Check if the Host Max Severity is greater than the Deployment Max Severity
if max_severity > self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY]:
self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY] = max_severity
logging.debug(f"HOST: {host}")
logging.debug(f"\n{vulns}")
# Check whether the Deployment Max Severity is relevant (at or above the threshold)
if self.report[self.REPORT_DEPLOYMENT][self.REPORT_SEVERITY] < self.SEVERITY_THR:
self.report[self.REPORT_GLOBAL] = self.MSG_OK
else:
@@ -743,17 +745,7 @@ class ReportManager():
return pretty_json(self.report)
def get_classified_oids(self) -> str:
json_oids = dict()
for host, data in self.oids.items():
json_oids[host] = dict()
for key, oids in data.items():
json_oids[host][key] = [str(o) for o in oids]
return pretty_json(json_oids)
def create_msg(self, r: ResultReport):
msg = f" Detected oid: {r.oid}, severity: {r.severity}"
msg += f", threat: {r.threat} and port: {r.port}\n"
return msg
return pretty_json(self.oids)
def write_data(self,
summary_filename: str,
@@ -776,8 +768,8 @@ class ReportManager():
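# Replace the OIDs stored for this OS with the OIDs detected on the hosts
# NOTE(review): the classified lists appear to be filled from `.oids.to_list()`,
# i.e. plain OID values, so the `.oid` attribute access below looks like it
# would fail -- confirm against classify_reports.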
self.os_oids[self.os_name] = []
for _ , data in self.oids.items():
self.os_oids[self.os_name] += [a.oid for a in data[self.OID_ACCEPTED]]
self.os_oids[self.os_name] += [n.oid for n in data[self.OID_NEW]]
self.os_oids[self.os_name] += [a.oid for a in data[self.LABEL_ACKNOWLEDGED_VULNS]]
self.os_oids[self.os_name] += [n.oid for n in data[self.LABEL_NEW_VULNS]]
with open(self.os_file, 'w') as f:
yaml.dump(self.os_oids, f)
Loading