import re import datetime import sys import pathlib from typing import NamedTuple, TypeAlias datepattern = re.compile(r"(?P<date>\d{4}(-\d{2}){2})\s") pattern = re.compile( r"(?P<date>\d{4}(-\d{2}){2})\s(?P<time>\d{2}(:\d{2}){2})\s(?P<IP>\d{1,3}(\.\d{1,3}){3}):\d+\s\[(?P<name>\w+)\]\sPeer\sConnection\sInitiated" ) class ConnectionInfo(NamedTuple): ip: str datetime: datetime.datetime ConnectionName: TypeAlias = str ConnectionMap = dict[ConnectionName, list[ConnectionInfo]] class ConnectionEntry(NamedTuple): name: ConnectionName info: ConnectionInfo def log_lines(filepath: pathlib.Path): with filepath.open("r") as f: for line in f: yield line def filter_log_lines_for_date(filepath: pathlib.Path, date: str): return filter( lambda line: datepattern.match(line).group("date") == date, log_lines(filepath) ) def parse_date_time(date: str, time: str) -> datetime.datetime: return datetime.datetime.strptime(date + " " + time, "%Y-%m-%d %H:%M:%S") def parse_connections(lines): for line in lines: if match := pattern.match(line): yield ConnectionEntry( match.group("name"), ConnectionInfo( match.group("IP"), parse_date_time(match.group("date"), match.group("time")), ), ) def get_conn_map(lines): result = {} for name, info in parse_connections(lines): result.setdefault(name, []).append(info) return result def find_names_with_multiple_ips(connmap: ConnectionMap) -> ConnectionMap: result = {} for name, infos in connmap.items(): ips = {info.ip for info in infos} if len(ips) < 2: continue for ip in ips: max_ip_info = max( filter(lambda info: info.ip == ip, infos), key=lambda info: info.datetime, ) result.setdefault(name, []).append(max_ip_info) return result def find_fast_repeats( connmap: ConnectionMap, threshold: datetime.timedelta, min_repeats: int ) -> ConnectionMap: result = {} for name, infos in connmap.items(): if len(infos) < 2: continue infos = sorted(infos, key=lambda info: info.datetime) for a, b in zip(infos, infos[1:]): if a.ip == b.ip and b.datetime - a.datetime <= threshold: result.setdefault(name, []).extend((a, b)) if name in result and len(result[name]) < min_repeats: result.pop(name) return result def print_multiple_ips(connmap: ConnectionMap): if len(connmap) == 0: return print("Multiple IPs:") for name, infos in connmap.items(): print(f"- {name}:") for n, info in enumerate(sorted(infos, key=lambda info: info.datetime), 1): print(f" {n:02}. {info.ip}: {info.datetime}") def print_fast_repeats(connmap: ConnectionMap, limit_for_one=10): if len(connmap) == 0: return print("Fast repeats:") for name, infos in connmap.items(): print(f"- {name}:") for n, info in enumerate( reversed(sorted(infos, key=lambda info: info.datetime)), 1 ): print(f" {n:2}. {info.ip}: {info.datetime}") if n >= limit_for_one: break def main(): if len(sys.argv) < 2: print("Error: please specify a log file") exit(1) date = datetime.date.today().strftime("%Y-%m-%d") log_file = pathlib.Path(sys.argv[1]) lines = log_lines(log_file) lines = filter_log_lines_for_date(pathlib.Path(log_file), date) connmap = get_conn_map(lines) multiple_ips = find_names_with_multiple_ips(connmap) fast_repeats = find_fast_repeats(connmap, datetime.timedelta(minutes=3), 10) print_multiple_ips(multiple_ips) print_fast_repeats(fast_repeats) if __name__ == "__main__": main()