239 lines
6.9 KiB
Python
239 lines
6.9 KiB
Python
import re
|
|
import datetime
|
|
import sys
|
|
import pathlib
|
|
from typing import NamedTuple
|
|
|
|
pattern = re.compile(
|
|
r"(?P<date>\d{4}(-\d{2}){2})\s(?P<time>\d{2}(:\d{2}){2})\s(?P<IP>\d{1,3}(\.\d{1,3}){3}):\d+\s\[(?P<name>\w+)\]\sPeer\sConnection\sInitiated"
|
|
)
|
|
|
|
|
|
class ConnectionInfo(NamedTuple):
|
|
"""
|
|
A named tuple representing connection information.
|
|
|
|
Attributes:
|
|
ip (str): The IP address of the connection.
|
|
datetime (datetime.datetime): The date and time of the connection.
|
|
"""
|
|
|
|
ip: str
|
|
datetime: datetime.datetime
|
|
|
|
|
|
ConnectionMap = dict[str, list[ConnectionInfo]]
|
|
|
|
|
|
class ConnectionEntry(NamedTuple):
|
|
"""
|
|
A named tuple representing a connection entry.
|
|
|
|
Attributes:
|
|
name (str): The name of the connection.
|
|
info (ConnectionInfo): The connection information.
|
|
"""
|
|
|
|
name: str
|
|
info: ConnectionInfo
|
|
|
|
|
|
def log_lines(filepath: pathlib.Path):
|
|
"""
|
|
A generator function that reads lines from a log file.
|
|
|
|
Args:
|
|
filepath (pathlib.Path): The path to the log file.
|
|
|
|
Yields:
|
|
str: A line from the log file.
|
|
"""
|
|
|
|
with filepath.open("r") as f:
|
|
for line in f:
|
|
yield line
|
|
|
|
|
|
def filter_log_lines_for_date(lines, date: str):
|
|
"""
|
|
A function that filters log lines for a specific date.
|
|
|
|
Args:
|
|
lines (iterable): An iterable of log lines.
|
|
date (str): The date to filter the log lines for.
|
|
|
|
Returns:
|
|
iterable: An iterable of log lines that start with the specified date.
|
|
"""
|
|
|
|
return filter(lambda line: line.startswith(date + " "), lines)
|
|
|
|
|
|
def parse_date_time(date: str, time: str) -> datetime.datetime:
|
|
"""
|
|
A function that parses a date and time string into a datetime object.
|
|
|
|
Args:
|
|
date (str): The date string to parse.
|
|
time (str): The time string to parse.
|
|
|
|
Returns:
|
|
datetime.datetime: The parsed datetime object.
|
|
"""
|
|
|
|
return datetime.datetime.strptime(date + " " + time, "%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def parse_connections(lines):
|
|
"""
|
|
A generator function that parses log lines into connection entries.
|
|
|
|
Args:
|
|
lines (Iterable[str]): An iterable of log lines.
|
|
|
|
Yields:
|
|
ConnectionEntry: A connection entry parsed from a log line.
|
|
"""
|
|
|
|
for line in lines:
|
|
if match := pattern.match(line):
|
|
yield ConnectionEntry(
|
|
match.group("name"),
|
|
ConnectionInfo(
|
|
match.group("IP"),
|
|
parse_date_time(match.group("date"), match.group("time")),
|
|
),
|
|
)
|
|
|
|
|
|
def get_conn_map(lines):
|
|
"""
|
|
A function that gets a map of connection entries from log lines.
|
|
|
|
Args:
|
|
lines (Iterable[str]): An iterable of log lines.
|
|
|
|
Returns:
|
|
dict: A map of connection entries, where the keys are connection names and the values are lists of connection information.
|
|
"""
|
|
|
|
result = {}
|
|
for name, info in parse_connections(lines):
|
|
result.setdefault(name, []).append(info)
|
|
return result
|
|
|
|
|
|
def find_names_with_multiple_ips(connmap: ConnectionMap) -> ConnectionMap:
|
|
"""
|
|
A function that finds connection entries with same cn and multiple IPs.
|
|
|
|
Args:
|
|
connmap (ConnectionMap): A map of connection entries.
|
|
|
|
Returns:
|
|
ConnectionMap: A map of connection entries with multiple IPs, where the keys are connection names (common name) and the values are lists of connection information.
|
|
"""
|
|
|
|
result = {}
|
|
for name, infos in connmap.items():
|
|
ips = {info.ip for info in infos}
|
|
if len(ips) < 2:
|
|
continue
|
|
for ip in ips:
|
|
max_ip_info = max(
|
|
filter(lambda info: info.ip == ip, infos),
|
|
key=lambda info: info.datetime,
|
|
)
|
|
result.setdefault(name, []).append(max_ip_info)
|
|
return result
|
|
|
|
|
|
def find_fast_repeats(
|
|
connmap: ConnectionMap, threshold: datetime.timedelta, min_repeats: int
|
|
) -> ConnectionMap:
|
|
"""
|
|
A function that finds connection entries with fast repeats.
|
|
|
|
Args:
|
|
connmap (ConnectionMap): A map of connection entries.
|
|
threshold (datetime.timedelta): The maximum time difference between two connection entries to be considered a repeat.
|
|
min_repeats (int): The minimum number of repeats for a connection entry to be included in the result.
|
|
|
|
Returns:
|
|
ConnectionMap: A map of connection entries with fast repeats, where the keys are connection names and the values are lists of connection information.
|
|
"""
|
|
|
|
result = {}
|
|
for name, infos in connmap.items():
|
|
if len(infos) < 2:
|
|
continue
|
|
infos = sorted(infos, key=lambda info: info.datetime)
|
|
for a, b in zip(infos, infos[1:]):
|
|
if a.ip == b.ip and b.datetime - a.datetime <= threshold:
|
|
for x in a, b:
|
|
lst = result.setdefault(name, [])
|
|
if x not in lst:
|
|
lst.append(x)
|
|
if name in result and len(result[name]) < min_repeats:
|
|
result.pop(name)
|
|
return result
|
|
|
|
|
|
def print_multiple_ips(connmap: ConnectionMap):
|
|
"""
|
|
A function that prints connection entries with multiple IPs.
|
|
|
|
Args:
|
|
connmap (ConnectionMap): A map of connection entries.
|
|
"""
|
|
|
|
if len(connmap) == 0:
|
|
return
|
|
print("Multiple IPs:")
|
|
for name, infos in connmap.items():
|
|
print(f"- {name}:")
|
|
for n, info in enumerate(sorted(infos, key=lambda info: info.datetime), 1):
|
|
print(f" {n:02}. {info.ip}: {info.datetime}")
|
|
|
|
|
|
def print_fast_repeats(connmap: ConnectionMap, limit_for_one=10):
|
|
"""
|
|
A function that prints connection entries with fast repeats.
|
|
|
|
Args:
|
|
connmap (ConnectionMap): A map of connection entries.
|
|
limit_for_one (int, optional): The maximum number of repeats to print for each connection entry. Defaults to 10.
|
|
"""
|
|
|
|
if len(connmap) == 0:
|
|
return
|
|
print("Fast repeats:")
|
|
for name, infos in connmap.items():
|
|
print(f"- {name}:")
|
|
for n, info in enumerate(
|
|
sorted(infos, key=lambda info: info.datetime, reverse=True), 1
|
|
):
|
|
print(f" {n:2}. {info.ip}: {info.datetime}")
|
|
if n >= limit_for_one:
|
|
break
|
|
|
|
|
|
def main():
|
|
if len(sys.argv) < 2:
|
|
print("Error: please specify a log file")
|
|
exit(1)
|
|
|
|
date = datetime.date.today().strftime("%Y-%m-%d")
|
|
log_file = pathlib.Path(sys.argv[1])
|
|
lines = log_lines(log_file)
|
|
lines = filter_log_lines_for_date(lines, date)
|
|
connmap = get_conn_map(lines)
|
|
multiple_ips = find_names_with_multiple_ips(connmap)
|
|
fast_repeats = find_fast_repeats(connmap, datetime.timedelta(minutes=3), 10)
|
|
print_multiple_ips(multiple_ips)
|
|
print_fast_repeats(fast_repeats)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|