1
0
mirror of https://github.com/b4tman/sync_ics2gcal synced 2025-01-21 07:28:24 +00:00

Merge branch 'develop'

This commit is contained in:
Dmitry Belyaev 2018-04-17 23:10:30 +03:00
commit e16a4828e4
Signed by: b4tman
GPG Key ID: 014E87EC54B77673
4 changed files with 226 additions and 263 deletions

View File

@ -77,13 +77,23 @@ class CalendarSync():
""" """
def filter_by_date(event): def filter_by_date(event):
date_cmp = date
event_start = event['start'] event_start = event['start']
event_date = None event_date = None
compare_dates = False
if 'date' in event_start: if 'date' in event_start:
event_date = event_start['date'] event_date = event_start['date']
if 'dateTime' in event_start: compare_dates = True
elif 'dateTime' in event_start:
event_date = event_start['dateTime'] event_date = event_start['dateTime']
return op(dateutil.parser.parse(event_date), date)
event_date = dateutil.parser.parse(event_date)
if compare_dates:
date_cmp = datetime.date(date.year, date.month, date.day)
event_date = datetime.date(event_date.year, event_date.month, event_date.day)
return op(event_date, date_cmp)
return list(filter(filter_by_date, events)) return list(filter(filter_by_date, events))

View File

@ -1,154 +1,93 @@
import unittest import pytest
from gcal_sync import CalendarConverter from gcal_sync import CalendarConverter
ics_empty = """BEGIN:VCALENDAR uid = "UID:uisgtr8tre93wewe0yr8wqy@test.com"
VERSION:2.0 only_start_date = uid + """
PRODID:-//test//test//ES
CALSCALE:GREGORIAN
METHOD:PUBLISH
END:VCALENDAR
"""
ics_empty_event = """BEGIN:VCALENDAR
BEGIN:VEVENT
END:VEVENT
END:VCALENDAR
"""
ics_event_no_end = """BEGIN:VCALENDAR
BEGIN:VEVENT
UID:uisgtr8tre93wewe0yr8wqy@test.com
DTSTART;VALUE=DATE:20180215 DTSTART;VALUE=DATE:20180215
END:VEVENT
END:VCALENDAR
""" """
date_val = only_start_date + """
ics_event_date_val = """BEGIN:VCALENDAR
BEGIN:VEVENT
UID:uisgtr8tre93wewe0yr8wqy@test.com
DTSTART;VALUE=DATE:20180215
DTEND;VALUE=DATE:20180217 DTEND;VALUE=DATE:20180217
END:VEVENT
END:VCALENDAR
""" """
date_duration = only_start_date + """
ics_event_datetime_utc_val = """BEGIN:VCALENDAR DURATION:P2D
BEGIN:VEVENT """
UID:uisgtr8tre93wewe0yr8wqy@test.com datetime_utc_val = uid + """
DTSTART;VALUE=DATE-TIME:20180319T092001Z DTSTART;VALUE=DATE-TIME:20180319T092001Z
DTEND:20180319T102001Z DTEND:20180321T102501Z
END:VEVENT
END:VCALENDAR
""" """
datetime_utc_duration = uid + """
ics_event_date_duration = """BEGIN:VCALENDAR
BEGIN:VEVENT
UID:uisgtr8tre93wewe0yr8wqy@test.com
DTSTART;VALUE=DATE:20180215
DURATION:P3D
END:VEVENT
END:VCALENDAR
"""
ics_event_datetime_utc_duration = """BEGIN:VCALENDAR
BEGIN:VEVENT
UID:uisgtr8tre93wewe0yr8wqy@test.com
DTSTART;VALUE=DATE-TIME:20180319T092001Z DTSTART;VALUE=DATE-TIME:20180319T092001Z
DURATION:P2DT1H5M DURATION:P2DT1H5M
END:VEVENT
END:VCALENDAR
""" """
created_updated = date_val + """
ics_event_created_updated = """BEGIN:VCALENDAR
BEGIN:VEVENT
UID:uisgtr8tre93wewe0yr8wqy@test.com
DTSTART:20180215
DTEND:20180217
CREATED:20180320T071155Z CREATED:20180320T071155Z
LAST-MODIFIED:20180326T120235Z LAST-MODIFIED:20180326T120235Z
END:VEVENT
END:VCALENDAR
""" """
class TestCalendarConverter(unittest.TestCase):
def test_empty_calendar(self): def ics_test_cal(content):
converter = CalendarConverter() return "BEGIN:VCALENDAR\r\n{}END:VCALENDAR\r\n".format(content)
converter.loads(ics_empty)
evnts = converter.events_to_gcal()
self.assertEqual(len(evnts), 0) def ics_test_event(content):
return ics_test_cal("BEGIN:VEVENT\r\n{}END:VEVENT\r\n".format(content))
def test_empty_event(self):
converter = CalendarConverter()
converter.loads(ics_empty_event) def test_empty_calendar():
with self.assertRaises(KeyError): converter = CalendarConverter()
converter.events_to_gcal() converter.loads(ics_test_cal(""))
evnts = converter.events_to_gcal()
def test_event_no_end(self): assert evnts == []
converter = CalendarConverter()
converter.loads(ics_event_no_end)
with self.assertRaises(ValueError): def test_empty_event():
converter = CalendarConverter()
converter.loads(ics_test_event(""))
with pytest.raises(KeyError):
converter.events_to_gcal() converter.events_to_gcal()
def test_event_date_values(self):
converter = CalendarConverter()
converter.loads(ics_event_date_val)
events = converter.events_to_gcal()
self.assertEqual(len(events), 1)
event = events[0]
self.assertEqual(event['start'], {
'date': '2018-02-15'
})
self.assertEqual(event['end'], {
'date': '2018-02-17'
})
def test_event_datetime_utc_values(self): def test_event_no_end():
converter = CalendarConverter() converter = CalendarConverter()
converter.loads(ics_event_datetime_utc_val) converter.loads(ics_test_event(only_start_date))
events = converter.events_to_gcal() with pytest.raises(ValueError):
self.assertEqual(len(events), 1) converter.events_to_gcal()
event = events[0]
self.assertEqual(event['start'], {
'dateTime': '2018-03-19T09:20:01.000001Z'
})
self.assertEqual(event['end'], {
'dateTime': '2018-03-19T10:20:01.000001Z'
})
def test_event_date_duration(self):
converter = CalendarConverter()
converter.loads(ics_event_date_duration)
events = converter.events_to_gcal()
self.assertEqual(len(events), 1)
event = events[0]
self.assertEqual(event['start'], {
'date': '2018-02-15'
})
self.assertEqual(event['end'], {
'date': '2018-02-18'
})
def test_event_datetime_utc_duration(self):
converter = CalendarConverter()
converter.loads(ics_event_datetime_utc_duration)
events = converter.events_to_gcal()
self.assertEqual(len(events), 1)
event = events[0]
self.assertEqual(event['start'], {
'dateTime': '2018-03-19T09:20:01.000001Z'
})
self.assertEqual(event['end'], {
'dateTime': '2018-03-21T10:25:01.000001Z'
})
def test_event_created_updated(self): @pytest.fixture(params=[
converter = CalendarConverter() ("date", ics_test_event(date_val), '2018-02-15', '2018-02-17'),
converter.loads(ics_event_created_updated) ("date", ics_test_event(date_duration), '2018-02-15', '2018-02-17'),
events = converter.events_to_gcal() ("dateTime", ics_test_event(datetime_utc_val),
self.assertEqual(len(events), 1) '2018-03-19T09:20:01.000001Z', '2018-03-21T10:25:01.000001Z'),
event = events[0] ("dateTime", ics_test_event(datetime_utc_duration), '2018-03-19T09:20:01.000001Z', '2018-03-21T10:25:01.000001Z')],
self.assertEqual(event['created'], '2018-03-20T07:11:55.000001Z') ids=['date values', 'date duration',
self.assertEqual(event['updated'], '2018-03-26T12:02:35.000001Z') 'datetime utc values', 'datetime utc duration']
)
def param_events_start_end(request):
return request.param
if __name__ == '__main__':
unittest.main() def test_event_start_end(param_events_start_end):
(date_type, ics_str, start, end) = param_events_start_end
converter = CalendarConverter()
converter.loads(ics_str)
events = converter.events_to_gcal()
assert len(events) == 1
event = events[0]
assert event['start'] == {
date_type: start
}
assert event['end'] == {
date_type: end
}
def test_event_created_updated():
converter = CalendarConverter()
converter.loads(ics_test_event(created_updated))
events = converter.events_to_gcal()
assert len(events) == 1
event = events[0]
assert event['created'] == '2018-03-20T07:11:55.000001Z'
assert event['updated'] == '2018-03-26T12:02:35.000001Z'

View File

@ -1,14 +0,0 @@
import unittest
class TestImports(unittest.TestCase):
def test_imports(self):
from gcal_sync import (
CalendarConverter,
EventConverter,
GoogleCalendarService,
GoogleCalendar,
CalendarSync
)
if __name__ == '__main__':
unittest.main()

View File

@ -1,149 +1,177 @@
import datetime import datetime
import hashlib import hashlib
import operator import operator
import unittest
from copy import deepcopy from copy import deepcopy
from random import shuffle from random import shuffle
import dateutil.parser import dateutil.parser
import pytest
from pytz import timezone, utc from pytz import timezone, utc
from gcal_sync import CalendarSync from gcal_sync import CalendarSync
class TestCalendarSync(unittest.TestCase): def sha1(string):
@staticmethod if isinstance(string, str):
def sha1(string): string = string.encode('utf8')
if isinstance(string, str): h = hashlib.sha1()
string = string.encode('utf8') h.update(string)
h = hashlib.sha1() return h.hexdigest()
h.update(string)
return h.hexdigest()
@staticmethod
def gen_events(start, stop, start_time): def gen_events(start, stop, start_time, no_time=False):
one_hour = datetime.datetime( if no_time:
start_time = datetime.date(
start_time.year, start_time.month, start_time.day)
duration = datetime.date(1, 1, 2) - datetime.date(1, 1, 1)
date_key = "date"
suff = ''
else:
start_time = utc.normalize(
start_time.astimezone(utc)).replace(tzinfo=None)
duration = datetime.datetime(
1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1) 1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
result = [] date_key = "dateTime"
for i in range(start, stop): suff = 'Z'
event_start = start_time + (one_hour * i)
event_end = event_start + one_hour
updated = utc.normalize(
event_start.astimezone(utc)).replace(tzinfo=None)
result.append({
'summary': 'test event __ {}'.format(i),
'location': 'la la la {}'.format(i),
'description': 'test TEST -- test event {}'.format(i),
'start': {
'dateTime': event_start.isoformat()
},
'end': {
'dateTime': event_end.isoformat(),
},
"iCalUID": "{}@test.com".format(TestCalendarSync.sha1("test - event {}".format(i))),
"updated": updated.isoformat() + 'Z',
"created": updated.isoformat() + 'Z'
})
return result
@staticmethod result = []
def gen_list_to_compare(start, stop): for i in range(start, stop):
result = [] event_start = start_time + (duration * i)
for i in range(start, stop): event_end = event_start + duration
result.append({'iCalUID': 'test{:06d}'.format(i)})
return result
@staticmethod updated = event_start
def get_start_date(event): if no_time:
event_start = event['start'] updated = datetime.datetime(
start_date = None updated.year, updated.month, updated.day, 0, 0, 0, 1, tzinfo=utc)
if 'date' in event_start:
start_date = event_start['date']
if 'dateTime' in event_start:
start_date = event_start['dateTime']
return dateutil.parser.parse(start_date)
def test_compare(self): event = {
part_len = 20 'summary': 'test event __ {}'.format(i),
# [1..2n] 'location': 'la la la {}'.format(i),
lst_src = TestCalendarSync.gen_list_to_compare(1, 1 + part_len * 2) 'description': 'test TEST -- test event {}'.format(i),
# [n..3n] "iCalUID": "{}@test.com".format(sha1("test - event {}".format(i))),
lst_dst = TestCalendarSync.gen_list_to_compare(1 + part_len, 1 + part_len * 3) "updated": updated.isoformat() + 'Z',
"created": updated.isoformat() + 'Z'
}
event['start'] = {date_key: event_start.isoformat() + suff}
event['end'] = {date_key: event_end.isoformat() + suff}
result.append(event)
return result
lst_src_rnd = deepcopy(lst_src)
lst_dst_rnd = deepcopy(lst_dst)
shuffle(lst_src_rnd) def gen_list_to_compare(start, stop):
shuffle(lst_dst_rnd) result = []
for i in range(start, stop):
result.append({'iCalUID': 'test{:06d}'.format(i)})
return result
to_ins, to_upd, to_del = CalendarSync._events_list_compare(
lst_src_rnd, lst_dst_rnd)
self.assertEqual(len(to_ins), part_len) def get_start_date(event):
self.assertEqual(len(to_upd), part_len) event_start = event['start']
self.assertEqual(len(to_del), part_len) start_date = None
is_date = False
if 'date' in event_start:
start_date = event_start['date']
is_date = True
if 'dateTime' in event_start:
start_date = event_start['dateTime']
self.assertEqual( result = dateutil.parser.parse(start_date)
sorted(to_ins, key=lambda x: x['iCalUID']), lst_src[:part_len]) if is_date:
self.assertEqual( result = datetime.date(result.year, result.month, result.day)
sorted(to_del, key=lambda x: x['iCalUID']), lst_dst[part_len:])
to_upd_ok = list(zip(lst_src[part_len:], lst_dst[:part_len])) return result
self.assertEqual(len(to_upd), len(to_upd_ok))
for item in to_upd_ok:
self.assertIn(item, to_upd)
def test_filter_events_by_date(self):
msk = timezone('Europe/Moscow')
now = utc.localize(datetime.datetime.utcnow())
msk_now = msk.normalize(now.astimezone(msk))
one_hour = datetime.datetime( def test_compare():
part_len = 20
# [1..2n]
lst_src = gen_list_to_compare(1, 1 + part_len * 2)
# [n..3n]
lst_dst = gen_list_to_compare(
1 + part_len, 1 + part_len * 3)
lst_src_rnd = deepcopy(lst_src)
lst_dst_rnd = deepcopy(lst_dst)
shuffle(lst_src_rnd)
shuffle(lst_dst_rnd)
to_ins, to_upd, to_del = CalendarSync._events_list_compare(
lst_src_rnd, lst_dst_rnd)
assert len(to_ins) == part_len
assert len(to_upd) == part_len
assert len(to_del) == part_len
assert sorted(to_ins, key=lambda x: x['iCalUID']) == lst_src[:part_len]
assert sorted(to_del, key=lambda x: x['iCalUID']) == lst_dst[part_len:]
to_upd_ok = list(zip(lst_src[part_len:], lst_dst[:part_len]))
assert len(to_upd) == len(to_upd_ok)
for item in to_upd_ok:
assert item in to_upd
@pytest.mark.parametrize("no_time", [True, False], ids=['date', 'dateTime'])
def test_filter_events_by_date(no_time):
msk = timezone('Europe/Moscow')
now = utc.localize(datetime.datetime.utcnow())
msk_now = msk.normalize(now.astimezone(msk))
part_len = 5
if no_time:
duration = datetime.date(
1, 1, 2) - datetime.date(1, 1, 1)
else:
duration = datetime.datetime(
1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1) 1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_cmp = msk_now + (one_hour * 5)
events = TestCalendarSync.gen_events(1, 11, msk_now) date_cmp = msk_now + (duration * part_len)
shuffle(events)
events_pending = CalendarSync._filter_events_by_date( if no_time:
events, date_cmp, operator.ge) date_cmp = datetime.date(
events_past = CalendarSync._filter_events_by_date( date_cmp.year, date_cmp.month, date_cmp.day)
events, date_cmp, operator.lt)
self.assertEqual(len(events_pending), 6) events = gen_events(
self.assertEqual(len(events_past), 4) 1, 1 + (part_len * 2), msk_now, no_time)
shuffle(events)
for event in events_pending: events_pending = CalendarSync._filter_events_by_date(
self.assertGreaterEqual( events, date_cmp, operator.ge)
TestCalendarSync.get_start_date(event), date_cmp) events_past = CalendarSync._filter_events_by_date(
events, date_cmp, operator.lt)
for event in events_past: assert len(events_pending) == 1 + part_len
self.assertLess(TestCalendarSync.get_start_date(event), date_cmp) assert len(events_past) == part_len - 1
def test_filter_events_to_update(self): for event in events_pending:
msk = timezone('Europe/Moscow') assert get_start_date(event) >= date_cmp
now = utc.localize(datetime.datetime.utcnow())
msk_now = msk.normalize(now.astimezone(msk))
one_hour = datetime.datetime( for event in events_past:
1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1) assert get_start_date(event) < date_cmp
date_upd = msk_now + (one_hour * 5)
count = 10
events_old = TestCalendarSync.gen_events(1, 1 + count, msk_now)
events_new = TestCalendarSync.gen_events(1, 1 + count, date_upd)
sync1 = CalendarSync(None, None)
sync1.to_update = list(zip(events_new, events_old))
sync1._filter_events_to_update()
sync2 = CalendarSync(None, None)
sync2.to_update = list(zip(events_old, events_new))
sync2._filter_events_to_update()
self.assertEqual(len(sync1.to_update), count)
self.assertEqual(len(sync2.to_update), 0)
if __name__ == '__main__': def test_filter_events_to_update():
unittest.main() msk = timezone('Europe/Moscow')
now = utc.localize(datetime.datetime.utcnow())
msk_now = msk.normalize(now.astimezone(msk))
one_hour = datetime.datetime(
1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_upd = msk_now + (one_hour * 5)
count = 10
events_old = gen_events(1, 1 + count, msk_now)
events_new = gen_events(1, 1 + count, date_upd)
sync1 = CalendarSync(None, None)
sync1.to_update = list(zip(events_new, events_old))
sync1._filter_events_to_update()
sync2 = CalendarSync(None, None)
sync2.to_update = list(zip(events_old, events_new))
sync2._filter_events_to_update()
assert len(sync1.to_update) == count
assert sync2.to_update == []