mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-18 18:55:28 -04:00
Harden CalDAV write-back with retries (#1193)
Co-authored-by: Alexandre Teixeira <111787685+alteixeira20@users.noreply.github.com>
This commit is contained in:
+178
-1
@@ -128,6 +128,17 @@ def validate_caldav_url(raw_url: str) -> str:
|
||||
return urlunparse(parsed._replace(fragment="")).rstrip("/")
|
||||
|
||||
|
||||
def _event_etag(obj) -> str:
|
||||
"""Best-effort ETag extraction from python-caldav resources."""
|
||||
try:
|
||||
etag = getattr(obj, "etag", None)
|
||||
if callable(etag):
|
||||
etag = etag()
|
||||
return str(etag or "")
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
def _stable_cal_id(remote_url: str, owner: str = "", account_id: str = "") -> str:
|
||||
"""Deterministic local id for a remote CalDAV calendar, scoped to owner
|
||||
and account so two users — or one user with two accounts — pointing at
|
||||
@@ -316,11 +327,12 @@ def _sync_blocking(owner: str, url: str, username: str, password: str, account_i
|
||||
color="#5b8abf",
|
||||
source="caldav",
|
||||
account_id=account_id or None,
|
||||
caldav_base_url=remote_url,
|
||||
)
|
||||
db.add(local_cal)
|
||||
db.commit()
|
||||
else:
|
||||
# Refresh display name and stamp account_id if missing.
|
||||
# Refresh display name and stamp CalDAV metadata if missing.
|
||||
changed = False
|
||||
if local_cal.name != display_name:
|
||||
local_cal.name = display_name
|
||||
@@ -328,6 +340,9 @@ def _sync_blocking(owner: str, url: str, username: str, password: str, account_i
|
||||
if account_id and not local_cal.account_id:
|
||||
local_cal.account_id = account_id
|
||||
changed = True
|
||||
if local_cal.caldav_base_url != remote_url:
|
||||
local_cal.caldav_base_url = remote_url
|
||||
changed = True
|
||||
if changed:
|
||||
db.commit()
|
||||
result["calendars"] += 1
|
||||
@@ -395,6 +410,9 @@ def _sync_blocking(owner: str, url: str, username: str, password: str, account_i
|
||||
|
||||
existing = _find_existing_event(db, pending, uid_val, local_cal.id)
|
||||
if existing:
|
||||
if existing.caldav_sync_pending in {"create", "update"}:
|
||||
result["events"] += 1
|
||||
continue
|
||||
existing.calendar_id = local_cal.id
|
||||
existing.summary = summary
|
||||
existing.description = description
|
||||
@@ -405,6 +423,9 @@ def _sync_blocking(owner: str, url: str, username: str, password: str, account_i
|
||||
existing.is_utc = row_is_utc
|
||||
existing.rrule = rrule
|
||||
existing.origin = "caldav"
|
||||
existing.remote_href = str(getattr(obj, "url", "") or "") or None
|
||||
existing.remote_etag = _event_etag(obj) or None
|
||||
existing.caldav_sync_pending = None
|
||||
else:
|
||||
new_ev = CalendarEvent(
|
||||
uid=uid_val,
|
||||
@@ -418,6 +439,8 @@ def _sync_blocking(owner: str, url: str, username: str, password: str, account_i
|
||||
is_utc=row_is_utc,
|
||||
rrule=rrule,
|
||||
origin="caldav",
|
||||
remote_href=str(getattr(obj, "url", "") or "") or None,
|
||||
remote_etag=_event_etag(obj) or None,
|
||||
)
|
||||
db.add(new_ev)
|
||||
pending[uid_val] = new_ev
|
||||
@@ -442,6 +465,8 @@ def _sync_blocking(owner: str, url: str, username: str, password: str, account_i
|
||||
CalendarEvent.origin == "caldav",
|
||||
CalendarEvent.dtstart >= start,
|
||||
CalendarEvent.dtstart <= end,
|
||||
CalendarEvent.remote_href.isnot(None),
|
||||
CalendarEvent.caldav_sync_pending.is_(None),
|
||||
~CalendarEvent.uid.in_(seen_uids) if seen_uids else CalendarEvent.uid.isnot(None),
|
||||
).all()
|
||||
for ev in stale:
|
||||
@@ -458,6 +483,92 @@ def _sync_blocking(owner: str, url: str, username: str, password: str, account_i
|
||||
return result
|
||||
|
||||
|
||||
def _event_payload(ev) -> dict:
|
||||
return {
|
||||
"uid": ev.uid,
|
||||
"summary": ev.summary,
|
||||
"description": ev.description,
|
||||
"location": ev.location,
|
||||
"dtstart": ev.dtstart,
|
||||
"dtend": ev.dtend,
|
||||
"all_day": ev.all_day,
|
||||
"is_utc": ev.is_utc,
|
||||
"rrule": ev.rrule or "",
|
||||
}
|
||||
|
||||
|
||||
def _load_event_for_writeback(owner: str, uid: str) -> tuple[str, str, dict] | None:
|
||||
from core.database import CalendarCal, CalendarEvent, SessionLocal
|
||||
|
||||
db = SessionLocal()
|
||||
try:
|
||||
ev = (
|
||||
db.query(CalendarEvent)
|
||||
.join(CalendarCal)
|
||||
.filter(CalendarEvent.uid == uid, CalendarCal.owner == owner)
|
||||
.first()
|
||||
)
|
||||
if not ev or not ev.calendar or ev.calendar.source != "caldav":
|
||||
return None
|
||||
return ev.calendar.source, ev.calendar.id, _event_payload(ev)
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def _load_delete_for_writeback(owner: str, uid: str) -> tuple[str, str, dict] | None:
|
||||
from core.database import CalendarCal, CalendarDeletedEvent, CalendarEvent, SessionLocal
|
||||
|
||||
db = SessionLocal()
|
||||
try:
|
||||
tombstone = db.query(CalendarDeletedEvent).filter(
|
||||
CalendarDeletedEvent.uid == uid,
|
||||
CalendarDeletedEvent.owner == owner,
|
||||
).first()
|
||||
if tombstone:
|
||||
return "caldav", tombstone.calendar_id, {"uid": uid}
|
||||
|
||||
ev = (
|
||||
db.query(CalendarEvent)
|
||||
.join(CalendarCal)
|
||||
.filter(CalendarEvent.uid == uid, CalendarCal.owner == owner)
|
||||
.first()
|
||||
)
|
||||
if not ev or not ev.calendar or ev.calendar.source != "caldav":
|
||||
return None
|
||||
return ev.calendar.source, ev.calendar.id, {"uid": uid}
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def _pending_writeback_uids(owner: str) -> tuple[list[str], list[str]]:
|
||||
from core.database import CalendarCal, CalendarDeletedEvent, CalendarEvent, SessionLocal
|
||||
|
||||
db = SessionLocal()
|
||||
try:
|
||||
rows = (
|
||||
db.query(CalendarEvent.uid)
|
||||
.join(CalendarCal)
|
||||
.filter(
|
||||
CalendarCal.owner == owner,
|
||||
CalendarCal.source == "caldav",
|
||||
CalendarEvent.status != "cancelled",
|
||||
(
|
||||
(CalendarEvent.caldav_sync_pending.isnot(None))
|
||||
| (CalendarEvent.remote_href.is_(None))
|
||||
),
|
||||
)
|
||||
.all()
|
||||
)
|
||||
delete_rows = (
|
||||
db.query(CalendarDeletedEvent.uid)
|
||||
.filter(CalendarDeletedEvent.owner == owner)
|
||||
.all()
|
||||
)
|
||||
return [row[0] for row in rows], [row[0] for row in delete_rows]
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def _load_caldav_accounts(owner: str) -> list:
|
||||
"""Return the list of CalDAV accounts for *owner*, auto-migrating the legacy
|
||||
single-account ``caldav`` key to the new ``caldav_accounts`` list on first call.
|
||||
@@ -533,3 +644,69 @@ async def sync_caldav(owner: str) -> dict:
|
||||
for err in result.get("errors", []):
|
||||
totals["errors"].append(f"{label}: {err}")
|
||||
return totals
|
||||
|
||||
|
||||
async def push_event_create(owner: str, uid: str) -> dict:
|
||||
loaded = _load_event_for_writeback(owner, uid)
|
||||
if not loaded:
|
||||
return {"ok": True, "skipped": True}
|
||||
source, calendar_id, payload = loaded
|
||||
from src.caldav_writeback import writeback_event
|
||||
return await writeback_event(owner, source, calendar_id, payload)
|
||||
|
||||
|
||||
async def push_event_update(owner: str, uid: str) -> dict:
|
||||
return await push_event_create(owner, uid)
|
||||
|
||||
|
||||
async def push_event_delete(owner: str, uid: str) -> dict:
|
||||
loaded = _load_delete_for_writeback(owner, uid)
|
||||
if not loaded:
|
||||
return {"ok": True, "skipped": True}
|
||||
source, calendar_id, payload = loaded
|
||||
from src.caldav_writeback import writeback_event
|
||||
return await writeback_event(owner, source, calendar_id, payload, delete=True)
|
||||
|
||||
|
||||
async def push_pending_events(owner: str) -> dict:
|
||||
result = {"events": 0, "errors": []}
|
||||
uids, delete_uids = _pending_writeback_uids(owner)
|
||||
for event_uid in uids:
|
||||
try:
|
||||
out = await push_event_update(owner, event_uid)
|
||||
if out.get("ok"):
|
||||
result["events"] += 1
|
||||
elif not out.get("skipped"):
|
||||
result["errors"].append(f"{event_uid}: {str(out.get('error') or out)[:160]}")
|
||||
except Exception as e:
|
||||
logger.warning("CalDAV pending push failed for uid=%s: %s", event_uid, e)
|
||||
result["errors"].append(f"{event_uid}: {str(e)[:160]}")
|
||||
for event_uid in delete_uids:
|
||||
try:
|
||||
out = await push_event_delete(owner, event_uid)
|
||||
if out.get("ok"):
|
||||
result["events"] += 1
|
||||
elif not out.get("skipped"):
|
||||
result["errors"].append(f"{event_uid}: {str(out.get('error') or out)[:160]}")
|
||||
except Exception as e:
|
||||
logger.warning("CalDAV pending delete failed for uid=%s: %s", event_uid, e)
|
||||
result["errors"].append(f"{event_uid}: {str(e)[:160]}")
|
||||
return result
|
||||
|
||||
|
||||
async def sync_caldav_direction(owner: str, direction: str = "pull") -> dict:
|
||||
direction = (direction or "pull").strip().lower()
|
||||
if direction == "pull":
|
||||
return await sync_caldav(owner)
|
||||
if direction == "push":
|
||||
return await push_pending_events(owner)
|
||||
if direction == "both":
|
||||
pushed = await push_pending_events(owner)
|
||||
pulled = await sync_caldav(owner)
|
||||
return {"push": pushed, "pull": pulled}
|
||||
return {
|
||||
"calendars": 0,
|
||||
"events": 0,
|
||||
"deleted": 0,
|
||||
"errors": [f"Unsupported CalDAV sync direction: {direction}"],
|
||||
}
|
||||
|
||||
+92
-6
@@ -89,6 +89,23 @@ def find_remote_calendar(calendars, local_cal_id: str, owner: str = "", account_
|
||||
return None
|
||||
|
||||
|
||||
def _resource_href(obj) -> str:
|
||||
try:
|
||||
return str(getattr(obj, "url", "") or "")
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
def _resource_etag(obj) -> str:
|
||||
try:
|
||||
etag = getattr(obj, "etag", None)
|
||||
if callable(etag):
|
||||
etag = etag()
|
||||
return str(etag or "")
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
def push_event(calendars, local_cal_id: str, ev: dict, *, delete: bool = False,
|
||||
owner: str = "", account_id: str = "") -> dict:
|
||||
"""Create/update (or delete) ``ev`` on the matching remote calendar.
|
||||
@@ -105,6 +122,7 @@ def push_event(calendars, local_cal_id: str, ev: dict, *, delete: bool = False,
|
||||
remote = find_remote_calendar(calendars, local_cal_id, owner=owner, account_id=account_id)
|
||||
if remote is None:
|
||||
return {"ok": False, "error": "remote calendar not found"}
|
||||
remote_url = str(getattr(remote, "url", "") or "")
|
||||
|
||||
try:
|
||||
existing = remote.event_by_uid(uid)
|
||||
@@ -113,17 +131,34 @@ def push_event(calendars, local_cal_id: str, ev: dict, *, delete: bool = False,
|
||||
|
||||
if delete:
|
||||
if existing is None:
|
||||
return {"ok": True, "note": "already absent on remote"}
|
||||
return {"ok": True, "note": "already absent on remote", "calendar_url": remote_url}
|
||||
existing.delete()
|
||||
return {"ok": True}
|
||||
return {
|
||||
"ok": True,
|
||||
"calendar_url": remote_url,
|
||||
"remote_href": _resource_href(existing),
|
||||
"remote_etag": _resource_etag(existing),
|
||||
}
|
||||
|
||||
ical = build_event_ical(ev)
|
||||
if existing is not None:
|
||||
existing.data = ical
|
||||
existing.save()
|
||||
return {"ok": True, "updated": True}
|
||||
remote.save_event(ical)
|
||||
return {"ok": True, "created": True}
|
||||
return {
|
||||
"ok": True,
|
||||
"updated": True,
|
||||
"calendar_url": remote_url,
|
||||
"remote_href": _resource_href(existing),
|
||||
"remote_etag": _resource_etag(existing),
|
||||
}
|
||||
created = remote.save_event(ical)
|
||||
return {
|
||||
"ok": True,
|
||||
"created": True,
|
||||
"calendar_url": remote_url,
|
||||
"remote_href": _resource_href(created),
|
||||
"remote_etag": _resource_etag(created),
|
||||
}
|
||||
|
||||
|
||||
def _discover_calendars(client):
|
||||
@@ -154,6 +189,54 @@ def _writeback_blocking(local_cal_id, ev, delete, url, username, password,
|
||||
owner=owner, account_id=account_id)
|
||||
|
||||
|
||||
def _persist_writeback_result(owner: str, calendar_id: str, uid: str, result: dict, *, delete: bool) -> None:
|
||||
from core.database import CalendarCal, CalendarDeletedEvent, CalendarEvent, SessionLocal
|
||||
|
||||
if not uid or not isinstance(result, dict):
|
||||
return
|
||||
|
||||
db = SessionLocal()
|
||||
try:
|
||||
calendar = db.query(CalendarCal).filter(
|
||||
CalendarCal.id == calendar_id,
|
||||
CalendarCal.owner == owner,
|
||||
).first()
|
||||
if calendar and result.get("calendar_url"):
|
||||
calendar.caldav_base_url = result.get("calendar_url")
|
||||
|
||||
if delete:
|
||||
tombstone = db.query(CalendarDeletedEvent).filter(
|
||||
CalendarDeletedEvent.uid == uid,
|
||||
CalendarDeletedEvent.owner == owner,
|
||||
).first()
|
||||
if result.get("ok"):
|
||||
if tombstone:
|
||||
db.delete(tombstone)
|
||||
elif tombstone:
|
||||
tombstone.last_error = str(result.get("error") or result)[:500]
|
||||
db.commit()
|
||||
return
|
||||
|
||||
event = (
|
||||
db.query(CalendarEvent)
|
||||
.join(CalendarCal)
|
||||
.filter(CalendarEvent.uid == uid, CalendarCal.owner == owner)
|
||||
.first()
|
||||
)
|
||||
if event and result.get("ok"):
|
||||
if result.get("remote_href"):
|
||||
event.remote_href = result.get("remote_href")
|
||||
if result.get("remote_etag"):
|
||||
event.remote_etag = result.get("remote_etag")
|
||||
event.caldav_sync_pending = None
|
||||
db.commit()
|
||||
except Exception:
|
||||
db.rollback()
|
||||
logger.exception("CalDAV write-back metadata persistence failed")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
async def writeback_event(owner: str, calendar_source: str, calendar_id: str,
|
||||
ev: dict, *, delete: bool = False) -> dict:
|
||||
"""Best-effort push of a local change to the remote CalDAV server.
|
||||
@@ -204,9 +287,12 @@ async def writeback_event(owner: str, calendar_source: str, calendar_id: str,
|
||||
result = await asyncio.to_thread(
|
||||
_writeback_blocking, calendar_id, ev, delete, url, user, pw, owner, acc_id
|
||||
)
|
||||
_persist_writeback_result(owner, calendar_id, (ev or {}).get("uid", ""), result, delete=delete)
|
||||
if not result.get("ok"):
|
||||
logger.warning("CalDAV write-back did not apply: %s", result.get("error") or result)
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.exception("CalDAV write-back raised")
|
||||
return {"ok": False, "error": str(e)[:200]}
|
||||
result = {"ok": False, "error": str(e)[:200]}
|
||||
_persist_writeback_result(owner, calendar_id, (ev or {}).get("uid", ""), result, delete=delete)
|
||||
return result
|
||||
|
||||
@@ -1445,7 +1445,15 @@ async def do_manage_calendar(content: str, owner: Optional[str] = None) -> Dict:
|
||||
"""Handle manage_calendar tool calls: list/create/update/delete calendar events (local SQLite)."""
|
||||
from datetime import datetime, timedelta
|
||||
from core.database import SessionLocal, CalendarCal, CalendarEvent, Note
|
||||
from routes.calendar_routes import _ensure_default_calendar, _parse_dt, _parse_dt_pair, parse_due_for_user, _resolve_base_uid
|
||||
from routes.calendar_routes import (
|
||||
_ensure_default_calendar,
|
||||
_parse_dt,
|
||||
_parse_dt_pair,
|
||||
parse_due_for_user,
|
||||
_resolve_base_uid,
|
||||
_push_caldav_event_after_commit,
|
||||
_record_caldav_delete_tombstone,
|
||||
)
|
||||
import uuid as _uuid
|
||||
|
||||
try:
|
||||
@@ -1825,6 +1833,7 @@ async def do_manage_calendar(content: str, owner: Optional[str] = None) -> Dict:
|
||||
rrule=args.get("rrule", "") or "",
|
||||
event_type=event_type,
|
||||
importance=importance,
|
||||
caldav_sync_pending="create" if cal.source == "caldav" else None,
|
||||
)
|
||||
db.add(ev)
|
||||
reminder_note_id = None
|
||||
@@ -1839,6 +1848,8 @@ async def do_manage_calendar(content: str, owner: Optional[str] = None) -> Dict:
|
||||
dtstart_is_utc and not all_day,
|
||||
)
|
||||
db.commit()
|
||||
if cal.source == "caldav":
|
||||
await _push_caldav_event_after_commit(owner, uid, "create")
|
||||
tag_blurb = f" [{event_type}]" if event_type else ""
|
||||
if minutes_before is None:
|
||||
reminder_blurb = ""
|
||||
@@ -1896,7 +1907,12 @@ async def do_manage_calendar(content: str, owner: Optional[str] = None) -> Dict:
|
||||
ev.event_type = _tag or None
|
||||
if args.get("importance") is not None:
|
||||
ev.importance = args["importance"]
|
||||
is_caldav = ev.calendar and ev.calendar.source == "caldav"
|
||||
if is_caldav:
|
||||
ev.caldav_sync_pending = "update"
|
||||
db.commit()
|
||||
if is_caldav:
|
||||
await _push_caldav_event_after_commit(owner, base_uid, "update")
|
||||
return {"response": f"Updated event {uid}", "exit_code": 0}
|
||||
|
||||
elif action == "delete_event":
|
||||
@@ -1910,8 +1926,13 @@ async def do_manage_calendar(content: str, owner: Optional[str] = None) -> Dict:
|
||||
ev = _event_query().filter(CalendarEvent.uid == base_uid).first()
|
||||
if not ev:
|
||||
return {"error": f"Event {uid} not found", "exit_code": 1}
|
||||
is_caldav = ev.calendar and ev.calendar.source == "caldav" and ev.remote_href
|
||||
if is_caldav:
|
||||
_record_caldav_delete_tombstone(db, ev, owner)
|
||||
db.delete(ev)
|
||||
db.commit()
|
||||
if is_caldav:
|
||||
await _push_caldav_event_after_commit(owner, base_uid, "delete")
|
||||
return {"response": f"Deleted event {uid}", "exit_code": 0}
|
||||
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user