196 lines
6.7 KiB
Python
196 lines
6.7 KiB
Python
import datetime
|
|
import logging
|
|
import operator
|
|
from typing import List, Dict, Set, Tuple, Union, Callable
|
|
|
|
import dateutil.parser
|
|
from pytz import utc
|
|
|
|
from .gcal import GoogleCalendar, EventData, EventList, EventTuple
|
|
from .ical import CalendarConverter, DateDateTime
|
|
|
|
|
|
class CalendarSync:
|
|
"""class for syncronize calendar with google
|
|
"""
|
|
|
|
logger = logging.getLogger('CalendarSync')
|
|
|
|
def __init__(self, gcalendar: GoogleCalendar, converter: CalendarConverter):
|
|
self.gcalendar: GoogleCalendar = gcalendar
|
|
self.converter: CalendarConverter = converter
|
|
self.to_insert: EventList = []
|
|
self.to_update: List[EventTuple] = []
|
|
self.to_delete: EventList = []
|
|
|
|
@staticmethod
|
|
def _events_list_compare(items_src: EventList,
|
|
items_dst: EventList,
|
|
key: str = 'iCalUID') \
|
|
-> Tuple[EventList, List[EventTuple], EventList]:
|
|
""" compare list of events by key
|
|
|
|
Arguments:
|
|
items_src {list of dict} -- source events
|
|
items_dst {list of dict} -- dest events
|
|
key {str} -- name of key to compare (default: {'iCalUID'})
|
|
|
|
Returns:
|
|
tuple -- (items_to_insert,
|
|
items_to_update,
|
|
items_to_delete)
|
|
"""
|
|
|
|
def get_key(item: EventData) -> str: return item[key]
|
|
|
|
keys_src: Set[str] = set(map(get_key, items_src))
|
|
keys_dst: Set[str] = set(map(get_key, items_dst))
|
|
|
|
keys_to_insert = keys_src - keys_dst
|
|
keys_to_update = keys_src & keys_dst
|
|
keys_to_delete = keys_dst - keys_src
|
|
|
|
def items_by_keys(items: EventList,
|
|
key_name: str,
|
|
keys: Set[str]) -> EventList:
|
|
return list(filter(lambda item: item[key_name] in keys, items))
|
|
|
|
items_to_insert = items_by_keys(items_src, key, keys_to_insert)
|
|
items_to_delete = items_by_keys(items_dst, key, keys_to_delete)
|
|
|
|
to_upd_src = items_by_keys(items_src, key, keys_to_update)
|
|
to_upd_dst = items_by_keys(items_dst, key, keys_to_update)
|
|
to_upd_src.sort(key=get_key)
|
|
to_upd_dst.sort(key=get_key)
|
|
items_to_update = list(zip(to_upd_src, to_upd_dst))
|
|
|
|
return items_to_insert, items_to_update, items_to_delete
|
|
|
|
def _filter_events_to_update(self):
|
|
""" filter 'to_update' events by 'updated' datetime
|
|
"""
|
|
|
|
def filter_updated(event_tuple: EventTuple) -> bool:
|
|
new, old = event_tuple
|
|
new_date = dateutil.parser.parse(new['updated'])
|
|
old_date = dateutil.parser.parse(old['updated'])
|
|
return new_date > old_date
|
|
|
|
self.to_update = list(filter(filter_updated, self.to_update))
|
|
|
|
@staticmethod
|
|
def _filter_events_by_date(events: EventList,
|
|
date: DateDateTime,
|
|
op: Callable[[DateDateTime,
|
|
DateDateTime], bool]) -> EventList:
|
|
""" filter events by start datetime
|
|
|
|
Arguments:
|
|
events -- events list
|
|
date {datetime} -- datetime to compare
|
|
op {operator} -- comparsion operator
|
|
|
|
Returns:
|
|
list of filtred events
|
|
"""
|
|
|
|
def filter_by_date(event: EventData) -> bool:
|
|
date_cmp = date
|
|
event_start: Dict[str, str] = event['start']
|
|
event_date: Union[DateDateTime, str, None] = None
|
|
compare_dates = False
|
|
|
|
if 'date' in event_start:
|
|
event_date = event_start['date']
|
|
compare_dates = True
|
|
elif 'dateTime' in event_start:
|
|
event_date = event_start['dateTime']
|
|
|
|
event_date = dateutil.parser.parse(event_date)
|
|
if compare_dates:
|
|
date_cmp = datetime.date(date.year, date.month, date.day)
|
|
event_date = datetime.date(
|
|
event_date.year, event_date.month, event_date.day)
|
|
|
|
return op(event_date, date_cmp)
|
|
|
|
return list(filter(filter_by_date, events))
|
|
|
|
@staticmethod
|
|
def _tz_aware_datetime(date: DateDateTime) -> datetime.datetime:
|
|
"""make tz aware datetime from datetime/date (utc if no tzinfo)
|
|
|
|
Arguments:
|
|
date - date or datetime / with or without tzinfo
|
|
|
|
Returns:
|
|
datetime with tzinfo
|
|
"""
|
|
|
|
if not isinstance(date, datetime.datetime):
|
|
date = datetime.datetime(date.year, date.month, date.day)
|
|
if date.tzinfo is None:
|
|
date = date.replace(tzinfo=utc)
|
|
return date
|
|
|
|
def prepare_sync(self, start_date: DateDateTime) -> None:
|
|
"""prepare sync lists by comparsion of events
|
|
|
|
Arguments:
|
|
start_date -- date/datetime to start sync
|
|
"""
|
|
|
|
start_date = CalendarSync._tz_aware_datetime(start_date)
|
|
|
|
events_src = self.converter.events_to_gcal()
|
|
events_dst = self.gcalendar.list_events_from(start_date)
|
|
|
|
# divide source events by start datetime
|
|
events_src_pending = CalendarSync._filter_events_by_date(
|
|
events_src, start_date, operator.ge)
|
|
events_src_past = CalendarSync._filter_events_by_date(
|
|
events_src, start_date, operator.lt)
|
|
|
|
# first events comparsion
|
|
self.to_insert, self.to_update, self.to_delete = CalendarSync._events_list_compare(
|
|
events_src_pending, events_dst)
|
|
|
|
# find in events 'to_delete' past events from source, for update (move to past)
|
|
_, add_to_update, self.to_delete = CalendarSync._events_list_compare(
|
|
events_src_past, self.to_delete)
|
|
self.to_update.extend(add_to_update)
|
|
|
|
# find if events 'to_insert' exists in gcalendar, for update them
|
|
add_to_update, self.to_insert = self.gcalendar.find_exists(
|
|
self.to_insert)
|
|
self.to_update.extend(add_to_update)
|
|
|
|
# exclude outdated events from 'to_update' list, by 'updated' field
|
|
self._filter_events_to_update()
|
|
|
|
self.logger.info(
|
|
'prepared to sync: ( insert: %d, update: %d, delete: %d )',
|
|
len(self.to_insert),
|
|
len(self.to_update),
|
|
len(self.to_delete)
|
|
)
|
|
|
|
def clear(self) -> None:
|
|
""" clear prepared sync lists (insert, update, delete)
|
|
"""
|
|
self.to_insert.clear()
|
|
self.to_update.clear()
|
|
self.to_delete.clear()
|
|
|
|
def apply(self) -> None:
|
|
""" apply sync (insert, update, delete), using prepared lists of events
|
|
"""
|
|
|
|
self.gcalendar.insert_events(self.to_insert)
|
|
self.gcalendar.update_events(self.to_update)
|
|
self.gcalendar.delete_events(self.to_delete)
|
|
|
|
self.clear()
|
|
|
|
self.logger.info('sync done')
|