138 lines
4.1 KiB
Python

import re
import datetime
import sys
import pathlib
from typing import NamedTuple, TypeAlias
# Matches the "YYYY-MM-DD" date at the start of a log line; the date must be
# followed by a whitespace character. The date itself is in the "date" group.
datepattern = re.compile(r"(?P<date>\d{4}(-\d{2}){2})\s")
# Matches a complete "Peer Connection Initiated" log line and captures its
# "date", "time", peer "IP" (the ":port" suffix is matched but not captured)
# and the bracketed connection "name".
pattern = re.compile(
    r"(?P<date>\d{4}(-\d{2}){2})\s(?P<time>\d{2}(:\d{2}){2})\s(?P<IP>\d{1,3}(\.\d{1,3}){3}):\d+\s\[(?P<name>\w+)\]\sPeer\sConnection\sInitiated"
)
class ConnectionInfo(NamedTuple):
    """A single connection record: the peer's IP address and a timestamp."""

    # Peer IP address, kept as text.
    ip: str
    # Timestamp associated with the connection.
    datetime: datetime.datetime
# A connection name is a plain string.
ConnectionName: TypeAlias = str
# Maps each connection name to the list of ConnectionInfo records for it.
ConnectionMap = dict[ConnectionName, list[ConnectionInfo]]
class ConnectionEntry(NamedTuple):
    """A connection name paired with one ConnectionInfo record for that name."""

    # Name the connection was made under.
    name: ConnectionName
    # IP address and timestamp of that connection.
    info: ConnectionInfo
def log_lines(filepath: pathlib.Path):
    """Lazily yield the lines of *filepath*, one at a time.

    The file is opened in text mode when iteration starts and is closed once
    the generator is exhausted or closed. Lines keep their trailing newline.
    """
    with filepath.open("r") as handle:
        yield from handle
def filter_log_lines_for_date(filepath: pathlib.Path, date: str):
    """Lazily select the lines of *filepath* whose leading date equals *date*.

    Args:
        filepath: Log file to read (through ``log_lines``).
        date: Date to keep, written ``YYYY-MM-DD`` like the prefix matched by
            ``datepattern``.

    Returns:
        A lazy iterator over the matching lines, in file order.

    Lines that do not start with a date (blank lines, continuation lines,
    anything ``datepattern`` does not match) are skipped.
    """

    def starts_with_date(line: str) -> bool:
        # match() returns None for a line without a leading date; calling
        # .group() on that would raise AttributeError, so test it first.
        found = datepattern.match(line)
        return found is not None and found.group("date") == date

    return filter(starts_with_date, log_lines(filepath))
def parse_date_time(date: str, time: str) -> datetime.datetime:
    """Combine a ``YYYY-MM-DD`` date and an ``HH:MM:SS`` time into a naive datetime.

    Raises:
        ValueError: If the combined text does not fit that format.
    """
    timestamp_text = " ".join((date, time))
    return datetime.datetime.strptime(timestamp_text, "%Y-%m-%d %H:%M:%S")
def parse_connections(lines):
    """Yield a ConnectionEntry for each line that ``pattern`` matches.

    Lines that ``pattern`` does not match at their start are ignored.
    """
    for raw_line in lines:
        found = pattern.match(raw_line)
        if found is None:
            continue
        fields = found.groupdict()
        when = parse_date_time(fields["date"], fields["time"])
        yield ConnectionEntry(fields["name"], ConnectionInfo(fields["IP"], when))
def get_conn_map(lines):
    """Group the connections parsed from *lines* by connection name.

    Returns a dict mapping each name to its records, in the order they were
    parsed.
    """
    grouped = {}
    for conn_name, conn_info in parse_connections(lines):
        if conn_name not in grouped:
            grouped[conn_name] = []
        grouped[conn_name].append(conn_info)
    return grouped
def find_names_with_multiple_ips(connmap: ConnectionMap) -> ConnectionMap:
    """Return, for every name seen from two or more IPs, its latest record per IP.

    Args:
        connmap: Connection records grouped by connection name.

    Returns:
        A new mapping holding only the names that have at least two distinct
        IPs. Each value has one record per IP (the one with the greatest
        ``datetime``; the first such record wins a tie), ordered by the IP's
        first appearance in ``connmap[name]``.
    """
    result = {}
    for name, infos in connmap.items():
        # Single pass per name. A dict keeps insertion order, so the output
        # order is deterministic (iterating a set of IP strings is not), and
        # every record is visited once instead of once per distinct IP.
        latest_by_ip = {}
        for info in infos:
            current = latest_by_ip.get(info.ip)
            if current is None or info.datetime > current.datetime:
                latest_by_ip[info.ip] = info
        if len(latest_by_ip) >= 2:
            result[name] = list(latest_by_ip.values())
    return result
def find_fast_repeats(
    connmap: ConnectionMap, threshold: datetime.timedelta, min_repeats: int
) -> ConnectionMap:
    """Find names that reconnect from the same IP in quick succession.

    For each name the records are ordered by ``datetime``. Two consecutive
    records are flagged when they share an IP and are at most *threshold*
    apart.

    Args:
        connmap: Connection records grouped by connection name.
        threshold: Largest gap (inclusive) between two consecutive records
            that still counts as a fast repeat.
        min_repeats: Minimum number of distinct flagged records a name needs
            in order to be reported.

    Returns:
        A new mapping from name to its flagged records in chronological
        order, each distinct record listed once.
    """
    result = {}
    for name, infos in connmap.items():
        ordered = sorted(infos, key=lambda info: info.datetime)
        flagged = []
        # Mirrors `flagged` so the duplicate check is O(1) instead of a scan
        # of the growing list for every candidate.
        seen = set()
        # Fewer than two records yields no pairs, so no explicit guard needed.
        for earlier, later in zip(ordered, ordered[1:]):
            same_ip = earlier.ip == later.ip
            if same_ip and later.datetime - earlier.datetime <= threshold:
                for info in (earlier, later):
                    if info not in seen:
                        seen.add(info)
                        flagged.append(info)
        # A name with no flagged pair is never reported, whatever min_repeats is.
        if flagged and len(flagged) >= min_repeats:
            result[name] = flagged
    return result
def print_multiple_ips(connmap: ConnectionMap):
    """Print a "Multiple IPs:" report with each name's records, oldest first.

    Nothing at all is printed when *connmap* is empty.
    """
    if not connmap:
        return
    print("Multiple IPs:")
    for name in connmap:
        print(f"- {name}:")
        oldest_first = sorted(connmap[name], key=lambda info: info.datetime)
        for n, info in enumerate(oldest_first, start=1):
            print(f" {n:02}. {info.ip}: {info.datetime}")
def print_fast_repeats(connmap: ConnectionMap, limit_for_one=10):
    """Print a "Fast repeats:" report with each name's records, newest first.

    Listing for a name stops after the row whose number reaches
    *limit_for_one*, so a name with any records always shows at least one.
    Nothing at all is printed when *connmap* is empty.
    """
    if not connmap:
        return
    print("Fast repeats:")
    for name, infos in connmap.items():
        print(f"- {name}:")
        # Sort ascending, then flip in place: same order as reversed(sorted(...)).
        newest_first = sorted(infos, key=lambda info: info.datetime)
        newest_first.reverse()
        for n, info in enumerate(newest_first, start=1):
            print(f" {n:2}. {info.ip}: {info.datetime}")
            if n >= limit_for_one:
                break
def main():
    """Print today's multi-IP and fast-repeat reports for a log file.

    The log file path is taken from ``sys.argv[1]``. Only lines dated today
    (local date) are considered. Connections are grouped by name, then two
    reports are printed: names seen from more than one IP, and names with at
    least 10 flagged records that repeat from the same IP within 3 minutes.

    Exits with status 1 and a message on stderr when no log file is given or
    the file cannot be read.
    """
    if len(sys.argv) < 2:
        # sys.exit rather than the `exit` helper injected by `site`, which is
        # meant for interactive use and is missing under `python -S`. A string
        # argument is written to stderr and gives exit status 1.
        sys.exit("Error: please specify a log file")
    date = datetime.date.today().strftime("%Y-%m-%d")
    log_file = pathlib.Path(sys.argv[1])
    lines = filter_log_lines_for_date(log_file, date)
    try:
        # The file is opened lazily, so I/O errors surface here, while the
        # lines are being consumed.
        connmap = get_conn_map(lines)
    except OSError as err:
        sys.exit(f"Error: cannot read {log_file}: {err}")
    multiple_ips = find_names_with_multiple_ips(connmap)
    fast_repeats = find_fast_repeats(connmap, datetime.timedelta(minutes=3), 10)
    print_multiple_ips(multiple_ips)
    print_fast_repeats(fast_repeats)
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()