1
0
mirror of https://github.com/b4tman/sync_ics2gcal synced 2026-02-04 07:14:59 +00:00

5 Commits

Author SHA1 Message Date
d2d43d02da Merge branch 'converter/pydantic' of https://github.com/b4tman/sync_ics2gcal into converter/pydantic 2021-10-21 09:58:14 +03:00
2e114db5c9 + bench_converter 2021-10-21 09:57:17 +03:00
15951ba200 try pydantic for converter
---
x2 slower:
before: best: _82001700 ns,     avg: _85379966.2 ns,    median: _84408700.0 ns
after__: best: 162860900 ns,     avg: 175015097.0 ns,    median: 171212750.0 ns
2021-10-17 17:10:44 +03:00
d03e5691ee add pydantic 2021-10-17 17:07:36 +03:00
3686bc29ee + bench_converter 2021-10-15 14:53:20 +03:00
17 changed files with 1330 additions and 1305 deletions

View File

@@ -16,7 +16,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v2
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.

View File

@@ -17,16 +17,21 @@ jobs:
strategy:
max-parallel: 4
matrix:
python-version: ['3.7', '3.8', '3.9', '3.10']
python-version: [3.6, 3.7, 3.8, 3.9]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Upgrade pip
run: python -m pip install --upgrade pip
- name: Load cached Poetry installation
uses: actions/cache@v2
with:
path: ~/.local
key: poetry-0
- name: Install Poetry
uses: snok/install-poetry@v1
- name: Install deps

View File

@@ -8,13 +8,18 @@ jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.x'
- name: Upgrade pip
run: python -m pip install --upgrade pip
- name: Load cached Poetry installation
uses: actions/cache@v2
with:
path: ~/.local
key: poetry-0
- name: Install Poetry
uses: snok/install-poetry@v1
- name: Install deps

21
.travis.yml Normal file
View File

@@ -0,0 +1,21 @@
language: python
os: linux
dist: focal
python:
- "3.6"
- "3.7"
- "3.8"
- "3.9"
before_install:
- pip install poetry
install:
- poetry install
script:
# stop the build if there are Python syntax errors or undefined names
- poetry run flake8 sync_ics2gcal --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
- poetry run flake8 sync_ics2gcal --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
# run tests
- poetry run pytest -v

View File

@@ -1,6 +1,7 @@
# sync_ics2gcal
[![PyPI version](https://badge.fury.io/py/sync-ics2gcal.svg)](https://badge.fury.io/py/sync-ics2gcal)
[![Build Status](https://travis-ci.org/b4tman/sync_ics2gcal.svg?branch=master)](https://travis-ci.org/b4tman/sync_ics2gcal)
![Python package status](https://github.com/b4tman/sync_ics2gcal/workflows/Python%20package/badge.svg)
Python scripts for sync .ics file with Google calendar

1599
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "sync_ics2gcal"
version = "0.1.4"
version = "0.1.3"
description = "Sync ics file with Google calendar"
authors = ["Dmitry Belyaev <b4tm4n@mail.ru>"]
license = "MIT"
@@ -11,6 +11,7 @@ keywords = ["icalendar", "sync", "google", "calendar"]
classifiers = [
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
@@ -18,18 +19,19 @@ classifiers = [
]
[tool.poetry.dependencies]
python = "^3.7"
google-auth = "2.6.6"
google-api-python-client = "2.49.0"
icalendar = "4.0.9"
pytz = "2022.1"
PyYAML = "6.0"
python = ">=3.6.1"
google-auth = "2.2.1"
google-api-python-client = "2.23.0"
icalendar = "4.0.7"
pytz = "2021.1"
PyYAML = "5.4.1"
fire = "0.4.0"
pydantic = "^1.8.2"
[tool.poetry.dev-dependencies]
pytest = "^7.1.2"
flake8 = "^4.0.1"
black = "^22.3.0"
pytest = "^6.2.5"
flake8 = "^3.9.2"
autopep8 = "^1.5.7"
[tool.poetry.scripts]
sync-ics2gcal = "sync_ics2gcal.sync_calendar:main"

6
requirements.txt Normal file
View File

@@ -0,0 +1,6 @@
google-auth==2.2.1
google-api-python-client==2.23.0
icalendar==4.0.7
pytz==2021.1
PyYAML==5.4.1
fire==0.4.0

View File

@@ -1,11 +1,18 @@
from .ical import CalendarConverter, EventConverter, DateDateTime
from .ical import (
CalendarConverter,
EventConverter,
DateDateTime
)
from .gcal import (
GoogleCalendarService,
GoogleCalendar,
EventData,
EventList,
EventTuple,
EventTuple
)
from .sync import CalendarSync
from .sync import (
CalendarSync
)

View File

@@ -7,7 +7,7 @@ from google.oauth2 import service_account
from googleapiclient import discovery
from pytz import utc
EventData = Dict[str, Union[str, "EventData", None]]
EventData = Dict[str, Union[str, 'EventData', None]]
EventList = List[EventData]
EventTuple = Tuple[EventData, EventData]
@@ -26,25 +26,24 @@ class GoogleCalendarService:
( https://googleapis.dev/python/google-auth/latest/reference/google.auth.html#google.auth.default )
"""
scopes = ["https://www.googleapis.com/auth/calendar"]
scopes = ['https://www.googleapis.com/auth/calendar']
credentials, _ = google.auth.default(scopes=scopes)
service = discovery.build(
"calendar", "v3", credentials=credentials, cache_discovery=False
)
'calendar', 'v3', credentials=credentials, cache_discovery=False)
return service
@staticmethod
def from_srv_acc_file(service_account_file: str):
"""make service Resource from service account filename (authorize)"""
"""make service Resource from service account filename (authorize)
"""
scopes = ["https://www.googleapis.com/auth/calendar"]
scopes = ['https://www.googleapis.com/auth/calendar']
credentials = service_account.Credentials.from_service_account_file(
service_account_file
)
service_account_file)
scoped_credentials = credentials.with_scopes(scopes)
service = discovery.build(
"calendar", "v3", credentials=scoped_credentials, cache_discovery=False
)
'calendar', 'v3', credentials=scoped_credentials,
cache_discovery=False)
return service
@staticmethod
@@ -59,8 +58,9 @@ class GoogleCalendarService:
-- None: default credentials will be used
"""
if config is not None and "service_account" in config:
service = GoogleCalendarService.from_srv_acc_file(config["service_account"])
if config is not None and 'service_account' in config:
service = GoogleCalendarService.from_srv_acc_file(
config['service_account'])
else:
service = GoogleCalendarService.default()
return service
@@ -77,21 +77,22 @@ def select_event_key(event: EventData) -> Optional[str]:
"""
key = None
if "iCalUID" in event:
key = "iCalUID"
elif "id" in event:
key = "id"
if 'iCalUID' in event:
key = 'iCalUID'
elif 'id' in event:
key = 'id'
return key
class GoogleCalendar:
"""class to interact with calendar on Google"""
"""class to interact with calendar on google
"""
logger = logging.getLogger("GoogleCalendar")
logger = logging.getLogger('GoogleCalendar')
def __init__(self, service: discovery.Resource, calendar_id: Optional[str]):
def __init__(self, service: discovery.Resource, calendarId: Optional[str]):
self.service: discovery.Resource = service
self.calendar_id: str = calendar_id
self.calendarId: str = calendarId
def _make_request_callback(self, action: str, events_by_req: EventList) -> Callable:
"""make callback for log result of batch request
@@ -110,51 +111,43 @@ class GoogleCalendar:
if exception is not None:
self.logger.error(
"failed to %s event with %s: %s, exception: %s",
action,
key,
event.get(key),
str(exception),
'failed to %s event with %s: %s, exception: %s',
action, key, event.get(key), str(exception)
)
else:
resp_key = select_event_key(response)
if resp_key is not None:
event = response
key = resp_key
self.logger.info("event %s ok, %s: %s", action, key, event.get(key))
self.logger.info('event %s ok, %s: %s',
action, key, event.get(key))
return callback
def list_events_from(self, start: datetime) -> EventList:
"""list events from calendar, where start date >= start"""
fields = "nextPageToken,items(id,iCalUID,updated)"
""" list events from calendar, where start date >= start
"""
fields = 'nextPageToken,items(id,iCalUID,updated)'
events = []
page_token = None
time_min = (
utc.normalize(start.astimezone(utc)).replace(tzinfo=None).isoformat() + "Z"
)
timeMin = utc.normalize(start.astimezone(utc)).replace(
tzinfo=None).isoformat() + 'Z'
while True:
response = (
self.service.events()
.list(
calendarId=self.calendar_id,
pageToken=page_token,
singleEvents=True,
timeMin=time_min,
fields=fields,
)
.execute()
)
if "items" in response:
events.extend(response["items"])
page_token = response.get("nextPageToken")
response = self.service.events().list(calendarId=self.calendarId,
pageToken=page_token,
singleEvents=True,
timeMin=timeMin,
fields=fields).execute()
if 'items' in response:
events.extend(response['items'])
page_token = response.get('nextPageToken')
if not page_token:
break
self.logger.info("%d events listed", len(events))
self.logger.info('%d events listed', len(events))
return events
def find_exists(self, events: List) -> Tuple[List[EventTuple], EventList]:
"""find existing events from list, by 'iCalUID' field
""" find existing events from list, by 'iCalUID' field
Arguments:
events {list} -- list of events
@@ -164,7 +157,7 @@ class GoogleCalendar:
events_exist - list of tuples: (new_event, exists_event)
"""
fields = "items(id,iCalUID,updated)"
fields = 'items(id,iCalUID,updated)'
events_by_req = []
exists = []
not_found = []
@@ -173,15 +166,14 @@ class GoogleCalendar:
found = False
cur_event = events_by_req[int(request_id)]
if exception is None:
found = [] != response["items"]
found = ([] != response['items'])
else:
self.logger.error(
"exception %s, while listing event with UID: %s",
str(exception),
cur_event["iCalUID"],
)
'exception %s, while listing event with UID: %s',
str(exception), cur_event['iCalUID'])
if found:
exists.append((cur_event, response["items"][0]))
exists.append(
(cur_event, response['items'][0]))
else:
not_found.append(events_by_req[int(request_id)])
@@ -189,102 +181,89 @@ class GoogleCalendar:
i = 0
for event in events:
events_by_req.append(event)
batch.add(
self.service.events().list(
calendarId=self.calendar_id,
iCalUID=event["iCalUID"],
showDeleted=True,
fields=fields,
),
request_id=str(i),
)
batch.add(self.service.events().list(calendarId=self.calendarId,
iCalUID=event['iCalUID'],
showDeleted=True,
fields=fields
),
request_id=str(i)
)
i += 1
batch.execute()
self.logger.info("%d events exists, %d not found", len(exists), len(not_found))
self.logger.info('%d events exists, %d not found',
len(exists), len(not_found))
return exists, not_found
def insert_events(self, events: EventList):
"""insert list of events
""" insert list of events
Arguments:
events - events list
"""
fields = "id"
fields = 'id'
events_by_req = []
insert_callback = self._make_request_callback("insert", events_by_req)
insert_callback = self._make_request_callback('insert', events_by_req)
batch = self.service.new_batch_http_request(callback=insert_callback)
i = 0
for event in events:
events_by_req.append(event)
batch.add(
self.service.events().insert(
calendarId=self.calendar_id, body=event, fields=fields
),
request_id=str(i),
batch.add(self.service.events().insert(
calendarId=self.calendarId, body=event, fields=fields),
request_id=str(i)
)
i += 1
batch.execute()
def patch_events(self, event_tuples: List[EventTuple]):
"""patch (update) events
""" patch (update) events
Arguments:
event_tuples -- list of tuples: (new_event, exists_event)
"""
fields = "id"
fields = 'id'
events_by_req = []
patch_callback = self._make_request_callback("patch", events_by_req)
patch_callback = self._make_request_callback('patch', events_by_req)
batch = self.service.new_batch_http_request(callback=patch_callback)
i = 0
for event_new, event_old in event_tuples:
if "id" not in event_old:
if 'id' not in event_old:
continue
events_by_req.append(event_new)
batch.add(
self.service.events().patch(
calendarId=self.calendar_id, eventId=event_old["id"], body=event_new
),
fields=fields,
request_id=str(i),
)
batch.add(self.service.events().patch(
calendarId=self.calendarId, eventId=event_old['id'],
body=event_new), fields=fields, request_id=str(i))
i += 1
batch.execute()
def update_events(self, event_tuples: List[EventTuple]):
"""update events
""" update events
Arguments:
event_tuples -- list of tuples: (new_event, exists_event)
"""
fields = "id"
fields = 'id'
events_by_req = []
update_callback = self._make_request_callback("update", events_by_req)
update_callback = self._make_request_callback('update', events_by_req)
batch = self.service.new_batch_http_request(callback=update_callback)
i = 0
for event_new, event_old in event_tuples:
if "id" not in event_old:
if 'id' not in event_old:
continue
events_by_req.append(event_new)
batch.add(
self.service.events().update(
calendarId=self.calendar_id,
eventId=event_old["id"],
body=event_new,
fields=fields,
),
request_id=str(i),
)
batch.add(self.service.events().update(
calendarId=self.calendarId, eventId=event_old['id'],
body=event_new, fields=fields), request_id=str(i))
i += 1
batch.execute()
def delete_events(self, events: EventList):
"""delete events
""" delete events
Arguments:
events -- list of events
@@ -292,21 +271,18 @@ class GoogleCalendar:
events_by_req = []
delete_callback = self._make_request_callback("delete", events_by_req)
delete_callback = self._make_request_callback('delete', events_by_req)
batch = self.service.new_batch_http_request(callback=delete_callback)
i = 0
for event in events:
events_by_req.append(event)
batch.add(
self.service.events().delete(
calendarId=self.calendar_id, eventId=event["id"]
),
request_id=str(i),
)
batch.add(self.service.events().delete(
calendarId=self.calendarId,
eventId=event['id']), request_id=str(i))
i += 1
batch.execute()
def create(self, summary: str, time_zone: Optional[str] = None) -> Any:
def create(self, summary: str, timeZone: Optional[str] = None) -> Any:
"""create calendar
Arguments:
@@ -319,33 +295,36 @@ class GoogleCalendar:
calendar Resource
"""
calendar = {"summary": summary}
if time_zone is not None:
calendar["timeZone"] = time_zone
calendar = {'summary': summary}
if timeZone is not None:
calendar['timeZone'] = timeZone
created_calendar = self.service.calendars().insert(body=calendar).execute()
self.calendar_id = created_calendar["id"]
created_calendar = self.service.calendars().insert(
body=calendar
).execute()
self.calendarId = created_calendar['id']
return created_calendar
def delete(self):
"""delete calendar"""
"""delete calendar
"""
self.service.calendars().delete(calendarId=self.calendar_id).execute()
self.service.calendars().delete(calendarId=self.calendarId).execute()
def make_public(self):
"""make calendar public"""
"""make calendar puplic
"""
rule_public = {
"scope": {
"type": "default",
'scope': {
'type': 'default',
},
"role": "reader",
'role': 'reader'
}
return (
self.service.acl()
.insert(calendarId=self.calendar_id, body=rule_public)
.execute()
)
return self.service.acl().insert(
calendarId=self.calendarId,
body=rule_public
).execute()
def add_owner(self, email: str):
"""add calendar owner by email
@@ -355,14 +334,13 @@ class GoogleCalendar:
"""
rule_owner = {
"scope": {
"type": "user",
"value": email,
'scope': {
'type': 'user',
'value': email,
},
"role": "owner",
'role': 'owner'
}
return (
self.service.acl()
.insert(calendarId=self.calendar_id, body=rule_owner)
.execute()
)
return self.service.acl().insert(
calendarId=self.calendarId,
body=rule_owner
).execute()

View File

@@ -5,11 +5,56 @@ from typing import Union, Dict, Callable, Optional
from icalendar import Calendar, Event
from pytz import utc
import pydantic
from .gcal import EventData, EventList
DateDateTime = Union[datetime.date, datetime.datetime]
class GCal_DateDateTime(pydantic.BaseModel):
date: Optional[str] = pydantic.Field(default=None)
date_time: Optional[str] = pydantic.Field(alias='dateTime', default=None)
timezone: Optional[str] = pydantic.Field(alias='timeZone', default=None)
@pydantic.root_validator(allow_reuse=True)
def check_only_date_or_datetime(cls, values):
date = values.get('date', None)
date_time = values.get('date_time', None)
assert (date is None) != (date_time is None), \
'only date or date_time must be provided'
return values
@classmethod
def create_from(cls, value: DateDateTime) -> 'GCal_DateDateTime':
key: str = 'date'
str_value: str = ''
if type(value) is datetime.datetime:
key = 'date_time'
str_value = format_datetime_utc(value)
else:
str_value = value.isoformat()
return cls(**{key: str_value})
class Config:
allow_population_by_field_name = True
class GCal_Event(pydantic.BaseModel):
created: Optional[str] = None
updated: Optional[str] = None
summary: Optional[str] = None
description: Optional[str] = None
location: Optional[str] = None
start: GCal_DateDateTime
end: GCal_DateDateTime
transparency: Optional[str] = None
ical_uid: str = pydantic.Field(alias='iCalUID')
class Config:
allow_population_by_field_name = True
def format_datetime_utc(value: DateDateTime) -> str:
"""utc datetime as string from date or datetime value
@@ -19,17 +64,20 @@ def format_datetime_utc(value: DateDateTime) -> str:
Returns:
utc datetime value as string in iso format
"""
if not isinstance(value, datetime.datetime):
value = datetime.datetime(value.year, value.month, value.day, tzinfo=utc)
if type(value) is datetime.date:
value = datetime.datetime(
value.year, value.month, value.day, tzinfo=utc)
value = value.replace(microsecond=1)
return utc.normalize(value.astimezone(utc)).replace(tzinfo=None).isoformat() + "Z"
return utc.normalize(
value.astimezone(utc)
).replace(tzinfo=None).isoformat() + 'Z'
def gcal_date_or_datetime(
value: DateDateTime, check_value: Optional[DateDateTime] = None
) -> Dict[str, str]:
"""date or datetime to gcal (start or end dict)
def gcal_date_or_dateTime(value: DateDateTime,
check_value: Optional[DateDateTime] = None) \
-> Dict[str, str]:
"""date or dateTime to gcal (start or end dict)
Arguments:
value: date or datetime
@@ -44,12 +92,12 @@ def gcal_date_or_datetime(
result: Dict[str, str] = {}
if isinstance(check_value, datetime.datetime):
result["dateTime"] = format_datetime_utc(value)
result['dateTime'] = format_datetime_utc(value)
else:
if isinstance(check_value, datetime.date):
if isinstance(value, datetime.datetime):
value = datetime.date(value.year, value.month, value.day)
result["date"] = value.isoformat()
result['date'] = value.isoformat()
return result
@@ -62,13 +110,13 @@ class EventConverter(Event):
"""decoded string property
Arguments:
prop - property name
prop - propperty name
Returns:
string value
"""
return self.decoded(prop).decode(encoding="utf-8")
return self.decoded(prop).decode(encoding='utf-8')
def _datetime_str_prop(self, prop: str) -> str:
"""utc datetime as string from property
@@ -82,8 +130,8 @@ class EventConverter(Event):
return format_datetime_utc(self.decoded(prop))
def _gcal_start(self) -> Dict[str, str]:
"""event start dict from icalendar event
def _gcal_start(self) -> GCal_DateDateTime:
""" event start dict from icalendar event
Raises:
ValueError -- if DTSTART not date or datetime
@@ -92,10 +140,10 @@ class EventConverter(Event):
dict
"""
value = self.decoded("DTSTART")
return gcal_date_or_datetime(value)
value = self.decoded('DTSTART')
return GCal_DateDateTime.create_from(value)
def _gcal_end(self) -> Dict[str, str]:
def _gcal_end(self) -> GCal_DateDateTime:
"""event end dict from icalendar event
Raises:
@@ -104,31 +152,31 @@ class EventConverter(Event):
dict
"""
result: Dict[str, str]
if "DTEND" in self:
value = self.decoded("DTEND")
result = gcal_date_or_datetime(value)
elif "DURATION" in self:
start_val = self.decoded("DTSTART")
duration = self.decoded("DURATION")
result = None
if 'DTEND' in self:
value = self.decoded('DTEND')
result = GCal_DateDateTime.create_from(value)
elif 'DURATION' in self:
start_val = self.decoded('DTSTART')
duration = self.decoded('DURATION')
end_val = start_val + duration
if type(start_val) is datetime.date:
if type(end_val) is datetime.datetime:
end_val = datetime.date(
end_val.year, end_val.month, end_val.day)
result = gcal_date_or_datetime(end_val, check_value=start_val)
result = GCal_DateDateTime.create_from(end_val)
else:
raise ValueError("no DTEND or DURATION")
raise ValueError('no DTEND or DURATION')
return result
def _put_to_gcal(
self,
gcal_event: EventData,
prop: str,
func: Callable[[str], str],
ics_prop: Optional[str] = None,
):
"""get property from ical event if existed, and put to gcal event
def _put_to_gcal(self, gcal_event: EventData,
prop: str, func: Callable[[str], str],
ics_prop: Optional[str] = None):
"""get property from ical event if exist, and put to gcal event
Arguments:
gcal_event -- destination event
gcal_event -- dest event
prop -- property name
func -- function to convert
ics_prop -- ical property name (default: {None})
@@ -139,6 +187,18 @@ class EventConverter(Event):
if ics_prop in self:
gcal_event[prop] = func(ics_prop)
def _get_prop(self, prop: str, func: Callable[[str], str]):
"""get property from ical event if exist else None
Arguments:
prop -- property name
func -- function to convert
"""
if prop not in self:
return None
return func(prop)
def to_gcal(self) -> EventData:
"""Convert
@@ -146,48 +206,50 @@ class EventConverter(Event):
dict - google calendar#event resource
"""
event = {
"iCalUID": self._str_prop("UID"),
"start": self._gcal_start(),
"end": self._gcal_end(),
kwargs = {
'ical_uid': self._str_prop('UID'),
'start': self._gcal_start(),
'end': self._gcal_end(),
'summary': self._get_prop('SUMMARY', self._str_prop),
'description': self._get_prop('DESCRIPTION', self._str_prop),
'location': self._get_prop('LOCATION', self._str_prop),
'created': self._get_prop('CREATED', self._datetime_str_prop),
'updated': self._get_prop('LAST-MODIFIED', self._datetime_str_prop),
'transparency': self._get_prop('TRANSP', lambda prop: self._str_prop(prop).lower()),
}
self._put_to_gcal(event, "summary", self._str_prop)
self._put_to_gcal(event, "description", self._str_prop)
self._put_to_gcal(event, "location", self._str_prop)
self._put_to_gcal(event, "created", self._datetime_str_prop)
self._put_to_gcal(event, "updated", self._datetime_str_prop, "LAST-MODIFIED")
self._put_to_gcal(
event, "transparency", lambda prop: self._str_prop(prop).lower(), "TRANSP"
)
return event
return GCal_Event(**kwargs).dict(by_alias=True, exclude_defaults=True)
class CalendarConverter:
"""Convert icalendar events to google calendar resources"""
"""Convert icalendar events to google calendar resources
"""
logger = logging.getLogger("CalendarConverter")
logger = logging.getLogger('CalendarConverter')
def __init__(self, calendar: Optional[Calendar] = None):
self.calendar: Optional[Calendar] = calendar
def load(self, filename: str):
"""load calendar from ics file"""
with open(filename, "r", encoding="utf-8") as f:
""" load calendar from ics file
"""
with open(filename, 'r', encoding='utf-8') as f:
self.calendar = Calendar.from_ical(f.read())
self.logger.info("%s loaded", filename)
self.logger.info('%s loaded', filename)
def loads(self, string: str):
"""load calendar from ics string"""
""" load calendar from ics string
"""
self.calendar = Calendar.from_ical(string)
def events_to_gcal(self) -> EventList:
"""Convert events to google calendar resources"""
"""Convert events to google calendar resources
"""
ics_events = self.calendar.walk(name="VEVENT")
self.logger.info("%d events read", len(ics_events))
ics_events = self.calendar.walk(name='VEVENT')
self.logger.info('%d events readed', len(ics_events))
result = list(map(lambda event: EventConverter(event).to_gcal(), ics_events))
self.logger.info("%d events converted", len(result))
result = list(
map(lambda event: EventConverter(event).to_gcal(), ics_events))
self.logger.info('%d events converted', len(result))
return result

View File

@@ -10,7 +10,7 @@ from . import GoogleCalendar, GoogleCalendarService
def load_config(filename: str) -> Optional[Dict[str, Any]]:
result = None
try:
with open(filename, "r", encoding="utf-8") as f:
with open(filename, 'r', encoding='utf-8') as f:
result = yaml.safe_load(f)
except FileNotFoundError:
pass
@@ -19,27 +19,24 @@ def load_config(filename: str) -> Optional[Dict[str, Any]]:
class PropertyCommands:
"""get/set google calendar properties"""
""" get/set google calendar properties """
def __init__(self, _service):
self._service = _service
def get(self, calendar_id: str, property_name: str) -> None:
"""get calendar property
""" get calendar property
Args:
calendar_id: calendar id
property_name: property key
"""
response = (
self._service.calendarList()
.get(calendarId=calendar_id, fields=property_name)
.execute()
)
response = self._service.calendarList().get(calendarId=calendar_id,
fields=property_name).execute()
print(response.get(property_name))
def set(self, calendar_id: str, property_name: str, property_value: str) -> None:
"""set calendar property
""" set calendar property
Args:
calendar_id: calendar id
@@ -47,60 +44,53 @@ class PropertyCommands:
property_value: property value
"""
body = {property_name: property_value}
response = (
self._service.calendarList()
.patch(body=body, calendarId=calendar_id)
.execute()
)
response = self._service.calendarList().patch(body=body, calendarId=calendar_id).execute()
print(response)
class Commands:
"""manage google calendars in service account"""
""" manage google calendars in service account """
def __init__(self, config: str = "config.yml"):
def __init__(self, config: str = 'config.yml'):
"""
Args:
config(str): config filename
"""
self._config: Optional[Dict[str, Any]] = load_config(config)
if self._config is not None and "logging" in self._config:
logging.config.dictConfig(self._config["logging"])
if self._config is not None and 'logging' in self._config:
logging.config.dictConfig(self._config['logging'])
self._service = GoogleCalendarService.from_config(self._config)
self.property = PropertyCommands(self._service)
def list(self, show_hidden: bool = False, show_deleted: bool = False) -> None:
"""list calendars
""" list calendars
Args:
show_hidden: show hidden calendars
show_deleted: show deleted calendars
"""
fields: str = "nextPageToken,items(id,summary)"
fields: str = 'nextPageToken,items(id,summary)'
calendars: List[Dict[str, Any]] = []
page_token: Optional[str] = None
while True:
calendars_api = self._service.calendarList()
response = calendars_api.list(
fields=fields,
pageToken=page_token,
showHidden=show_hidden,
showDeleted=show_deleted,
).execute()
if "items" in response:
calendars.extend(response["items"])
page_token = response.get("nextPageToken")
response = calendars_api.list(fields=fields,
pageToken=page_token,
showHidden=show_hidden,
showDeleted=show_deleted
).execute()
if 'items' in response:
calendars.extend(response['items'])
page_token = response.get('nextPageToken')
if page_token is None:
break
for calendar in calendars:
print("{summary}: {id}".format_map(calendar))
print('{summary}: {id}'.format_map(calendar))
def create(
self, summary: str, timezone: Optional[str] = None, public: bool = False
) -> None:
"""create calendar
def create(self, summary: str, timezone: Optional[str] = None, public: bool = False) -> None:
""" create calendar
Args:
summary: new calendar summary
@@ -111,10 +101,10 @@ class Commands:
calendar.create(summary, timezone)
if public:
calendar.make_public()
print("{}: {}".format(summary, calendar.calendar_id))
print('{}: {}'.format(summary, calendar.calendarId))
def add_owner(self, calendar_id: str, email: str) -> None:
"""add owner to calendar
""" add owner to calendar
Args:
calendar_id: calendar id
@@ -122,33 +112,33 @@ class Commands:
"""
calendar = GoogleCalendar(self._service, calendar_id)
calendar.add_owner(email)
print("to {} added owner: {}".format(calendar_id, email))
print('to {} added owner: {}'.format(calendar_id, email))
def remove(self, calendar_id: str) -> None:
"""remove calendar
""" remove calendar
Args:
calendar_id: calendar id
"""
calendar = GoogleCalendar(self._service, calendar_id)
calendar.delete()
print("removed: {}".format(calendar_id))
print('removed: {}'.format(calendar_id))
def rename(self, calendar_id: str, summary: str) -> None:
"""rename calendar
""" rename calendar
Args:
calendar_id: calendar id
summary:
"""
calendar = {"summary": summary}
calendar = {'summary': summary}
self._service.calendars().patch(body=calendar, calendarId=calendar_id).execute()
print("{}: {}".format(summary, calendar_id))
print('{}: {}'.format(summary, calendar_id))
def main():
fire.Fire(Commands, name="manage-ics2gcal")
fire.Fire(Commands, name='manage-ics2gcal')
if __name__ == "__main__":
if __name__ == '__main__':
main()

View File

@@ -11,9 +11,10 @@ from .ical import CalendarConverter, DateDateTime
class CalendarSync:
"""class for synchronize calendar with Google"""
"""class for syncronize calendar with google
"""
logger = logging.getLogger("CalendarSync")
logger = logging.getLogger('CalendarSync')
def __init__(self, gcalendar: GoogleCalendar, converter: CalendarConverter):
self.gcalendar: GoogleCalendar = gcalendar
@@ -23,14 +24,15 @@ class CalendarSync:
self.to_delete: EventList = []
@staticmethod
def _events_list_compare(
items_src: EventList, items_dst: EventList, key: str = "iCalUID"
) -> Tuple[EventList, List[EventTuple], EventList]:
"""compare list of events by key
def _events_list_compare(items_src: EventList,
items_dst: EventList,
key: str = 'iCalUID') \
-> Tuple[EventList, List[EventTuple], EventList]:
""" compare list of events by key
Arguments:
items_src {list of dict} -- source events
items_dst {list of dict} -- destination events
items_dst {list of dict} -- dest events
key {str} -- name of key to compare (default: {'iCalUID'})
Returns:
@@ -39,8 +41,7 @@ class CalendarSync:
items_to_delete)
"""
def get_key(item: EventData) -> str:
return item[key]
def get_key(item: EventData) -> str: return item[key]
keys_src: Set[str] = set(map(get_key, items_src))
keys_dst: Set[str] = set(map(get_key, items_dst))
@@ -49,7 +50,9 @@ class CalendarSync:
keys_to_update = keys_src & keys_dst
keys_to_delete = keys_dst - keys_src
def items_by_keys(items: EventList, key_name: str, keys: Set[str]) -> EventList:
def items_by_keys(items: EventList,
key_name: str,
keys: Set[str]) -> EventList:
return list(filter(lambda item: item[key_name] in keys, items))
items_to_insert = items_by_keys(items_src, key, keys_to_insert)
@@ -64,53 +67,50 @@ class CalendarSync:
return items_to_insert, items_to_update, items_to_delete
def _filter_events_to_update(self):
"""filter 'to_update' events by 'updated' datetime"""
""" filter 'to_update' events by 'updated' datetime
"""
def filter_updated(event_tuple: EventTuple) -> bool:
new, old = event_tuple
if "updated" not in new or "updated" not in old:
return True
new_date = dateutil.parser.parse(new["updated"])
old_date = dateutil.parser.parse(old["updated"])
new_date = dateutil.parser.parse(new['updated'])
old_date = dateutil.parser.parse(old['updated'])
return new_date > old_date
self.to_update = list(filter(filter_updated, self.to_update))
@staticmethod
def _filter_events_by_date(
events: EventList,
date: DateDateTime,
op: Callable[[DateDateTime, DateDateTime], bool],
) -> EventList:
"""filter events by start datetime
def _filter_events_by_date(events: EventList,
date: DateDateTime,
op: Callable[[DateDateTime,
DateDateTime], bool]) -> EventList:
""" filter events by start datetime
Arguments:
events -- events list
date {datetime} -- datetime to compare
op {operator} -- comparison operator
op {operator} -- comparsion operator
Returns:
list of filtered events
list of filtred events
"""
def filter_by_date(event: EventData) -> bool:
date_cmp = date
event_start: Dict[str, str] = event["start"]
event_start: Dict[str, str] = event['start']
event_date: Union[DateDateTime, str, None] = None
compare_dates = False
if "date" in event_start:
event_date = event_start["date"]
if 'date' in event_start:
event_date = event_start['date']
compare_dates = True
elif "dateTime" in event_start:
event_date = event_start["dateTime"]
elif 'dateTime' in event_start:
event_date = event_start['dateTime']
event_date = dateutil.parser.parse(event_date)
if compare_dates:
date_cmp = datetime.date(date.year, date.month, date.day)
event_date = datetime.date(
event_date.year, event_date.month, event_date.day
)
event_date.year, event_date.month, event_date.day)
return op(event_date, date_cmp)
@@ -118,13 +118,13 @@ class CalendarSync:
@staticmethod
def _tz_aware_datetime(date: DateDateTime) -> datetime.datetime:
"""make tz aware datetime from datetime/date (utc if no tz-info)
"""make tz aware datetime from datetime/date (utc if no tzinfo)
Arguments:
date - date or datetime / with or without tz-info
date - date or datetime / with or without tzinfo
Returns:
datetime with tz-info
datetime with tzinfo
"""
if not isinstance(date, datetime.datetime):
@@ -134,7 +134,7 @@ class CalendarSync:
return date
def prepare_sync(self, start_date: DateDateTime) -> None:
"""prepare sync lists by comparison of events
"""prepare sync lists by comparsion of events
Arguments:
start_date -- date/datetime to start sync
@@ -147,47 +147,44 @@ class CalendarSync:
# divide source events by start datetime
events_src_pending = CalendarSync._filter_events_by_date(
events_src, start_date, operator.ge
)
events_src, start_date, operator.ge)
events_src_past = CalendarSync._filter_events_by_date(
events_src, start_date, operator.lt
)
events_src, start_date, operator.lt)
# first events comparison
(
self.to_insert,
self.to_update,
self.to_delete,
) = CalendarSync._events_list_compare(events_src_pending, events_dst)
# first events comparsion
self.to_insert, self.to_update, self.to_delete = CalendarSync._events_list_compare(
events_src_pending, events_dst)
# find in events 'to_delete' past events from source, for update (move to past)
_, add_to_update, self.to_delete = CalendarSync._events_list_compare(
events_src_past, self.to_delete
)
events_src_past, self.to_delete)
self.to_update.extend(add_to_update)
# find if events 'to_insert' exists in gcalendar, for update them
add_to_update, self.to_insert = self.gcalendar.find_exists(self.to_insert)
add_to_update, self.to_insert = self.gcalendar.find_exists(
self.to_insert)
self.to_update.extend(add_to_update)
# exclude outdated events from 'to_update' list, by 'updated' field
self._filter_events_to_update()
self.logger.info(
"prepared to sync: ( insert: %d, update: %d, delete: %d )",
'prepared to sync: ( insert: %d, update: %d, delete: %d )',
len(self.to_insert),
len(self.to_update),
len(self.to_delete),
len(self.to_delete)
)
def clear(self) -> None:
"""clear prepared sync lists (insert, update, delete)"""
""" clear prepared sync lists (insert, update, delete)
"""
self.to_insert.clear()
self.to_update.clear()
self.to_delete.clear()
def apply(self) -> None:
"""apply sync (insert, update, delete), using prepared lists of events"""
""" apply sync (insert, update, delete), using prepared lists of events
"""
self.gcalendar.insert_events(self.to_insert)
self.gcalendar.update_events(self.to_update)
@@ -195,4 +192,4 @@ class CalendarSync:
self.clear()
self.logger.info("sync done")
self.logger.info('sync done')

View File

@@ -1,4 +1,4 @@
from typing import Dict, Any, Union
from typing import Dict, Any
import yaml
@@ -6,48 +6,49 @@ import dateutil.parser
import datetime
import logging
import logging.config
from . import CalendarConverter, GoogleCalendarService, GoogleCalendar, CalendarSync
ConfigDate = Union[str, datetime.datetime]
from . import (
CalendarConverter,
GoogleCalendarService,
GoogleCalendar,
CalendarSync
)
def load_config() -> Dict[str, Any]:
with open("config.yml", "r", encoding="utf-8") as f:
with open('config.yml', 'r', encoding='utf-8') as f:
result = yaml.safe_load(f)
return result
def get_start_date(date: ConfigDate) -> datetime.datetime:
if isinstance(date, datetime.datetime):
return date
if "now" == date:
def get_start_date(date_str: str) -> datetime.datetime:
if 'now' == date_str:
result = datetime.datetime.utcnow()
else:
result = dateutil.parser.parse(date)
result = dateutil.parser.parse(date_str)
return result
def main():
config = load_config()
if "logging" in config:
logging.config.dictConfig(config["logging"])
if 'logging' in config:
logging.config.dictConfig(config['logging'])
calendar_id: str = config["calendar"]["google_id"]
ics_filepath: str = config["calendar"]["source"]
calendarId: str = config['calendar']['google_id']
ics_filepath: str = config['calendar']['source']
start = get_start_date(config["start_from"])
start = get_start_date(config['start_from'])
converter = CalendarConverter()
converter.load(ics_filepath)
service = GoogleCalendarService.from_config(config)
gcalendar = GoogleCalendar(service, calendar_id)
gcalendar = GoogleCalendar(service, calendarId)
sync = CalendarSync(gcalendar, converter)
sync.prepare_sync(start)
sync.apply()
if __name__ == "__main__":
if __name__ == '__main__':
main()

105
tests/bench_converter.py Normal file
View File

@@ -0,0 +1,105 @@
from typing import Iterable, List, Tuple, Union, Optional
from uuid import uuid4
import datetime
from itertools import islice
from dataclasses import dataclass
import time
import statistics
import functools
from sync_ics2gcal import CalendarConverter
@dataclass
class IcsTestEvent:
uid: str
start_date: Union[datetime.datetime, datetime.date]
end_date: Union[datetime.datetime, datetime.date, None] = None
duration: Optional[datetime.timedelta] = None
created: Union[datetime.datetime, datetime.date, None] = None
updated: Union[datetime.datetime, datetime.date, None] = None
@staticmethod
def _format_datetime(value: Union[datetime.datetime, datetime.date]):
result: str = ''
if isinstance(value, datetime.datetime):
result += f'DATE-TIME:{value.strftime("%Y%m%dT%H%M%SZ")}'
else:
result += f'DATE:{value.strftime("%Y%m%d")}'
return result
def render(self) -> str:
result: str = ''
result += 'BEGIN:VEVENT\r\n'
result += f'UID:{self.uid}\r\n'
result += f'DTSTART;VALUE={IcsTestEvent._format_datetime(self.start_date)}\r\n'
if self.end_date is not None:
result += f'DTEND;VALUE={IcsTestEvent._format_datetime(self.end_date)}\r\n'
else:
result += f'DURATION:P{self.duration.days}D\r\n'
if self.created is not None:
result += f'CREATED:{self.created.strftime("%Y%m%dT%H%M%SZ")}\r\n'
if self.updated is not None:
result += f'LAST-MODIFIED:{self.updated.strftime("%Y%m%dT%H%M%SZ")}\r\n'
result += 'END:VEVENT\r\n'
return result
@dataclass
class IcsTestCalendar:
events: List[IcsTestEvent]
def render(self) -> str:
result: str = ''
result += 'BEGIN:VCALENDAR\r\n'
for event in self.events:
result += event.render()
result += 'END:VCALENDAR\r\n'
return result
def gen_test_calendar(events_count: int) -> IcsTestCalendar:
def gen_events() -> Iterable[IcsTestEvent]:
for i in range(10000000):
uid = f'{uuid4()}@test.com'
start_date = datetime.datetime.now() + datetime.timedelta(hours=i)
end_date = start_date + datetime.timedelta(hours=1)
event: IcsTestEvent = IcsTestEvent(
uid=uid, start_date=start_date, end_date=end_date, created=start_date, updated=start_date)
yield event
events: List[IcsTestEvent] = list(islice(gen_events(), events_count))
result: IcsTestCalendar = IcsTestCalendar(events)
return result
test_calendar: IcsTestCalendar = gen_test_calendar(1000)
ics_test_calendar: str = test_calendar.render()
converter = CalendarConverter()
converter.loads(ics_test_calendar)
def bench(num_iters=1000):
def make_wrapper(func):
@functools.wraps(func)
def wrapper(*args, **kw):
times = []
for _ in range(num_iters):
t0 = time.perf_counter_ns()
result = func(*args, **kw)
t1 = time.perf_counter_ns()
times.append(t1 - t0)
best = min(times)
avg = round(sum(times) / num_iters, 2)
median = statistics.median(times)
print(
f'{func.__name__} x {num_iters} => best: {best} ns, \tavg: {avg} ns, \tmedian: {median} ns')
return result
return wrapper()
return make_wrapper
@bench(num_iters=500)
def events_to_gcal():
converter.events_to_gcal()

View File

@@ -1,52 +1,31 @@
import datetime
from typing import Tuple
import pytest
from pytz import timezone, utc
from sync_ics2gcal import CalendarConverter
from sync_ics2gcal.ical import format_datetime_utc
uid = "UID:uisgtr8tre93wewe0yr8wqy@test.com"
only_start_date = (
uid
+ """
only_start_date = uid + """
DTSTART;VALUE=DATE:20180215
"""
)
date_val = (
only_start_date
+ """
date_val = only_start_date + """
DTEND;VALUE=DATE:20180217
"""
)
date_duration = (
only_start_date
+ """
date_duration = only_start_date + """
DURATION:P2D
"""
)
datetime_utc_val = (
uid
+ """
datetime_utc_val = uid + """
DTSTART;VALUE=DATE-TIME:20180319T092001Z
DTEND:20180321T102501Z
"""
)
datetime_utc_duration = (
uid
+ """
datetime_utc_duration = uid + """
DTSTART;VALUE=DATE-TIME:20180319T092001Z
DURATION:P2DT1H5M
"""
)
created_updated = (
date_val
+ """
created_updated = date_val + """
CREATED:20180320T071155Z
LAST-MODIFIED:20180326T120235Z
"""
)
def ics_test_cal(content: str) -> str:
@@ -78,29 +57,14 @@ def test_event_no_end():
converter.events_to_gcal()
@pytest.fixture(
params=[
("date", ics_test_event(date_val), "2018-02-15", "2018-02-17"),
("date", ics_test_event(date_duration), "2018-02-15", "2018-02-17"),
(
"dateTime",
ics_test_event(datetime_utc_val),
"2018-03-19T09:20:01.000001Z",
"2018-03-21T10:25:01.000001Z",
),
(
"dateTime",
ics_test_event(datetime_utc_duration),
"2018-03-19T09:20:01.000001Z",
"2018-03-21T10:25:01.000001Z",
),
],
ids=[
"date values",
"date duration",
"datetime utc values",
"datetime utc duration",
],
@pytest.fixture(params=[
("date", ics_test_event(date_val), '2018-02-15', '2018-02-17'),
("date", ics_test_event(date_duration), '2018-02-15', '2018-02-17'),
("dateTime", ics_test_event(datetime_utc_val),
'2018-03-19T09:20:01.000001Z', '2018-03-21T10:25:01.000001Z'),
("dateTime", ics_test_event(datetime_utc_duration), '2018-03-19T09:20:01.000001Z', '2018-03-21T10:25:01.000001Z')],
ids=['date values', 'date duration',
'datetime utc values', 'datetime utc duration']
)
def param_events_start_end(request):
return request.param
@@ -113,8 +77,12 @@ def test_event_start_end(param_events_start_end: Tuple[str, str, str, str]):
events = converter.events_to_gcal()
assert len(events) == 1
event = events[0]
assert event["start"] == {date_type: start}
assert event["end"] == {date_type: end}
assert event['start'] == {
date_type: start
}
assert event['end'] == {
date_type: end
}
def test_event_created_updated():
@@ -123,24 +91,5 @@ def test_event_created_updated():
events = converter.events_to_gcal()
assert len(events) == 1
event = events[0]
assert event["created"] == "2018-03-20T07:11:55.000001Z"
assert event["updated"] == "2018-03-26T12:02:35.000001Z"
@pytest.mark.parametrize(
"value,expected_str",
[
(
datetime.datetime(2022, 6, 3, 13, 52, 15, 1, utc),
"2022-06-03T13:52:15.000001Z",
),
(
datetime.datetime(2022, 6, 3, 13, 52, 15, 1, timezone("Europe/Moscow")),
"2022-06-03T11:22:15.000001Z",
),
(datetime.date(2022, 6, 3), "2022-06-03T00:00:00.000001Z"),
],
ids=["utc", "with timezone", "date"],
)
def test_format_datetime_utc(value: datetime.datetime, expected_str: str):
assert format_datetime_utc(value) == expected_str
assert event['created'] == '2018-03-20T07:11:55.000001Z'
assert event['updated'] == '2018-03-26T12:02:35.000001Z'

View File

@@ -14,30 +14,28 @@ from sync_ics2gcal import CalendarSync
def sha1(string: Union[str, bytes]) -> str:
if isinstance(string, str):
string = string.encode("utf8")
string = string.encode('utf8')
h = hashlib.sha1()
h.update(string)
return h.hexdigest()
def gen_events(
start: int,
stop: int,
start_time: Union[datetime.datetime, datetime.date],
no_time: bool = False,
) -> List[Dict[str, Union[str, Dict[str, str]]]]:
def gen_events(start: int,
stop: int,
start_time: Union[datetime.datetime, datetime.date],
no_time: bool = False) -> List[Dict[str, Union[str, Dict[str, str]]]]:
if no_time:
start_time = datetime.date(start_time.year, start_time.month, start_time.day)
start_time = datetime.date(
start_time.year, start_time.month, start_time.day)
duration: datetime.timedelta = datetime.date(1, 1, 2) - datetime.date(1, 1, 1)
date_key: str = "date"
date_end: str = ""
date_end: str = ''
else:
start_time = utc.normalize(start_time.astimezone(utc)).replace(tzinfo=None)
duration: datetime.timedelta = datetime.datetime(
1, 1, 1, 2
) - datetime.datetime(1, 1, 1, 1)
start_time = utc.normalize(
start_time.astimezone(utc)).replace(tzinfo=None)
duration: datetime.timedelta = datetime.datetime(1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_key: str = "dateTime"
date_end: str = "Z"
date_end: str = 'Z'
result: List[Dict[str, Union[str, Dict[str, str]]]] = []
for i in range(start, stop):
@@ -47,18 +45,17 @@ def gen_events(
updated: Union[datetime.datetime, datetime.date] = event_start
if no_time:
updated = datetime.datetime(
updated.year, updated.month, updated.day, 0, 0, 0, 1, tzinfo=utc
)
updated.year, updated.month, updated.day, 0, 0, 0, 1, tzinfo=utc)
event: Dict[str, Union[str, Dict[str, str]]] = {
"summary": "test event __ {}".format(i),
"location": "la la la {}".format(i),
"description": "test TEST -- test event {}".format(i),
'summary': 'test event __ {}'.format(i),
'location': 'la la la {}'.format(i),
'description': 'test TEST -- test event {}'.format(i),
"iCalUID": "{}@test.com".format(sha1("test - event {}".format(i))),
"updated": updated.isoformat() + "Z",
"created": updated.isoformat() + "Z",
"start": {date_key: event_start.isoformat() + date_end},
"end": {date_key: event_end.isoformat() + date_end},
"updated": updated.isoformat() + 'Z',
"created": updated.isoformat() + 'Z',
'start': {date_key: event_start.isoformat() + date_end},
'end': {date_key: event_end.isoformat() + date_end}
}
result.append(event)
return result
@@ -67,21 +64,19 @@ def gen_events(
def gen_list_to_compare(start: int, stop: int) -> List[Dict[str, str]]:
result: List[Dict[str, str]] = []
for i in range(start, stop):
result.append({"iCalUID": "test{:06d}".format(i)})
result.append({'iCalUID': 'test{:06d}'.format(i)})
return result
def get_start_date(
event: Dict[str, Union[str, Dict[str, str]]]
) -> Union[datetime.datetime, datetime.date]:
event_start: Dict[str, str] = event["start"]
def get_start_date(event: Dict[str, Union[str, Dict[str, str]]]) -> Union[datetime.datetime, datetime.date]:
event_start: Dict[str, str] = event['start']
start_date: Optional[str] = None
is_date = False
if "date" in event_start:
start_date = event_start["date"]
if 'date' in event_start:
start_date = event_start['date']
is_date = True
if "dateTime" in event_start:
start_date = event_start["dateTime"]
if 'dateTime' in event_start:
start_date = event_start['dateTime']
result = dateutil.parser.parse(start_date)
if is_date:
@@ -95,7 +90,8 @@ def test_compare():
# [1..2n]
lst_src = gen_list_to_compare(1, 1 + part_len * 2)
# [n..3n]
lst_dst = gen_list_to_compare(1 + part_len, 1 + part_len * 3)
lst_dst = gen_list_to_compare(
1 + part_len, 1 + part_len * 3)
lst_src_rnd = deepcopy(lst_src)
lst_dst_rnd = deepcopy(lst_dst)
@@ -103,14 +99,15 @@ def test_compare():
shuffle(lst_src_rnd)
shuffle(lst_dst_rnd)
to_ins, to_upd, to_del = CalendarSync._events_list_compare(lst_src_rnd, lst_dst_rnd)
to_ins, to_upd, to_del = CalendarSync._events_list_compare(
lst_src_rnd, lst_dst_rnd)
assert len(to_ins) == part_len
assert len(to_upd) == part_len
assert len(to_del) == part_len
assert sorted(to_ins, key=lambda x: x["iCalUID"]) == lst_src[:part_len]
assert sorted(to_del, key=lambda x: x["iCalUID"]) == lst_dst[part_len:]
assert sorted(to_ins, key=lambda x: x['iCalUID']) == lst_src[:part_len]
assert sorted(to_del, key=lambda x: x['iCalUID']) == lst_dst[part_len:]
to_upd_ok = list(zip(lst_src[part_len:], lst_dst[:part_len]))
assert len(to_upd) == len(to_upd_ok)
@@ -118,29 +115,35 @@ def test_compare():
assert item in to_upd
@pytest.mark.parametrize("no_time", [True, False], ids=["date", "dateTime"])
@pytest.mark.parametrize("no_time", [True, False], ids=['date', 'dateTime'])
def test_filter_events_by_date(no_time: bool):
msk = timezone("Europe/Moscow")
msk = timezone('Europe/Moscow')
now = utc.localize(datetime.datetime.utcnow())
msk_now = msk.normalize(now.astimezone(msk))
part_len = 5
if no_time:
duration = datetime.date(1, 1, 2) - datetime.date(1, 1, 1)
duration = datetime.date(
1, 1, 2) - datetime.date(1, 1, 1)
else:
duration = datetime.datetime(1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
duration = datetime.datetime(
1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_cmp = msk_now + (duration * part_len)
if no_time:
date_cmp = datetime.date(date_cmp.year, date_cmp.month, date_cmp.day)
date_cmp = datetime.date(
date_cmp.year, date_cmp.month, date_cmp.day)
events = gen_events(1, 1 + (part_len * 2), msk_now, no_time)
events = gen_events(
1, 1 + (part_len * 2), msk_now, no_time)
shuffle(events)
events_pending = CalendarSync._filter_events_by_date(events, date_cmp, operator.ge)
events_past = CalendarSync._filter_events_by_date(events, date_cmp, operator.lt)
events_pending = CalendarSync._filter_events_by_date(
events, date_cmp, operator.ge)
events_past = CalendarSync._filter_events_by_date(
events, date_cmp, operator.lt)
assert len(events_pending) == 1 + part_len
assert len(events_past) == part_len - 1
@@ -153,11 +156,12 @@ def test_filter_events_by_date(no_time: bool):
def test_filter_events_to_update():
msk = timezone("Europe/Moscow")
msk = timezone('Europe/Moscow')
now = utc.localize(datetime.datetime.utcnow())
msk_now = msk.normalize(now.astimezone(msk))
one_hour = datetime.datetime(1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
one_hour = datetime.datetime(
1, 1, 1, 2) - datetime.datetime(1, 1, 1, 1)
date_upd = msk_now + (one_hour * 5)
count = 10
@@ -174,30 +178,3 @@ def test_filter_events_to_update():
assert len(sync1.to_update) == count
assert sync2.to_update == []
def test_filter_events_no_updated():
"""
test filtering events that not have 'updated' field
such events should always pass the filter
"""
now = datetime.datetime.utcnow()
yesterday = now - datetime.timedelta(days=-1)
count = 10
events_old = gen_events(1, 1 + count, now)
events_new = gen_events(1, 1 + count, now)
# 1/2 updated=yesterday, 1/2 no updated field
i = 0
for event in events_new:
if 0 == i % 2:
event["updated"] = yesterday.isoformat() + "Z"
else:
del event["updated"]
i += 1
sync = CalendarSync(None, None)
sync.to_update = list(zip(events_old, events_new))
sync._filter_events_to_update()
assert len(sync.to_update) == count // 2