"""Helpers that extract ``host -> ports`` endpoint maps from a JSON file.

The JSON document is expected to carry a top-level ``"outputs"`` mapping.
"""
import json
import logging
import os
from typing import Dict, List, Set, Tuple

import git

SSH_PORT: str = '22'
HTTP_PORT: str = '80'
HTTPS_PORT: str = '443'

# Ports used when an endpoint does not spell one out, keyed by URL scheme.
_DEFAULT_PORTS: Dict[str, str] = {
    "https": HTTPS_PORT,
    "http": HTTP_PORT,
}


def parse_endpoint(endpoint: str) -> Tuple[str, str]:
    """Split an endpoint URL into its host and port.

    Any path, query string or fragment after the host is ignored. When the
    endpoint carries no explicit port, the default port of the ``http`` or
    ``https`` scheme is used.

    Args:
        endpoint: A string of the form ``scheme://host[:port][/...]``.

    Returns:
        A ``(host, port)`` tuple; the port is returned as a string.

    Raises:
        ValueError: If the endpoint has no ``://`` separator, if the host
            part contains more than one ``:``, or if no port is given and
            the scheme is neither ``http`` nor ``https``.
    """
    prefix, separator, remainder = endpoint.partition("://")
    if not separator:
        raise ValueError(
            f"Impossible to parse the endpoint, missing '://'. Endpoint: {endpoint}"
        )

    # Keep only the "host[:port]" part; a trailing path, query or fragment
    # must not leak into the host or port.
    authority = remainder
    for delimiter in "/?#":
        authority = authority.partition(delimiter)[0]

    if authority.count(":") > 1:
        raise ValueError(
            f"Impossible to parse the endpoint host and port. Endpoint: {endpoint}"
        )

    host, _, port = authority.partition(":")
    if not port:
        # Schemes are case-insensitive, so normalise before the lookup.
        default_port = _DEFAULT_PORTS.get(prefix.lower())
        if default_port is None:
            raise ValueError(
                f"Impossible to parse the endpoint port. Endpoint: {endpoint}"
            )
        port = default_port

    logging.info("Endpoint: %s:%s", host, port)
    return host, port


def read_file(file_path: str) -> Dict:
    """Load and return the JSON content of ``file_path``.

    The file is decoded as UTF-8 so the result does not depend on the
    platform's default encoding.
    """
    with open(file_path, encoding="utf-8") as f:
        return json.load(f)


def import_dep_info(file_path: str, endpoint_keys: str) -> Dict[str, List[str]]:
    """Build a ``host -> sorted ports`` mapping from a JSON file's outputs.

    Args:
        file_path: Path to a JSON file with a top-level ``"outputs"`` mapping.
        endpoint_keys: Either the literal string ``"None"`` or a
            comma-separated list of output keys. Whitespace around each key
            is ignored.

    Returns:
        A dict mapping each host to its ports, sorted as strings.

        * With explicit ``endpoint_keys``, every matching output value is
          parsed with :func:`parse_endpoint` and its port is collected under
          its host.
        * With ``"None"``, every string output whose key contains ``"_ip"``
          is taken as a host reachable on ``SSH_PORT``.

    Raises:
        KeyError: If the JSON document has no ``"outputs"`` entry.
        ValueError: If a selected endpoint cannot be parsed.
    """
    outputs = read_file(file_path)['outputs']
    ports_by_host: Dict[str, Set[str]] = {}

    if endpoint_keys != "None":
        wanted_keys = {key.strip() for key in endpoint_keys.split(',')}
        for key, value in outputs.items():
            if key in wanted_keys:
                host, port = parse_endpoint(str(value))
                ports_by_host.setdefault(host, set()).add(port)
    else:
        for key, value in outputs.items():
            if "_ip" in key and isinstance(value, str):
                logging.info("endpoint: %s:%s", value, SSH_PORT)
                ports_by_host[value] = {SSH_PORT}

    # Ports are strings, so the ordering is lexicographic ("443" < "80").
    return {host: sorted(ports) for host, ports in ports_by_host.items()}