test_sync rewrite using pytest

This commit is contained in:
Dmitry Belyaev 2018-04-17 22:41:14 +03:00
parent 508ddedb2c
commit c38a3afe00
Signed by: b4tman
GPG Key ID: 014E87EC54B77673
1 changed files with 153 additions and 165 deletions

View File

@ -1,185 +1,173 @@
import datetime
import hashlib
import operator
import unittest
from copy import deepcopy
from random import shuffle
import dateutil.parser
import pytest
from pytz import timezone, utc
from gcal_sync import CalendarSync
class TestCalendarSync(unittest.TestCase):
@staticmethod
def sha1(string):
if isinstance(string, str):
string = string.encode('utf8')
h = hashlib.sha1()
h.update(string)
return h.hexdigest()
def sha1(string):
if isinstance(string, str):
string = string.encode('utf8')
h = hashlib.sha1()
h.update(string)
return h.hexdigest()
@staticmethod
def gen_events(start, stop, start_time, no_time=False):
if no_time:
start_time = datetime.date(
start_time.year, start_time.month, start_time.day)
duration = datetime.date(1, 1, 2) - datetime.date(1, 1, 1)
date_key = "date"
suff = ''
else:
start_time = utc.normalize(
start_time.astimezone(utc)).replace(tzinfo=None)
duration = datetime.datetime(
1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_key = "dateTime"
suff = 'Z'
result = []
for i in range(start, stop):
event_start = start_time + (duration * i)
event_end = event_start + duration
updated = event_start
if no_time:
updated = datetime.datetime(
updated.year, updated.month, updated.day, 0, 0, 0, 1, tzinfo=utc)
event = {
'summary': 'test event __ {}'.format(i),
'location': 'la la la {}'.format(i),
'description': 'test TEST -- test event {}'.format(i),
"iCalUID": "{}@test.com".format(TestCalendarSync.sha1("test - event {}".format(i))),
"updated": updated.isoformat() + 'Z',
"created": updated.isoformat() + 'Z'
}
event['start'] = {date_key: event_start.isoformat() + suff}
event['end'] = {date_key: event_end.isoformat() + suff}
result.append(event)
return result
@staticmethod
def gen_list_to_compare(start, stop):
result = []
for i in range(start, stop):
result.append({'iCalUID': 'test{:06d}'.format(i)})
return result
@staticmethod
def get_start_date(event):
event_start = event['start']
start_date = None
is_date = False
if 'date' in event_start:
start_date = event_start['date']
is_date = True
if 'dateTime' in event_start:
start_date = event_start['dateTime']
result = dateutil.parser.parse(start_date)
if is_date:
result = datetime.date(result.year, result.month, result.day)
return result
def test_compare(self):
part_len = 20
# [1..2n]
lst_src = TestCalendarSync.gen_list_to_compare(1, 1 + part_len * 2)
# [n..3n]
lst_dst = TestCalendarSync.gen_list_to_compare(
1 + part_len, 1 + part_len * 3)
lst_src_rnd = deepcopy(lst_src)
lst_dst_rnd = deepcopy(lst_dst)
shuffle(lst_src_rnd)
shuffle(lst_dst_rnd)
to_ins, to_upd, to_del = CalendarSync._events_list_compare(
lst_src_rnd, lst_dst_rnd)
self.assertEqual(len(to_ins), part_len)
self.assertEqual(len(to_upd), part_len)
self.assertEqual(len(to_del), part_len)
self.assertEqual(
sorted(to_ins, key=lambda x: x['iCalUID']), lst_src[:part_len])
self.assertEqual(
sorted(to_del, key=lambda x: x['iCalUID']), lst_dst[part_len:])
to_upd_ok = list(zip(lst_src[part_len:], lst_dst[:part_len]))
self.assertEqual(len(to_upd), len(to_upd_ok))
for item in to_upd_ok:
self.assertIn(item, to_upd)
def test_filter_events_by_date(self, no_time=False):
msk = timezone('Europe/Moscow')
now = utc.localize(datetime.datetime.utcnow())
msk_now = msk.normalize(now.astimezone(msk))
part_len = 5
if no_time:
duration = datetime.date(
1, 1, 2) - datetime.date(1, 1, 1)
else:
duration = datetime.datetime(
1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_cmp = msk_now + (duration * part_len)
if no_time:
date_cmp = datetime.date(
date_cmp.year, date_cmp.month, date_cmp.day)
events = TestCalendarSync.gen_events(
1, 1 + (part_len * 2), msk_now, no_time)
shuffle(events)
events_pending = CalendarSync._filter_events_by_date(
events, date_cmp, operator.ge)
events_past = CalendarSync._filter_events_by_date(
events, date_cmp, operator.lt)
self.assertEqual(len(events_pending), 1 + part_len)
self.assertEqual(len(events_past), part_len - 1)
for event in events_pending:
self.assertGreaterEqual(
TestCalendarSync.get_start_date(event), date_cmp)
for event in events_past:
self.assertLess(TestCalendarSync.get_start_date(event), date_cmp)
def test_filter_events_by_date_no_time(self):
self.test_filter_events_by_date(no_time=True)
def test_filter_events_to_update(self):
msk = timezone('Europe/Moscow')
now = utc.localize(datetime.datetime.utcnow())
msk_now = msk.normalize(now.astimezone(msk))
one_hour = datetime.datetime(
def gen_events(start, stop, start_time, no_time=False):
if no_time:
start_time = datetime.date(
start_time.year, start_time.month, start_time.day)
duration = datetime.date(1, 1, 2) - datetime.date(1, 1, 1)
date_key = "date"
suff = ''
else:
start_time = utc.normalize(
start_time.astimezone(utc)).replace(tzinfo=None)
duration = datetime.datetime(
1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_upd = msk_now + (one_hour * 5)
date_key = "dateTime"
suff = 'Z'
count = 10
events_old = TestCalendarSync.gen_events(1, 1 + count, msk_now)
events_new = TestCalendarSync.gen_events(1, 1 + count, date_upd)
result = []
for i in range(start, stop):
event_start = start_time + (duration * i)
event_end = event_start + duration
sync1 = CalendarSync(None, None)
sync1.to_update = list(zip(events_new, events_old))
sync1._filter_events_to_update()
updated = event_start
if no_time:
updated = datetime.datetime(
updated.year, updated.month, updated.day, 0, 0, 0, 1, tzinfo=utc)
sync2 = CalendarSync(None, None)
sync2.to_update = list(zip(events_old, events_new))
sync2._filter_events_to_update()
event = {
'summary': 'test event __ {}'.format(i),
'location': 'la la la {}'.format(i),
'description': 'test TEST -- test event {}'.format(i),
"iCalUID": "{}@test.com".format(sha1("test - event {}".format(i))),
"updated": updated.isoformat() + 'Z',
"created": updated.isoformat() + 'Z'
}
event['start'] = {date_key: event_start.isoformat() + suff}
event['end'] = {date_key: event_end.isoformat() + suff}
result.append(event)
return result
self.assertEqual(len(sync1.to_update), count)
self.assertEqual(len(sync2.to_update), 0)
def gen_list_to_compare(start, stop):
result = []
for i in range(start, stop):
result.append({'iCalUID': 'test{:06d}'.format(i)})
return result
def get_start_date(event):
event_start = event['start']
start_date = None
is_date = False
if 'date' in event_start:
start_date = event_start['date']
is_date = True
if 'dateTime' in event_start:
start_date = event_start['dateTime']
if __name__ == '__main__':
unittest.main()
result = dateutil.parser.parse(start_date)
if is_date:
result = datetime.date(result.year, result.month, result.day)
return result
def test_compare():
part_len = 20
# [1..2n]
lst_src = gen_list_to_compare(1, 1 + part_len * 2)
# [n..3n]
lst_dst = gen_list_to_compare(
1 + part_len, 1 + part_len * 3)
lst_src_rnd = deepcopy(lst_src)
lst_dst_rnd = deepcopy(lst_dst)
shuffle(lst_src_rnd)
shuffle(lst_dst_rnd)
to_ins, to_upd, to_del = CalendarSync._events_list_compare(
lst_src_rnd, lst_dst_rnd)
assert len(to_ins) == part_len
assert len(to_upd) == part_len
assert len(to_del) == part_len
assert sorted(to_ins, key=lambda x: x['iCalUID']) == lst_src[:part_len]
assert sorted(to_del, key=lambda x: x['iCalUID']) == lst_dst[part_len:]
to_upd_ok = list(zip(lst_src[part_len:], lst_dst[:part_len]))
assert len(to_upd) == len(to_upd_ok)
for item in to_upd_ok:
assert item in to_upd
def test_filter_events_by_date(no_time=False):
msk = timezone('Europe/Moscow')
now = utc.localize(datetime.datetime.utcnow())
msk_now = msk.normalize(now.astimezone(msk))
part_len = 5
if no_time:
duration = datetime.date(
1, 1, 2) - datetime.date(1, 1, 1)
else:
duration = datetime.datetime(
1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_cmp = msk_now + (duration * part_len)
if no_time:
date_cmp = datetime.date(
date_cmp.year, date_cmp.month, date_cmp.day)
events = gen_events(
1, 1 + (part_len * 2), msk_now, no_time)
shuffle(events)
events_pending = CalendarSync._filter_events_by_date(
events, date_cmp, operator.ge)
events_past = CalendarSync._filter_events_by_date(
events, date_cmp, operator.lt)
assert len(events_pending) == 1 + part_len
assert len(events_past) == part_len - 1
for event in events_pending:
assert get_start_date(event) >= date_cmp
for event in events_past:
assert get_start_date(event) < date_cmp
def test_filter_events_by_date_no_time():
test_filter_events_by_date(no_time=True)
def test_filter_events_to_update():
msk = timezone('Europe/Moscow')
now = utc.localize(datetime.datetime.utcnow())
msk_now = msk.normalize(now.astimezone(msk))
one_hour = datetime.datetime(
1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_upd = msk_now + (one_hour * 5)
count = 10
events_old = gen_events(1, 1 + count, msk_now)
events_new = gen_events(1, 1 + count, date_upd)
sync1 = CalendarSync(None, None)
sync1.to_update = list(zip(events_new, events_old))
sync1._filter_events_to_update()
sync2 = CalendarSync(None, None)
sync2.to_update = list(zip(events_old, events_new))
sync2._filter_events_to_update()
assert len(sync1.to_update) == count
assert sync2.to_update == []