sync_ics2gcal/tests/test_sync.py

202 lines
6.4 KiB
Python

import datetime
import hashlib
import operator
from copy import deepcopy
from random import shuffle
from typing import Union, List, Dict, Optional, AnyStr
import dateutil.parser
import pytest
from pytz import timezone, utc
from sync_ics2gcal import CalendarSync, DateDateTime
from sync_ics2gcal.gcal import EventDateOrDateTime, EventData, EventList
def sha1(s: AnyStr) -> str:
h = hashlib.sha1()
h.update(str(s).encode("utf8") if isinstance(s, str) else s)
return h.hexdigest()
def gen_events(
start: int,
stop: int,
start_time: DateDateTime,
no_time: bool = False,
) -> EventList:
duration: datetime.timedelta
date_key: str
date_end: str
if no_time:
start_time = datetime.date(start_time.year, start_time.month, start_time.day)
duration = datetime.date(1, 1, 2) - datetime.date(1, 1, 1)
date_key = "date"
date_end = ""
else:
start_time = utc.normalize(start_time.astimezone(utc)).replace(tzinfo=None) # type: ignore
duration = datetime.datetime(1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_key = "dateTime"
date_end = "Z"
result: EventList = []
for i in range(start, stop):
event_start = start_time + (duration * i)
event_end = event_start + duration
updated: DateDateTime = event_start
if no_time:
updated = datetime.datetime(
updated.year, updated.month, updated.day, 0, 0, 0, 1, tzinfo=utc
)
event: EventData = {
"summary": "test event __ {}".format(i),
"location": "la la la {}".format(i),
"description": "test TEST -- test event {}".format(i),
"iCalUID": "{}@test.com".format(sha1("test - event {}".format(i))),
"updated": updated.isoformat() + "Z",
"created": updated.isoformat() + "Z",
"start": {date_key: event_start.isoformat() + date_end}, # type: ignore
"end": {date_key: event_end.isoformat() + date_end}, # type: ignore
}
result.append(event)
return result
def gen_list_to_compare(start: int, stop: int) -> EventList:
result: EventList = []
for i in range(start, stop):
result.append({"iCalUID": "test{:06d}".format(i)})
return result
def get_start_date(event: EventData) -> DateDateTime:
event_start: EventDateOrDateTime = event["start"]
start_date: Optional[str] = None
is_date = False
if "date" in event_start:
start_date = event_start["date"] # type: ignore
is_date = True
if "dateTime" in event_start:
start_date = event_start["dateTime"] # type: ignore
result: DateDateTime = dateutil.parser.parse(str(start_date))
if is_date:
result = datetime.date(result.year, result.month, result.day)
return result
def test_compare() -> None:
part_len: int = 20
# [1..2n]
lst_src = gen_list_to_compare(1, 1 + part_len * 2)
# [n..3n]
lst_dst = gen_list_to_compare(1 + part_len, 1 + part_len * 3)
lst_src_rnd = deepcopy(lst_src)
lst_dst_rnd = deepcopy(lst_dst)
shuffle(lst_src_rnd)
shuffle(lst_dst_rnd)
to_ins, to_upd, to_del = CalendarSync._events_list_compare(lst_src_rnd, lst_dst_rnd)
assert len(to_ins) == part_len
assert len(to_upd) == part_len
assert len(to_del) == part_len
assert sorted(to_ins, key=lambda x: x["iCalUID"]) == lst_src[:part_len]
assert sorted(to_del, key=lambda x: x["iCalUID"]) == lst_dst[part_len:]
to_upd_ok = list(zip(lst_src[part_len:], lst_dst[:part_len]))
assert len(to_upd) == len(to_upd_ok)
for item in to_upd_ok:
assert item in to_upd
@pytest.mark.parametrize("no_time", [True, False], ids=["date", "dateTime"])
def test_filter_events_by_date(no_time: bool) -> None:
msk = timezone("Europe/Moscow")
now = utc.localize(datetime.datetime.utcnow())
msk_now = msk.normalize(now.astimezone(msk))
part_len = 5
if no_time:
duration = datetime.date(1, 1, 2) - datetime.date(1, 1, 1)
else:
duration = datetime.datetime(1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_cmp: DateDateTime = msk_now + (duration * part_len)
if no_time:
date_cmp = datetime.date(date_cmp.year, date_cmp.month, date_cmp.day)
events = gen_events(1, 1 + (part_len * 2), msk_now, no_time)
shuffle(events)
events_pending = CalendarSync._filter_events_by_date(events, date_cmp, operator.ge)
events_past = CalendarSync._filter_events_by_date(events, date_cmp, operator.lt)
assert len(events_pending) == 1 + part_len
assert len(events_past) == part_len - 1
for event in events_pending:
assert get_start_date(event) >= date_cmp
for event in events_past:
assert get_start_date(event) < date_cmp
def test_filter_events_to_update() -> None:
msk = timezone("Europe/Moscow")
now = utc.localize(datetime.datetime.utcnow())
msk_now = msk.normalize(now.astimezone(msk))
one_hour = datetime.datetime(1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_upd = msk_now + (one_hour * 5)
count = 10
events_old = gen_events(1, 1 + count, msk_now)
events_new = gen_events(1, 1 + count, date_upd)
sync1 = CalendarSync(None, None) # type: ignore
sync1.to_update = list(zip(events_new, events_old))
sync1._filter_events_to_update()
sync2 = CalendarSync(None, None) # type: ignore
sync2.to_update = list(zip(events_old, events_new))
sync2._filter_events_to_update()
assert len(sync1.to_update) == count
assert sync2.to_update == []
def test_filter_events_no_updated() -> None:
"""
test filtering events that not have 'updated' field
such events should always pass the filter
"""
now = datetime.datetime.utcnow()
yesterday = now - datetime.timedelta(days=-1)
count = 10
events_old = gen_events(1, 1 + count, now)
events_new = gen_events(1, 1 + count, now)
# 1/2 updated=yesterday, 1/2 no updated field
i = 0
for event in events_new:
if 0 == i % 2:
event["updated"] = yesterday.isoformat() + "Z"
else:
del event["updated"]
i += 1
sync = CalendarSync(None, None) # type: ignore
sync.to_update = list(zip(events_old, events_new))
sync._filter_events_to_update()
assert len(sync.to_update) == count // 2