This commit is contained in:
Dmitry Belyaev 2025-03-13 12:07:25 +03:00
parent f59b7ab639
commit 9fbb30145a
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3

@ -2,45 +2,99 @@ import re
import datetime
import sys
import pathlib
from typing import NamedTuple, TypeAlias
from typing import NamedTuple
datepattern = re.compile(r"(?P<date>\d{4}(-\d{2}){2})\s")
pattern = re.compile(
r"(?P<date>\d{4}(-\d{2}){2})\s(?P<time>\d{2}(:\d{2}){2})\s(?P<IP>\d{1,3}(\.\d{1,3}){3}):\d+\s\[(?P<name>\w+)\]\sPeer\sConnection\sInitiated"
)
class ConnectionInfo(NamedTuple):
"""
A named tuple representing connection information.
Attributes:
ip (str): The IP address of the connection.
datetime (datetime.datetime): The date and time of the connection.
"""
ip: str
datetime: datetime.datetime
ConnectionName: TypeAlias = str
ConnectionMap = dict[ConnectionName, list[ConnectionInfo]]
ConnectionMap = dict[str, list[ConnectionInfo]]
class ConnectionEntry(NamedTuple):
name: ConnectionName
"""
A named tuple representing a connection entry.
Attributes:
name (str): The name of the connection.
info (ConnectionInfo): The connection information.
"""
name: str
info: ConnectionInfo
def log_lines(filepath: pathlib.Path):
"""
A generator function that reads lines from a log file.
Args:
filepath (pathlib.Path): The path to the log file.
Yields:
str: A line from the log file.
"""
with filepath.open("r") as f:
for line in f:
yield line
def filter_log_lines_for_date(filepath: pathlib.Path, date: str):
return filter(
lambda line: datepattern.match(line).group("date") == date, log_lines(filepath)
)
def filter_log_lines_for_date(lines, date: str):
"""
A function that filters log lines for a specific date.
Args:
lines (iterable): An iterable of log lines.
date (str): The date to filter the log lines for.
Returns:
iterable: An iterable of log lines that start with the specified date.
"""
return filter(lambda line: line.startswith(date + " "), lines)
def parse_date_time(date: str, time: str) -> datetime.datetime:
"""
A function that parses a date and time string into a datetime object.
Args:
date (str): The date string to parse.
time (str): The time string to parse.
Returns:
datetime.datetime: The parsed datetime object.
"""
return datetime.datetime.strptime(date + " " + time, "%Y-%m-%d %H:%M:%S")
def parse_connections(lines):
"""
A generator function that parses log lines into connection entries.
Args:
lines (Iterable[str]): An iterable of log lines.
Yields:
ConnectionEntry: A connection entry parsed from a log line.
"""
for line in lines:
if match := pattern.match(line):
yield ConnectionEntry(
@ -53,6 +107,16 @@ def parse_connections(lines):
def get_conn_map(lines):
"""
A function that gets a map of connection entries from log lines.
Args:
lines (Iterable[str]): An iterable of log lines.
Returns:
dict: A map of connection entries, where the keys are connection names and the values are lists of connection information.
"""
result = {}
for name, info in parse_connections(lines):
result.setdefault(name, []).append(info)
@ -60,6 +124,16 @@ def get_conn_map(lines):
def find_names_with_multiple_ips(connmap: ConnectionMap) -> ConnectionMap:
"""
A function that finds connection entries with same cn and multiple IPs.
Args:
connmap (ConnectionMap): A map of connection entries.
Returns:
ConnectionMap: A map of connection entries with multiple IPs, where the keys are connection names (common name) and the values are lists of connection information.
"""
result = {}
for name, infos in connmap.items():
ips = {info.ip for info in infos}
@ -77,6 +151,18 @@ def find_names_with_multiple_ips(connmap: ConnectionMap) -> ConnectionMap:
def find_fast_repeats(
connmap: ConnectionMap, threshold: datetime.timedelta, min_repeats: int
) -> ConnectionMap:
"""
A function that finds connection entries with fast repeats.
Args:
connmap (ConnectionMap): A map of connection entries.
threshold (datetime.timedelta): The maximum time difference between two connection entries to be considered a repeat.
min_repeats (int): The minimum number of repeats for a connection entry to be included in the result.
Returns:
ConnectionMap: A map of connection entries with fast repeats, where the keys are connection names and the values are lists of connection information.
"""
result = {}
for name, infos in connmap.items():
if len(infos) < 2:
@ -94,6 +180,13 @@ def find_fast_repeats(
def print_multiple_ips(connmap: ConnectionMap):
"""
A function that prints connection entries with multiple IPs.
Args:
connmap (ConnectionMap): A map of connection entries.
"""
if len(connmap) == 0:
return
print("Multiple IPs:")
@ -104,13 +197,21 @@ def print_multiple_ips(connmap: ConnectionMap):
def print_fast_repeats(connmap: ConnectionMap, limit_for_one=10):
"""
A function that prints connection entries with fast repeats.
Args:
connmap (ConnectionMap): A map of connection entries.
limit_for_one (int, optional): The maximum number of repeats to print for each connection entry. Defaults to 10.
"""
if len(connmap) == 0:
return
print("Fast repeats:")
for name, infos in connmap.items():
print(f"- {name}:")
for n, info in enumerate(
reversed(sorted(infos, key=lambda info: info.datetime)), 1
sorted(infos, key=lambda info: info.datetime, reverse=True), 1
):
print(f" {n:2}. {info.ip}: {info.datetime}")
if n >= limit_for_one:
@ -125,7 +226,7 @@ def main():
date = datetime.date.today().strftime("%Y-%m-%d")
log_file = pathlib.Path(sys.argv[1])
lines = log_lines(log_file)
lines = filter_log_lines_for_date(pathlib.Path(log_file), date)
lines = filter_log_lines_for_date(lines, date)
connmap = get_conn_map(lines)
multiple_ips = find_names_with_multiple_ips(connmap)
fast_repeats = find_fast_repeats(connmap, datetime.timedelta(minutes=3), 10)