""" builtin_actions.py Registry of built-in automation actions that can be executed by the task scheduler without needing an LLM call. """ import logging import os from datetime import datetime from typing import Tuple from src.auth_helpers import owner_filter from core.platform_compat import IS_WINDOWS, find_bash from core.constants import internal_api_base from src.constants import DATA_DIR, DEEP_RESEARCH_DIR, TIDY_CALENDAR_STATE_FILE, EMAIL_URGENCY_CACHE_DIR, COOKBOOK_STATE_FILE logger = logging.getLogger(__name__) class TaskNoop(BaseException): """Raised by an action when it determined there's nothing to do. Inherits from BaseException (not Exception) so the standard `except Exception` wrappers each action uses for real error handling don't accidentally catch it. The scheduler explicitly catches TaskNoop, drops the queued TaskRun row, advances last_run / next_run, and exits silently. Nothing appears in the Activity log; the message is logged server-side only. """ class TaskDeferred(BaseException): """Raised when a task should run later without recording a skipped run.""" def __init__(self, reason: str, delay_seconds: int = 20 * 60): super().__init__(reason) self.reason = reason self.delay_seconds = delay_seconds async def action_tidy_sessions(owner: str, **kwargs) -> Tuple[str, bool]: """Delete empty sessions for the owner. Pure heuristic — the LLM folder-sort phase is skipped (user opted to keep this task LLM-free; sorting can be triggered manually via the Chats UI).""" try: import asyncio from src.session_actions import run_auto_sort result = await asyncio.wait_for( run_auto_sort(owner, skip_llm=True, delete_throwaway=False), timeout=60, ) return result, True except asyncio.TimeoutError: logger.error("tidy_sessions action timed out") return "Chat session tidy timed out", False except Exception as e: logger.error(f"tidy_sessions action failed: {e}") return str(e), False async def action_tidy_documents(owner: str, **kwargs) -> Tuple[str, bool]: """Run tidy on documents for the owner.""" try: from src.document_actions import run_document_tidy result = await run_document_tidy(owner) return result, True except Exception as e: logger.error(f"tidy_documents action failed: {e}") return str(e), False async def action_consolidate_memory(owner: str, **kwargs) -> Tuple[str, bool]: """Consolidate/deduplicate memories for the owner.""" try: import json import re from src.constants import DATA_DIR from src.endpoint_resolver import resolve_endpoint from src.llm_core import llm_call_async from src.memory import MemoryManager manager = MemoryManager(DATA_DIR) all_memories = manager.load_all() _owner_clean = (owner or "").strip() text_limit = 2000 def _memory_owner(mem: dict) -> str: return (mem.get("owner") or "").strip() # Built-in housekeeping can run without an owner. In that case scan all # memories, but keep every AI prompt/apply step owner-local. if _owner_clean: memory_groups = { _owner_clean: [m for m in all_memories if _memory_owner(m) == _owner_clean] } else: memory_groups = {} for mem in all_memories: memory_groups.setdefault(_memory_owner(mem), []).append(mem) memory_groups = {group_owner: group for group_owner, group in memory_groups.items() if group} if not memory_groups: raise TaskNoop("no memories to consolidate") total_removed = 0 total_cleaned = 0 total_scanned = 0 removed_examples = [] ai_reasons = [] ai_used = False async def _try_ai_tidy_group(group_owner: str, group_memories: list) -> bool: nonlocal all_memories, total_removed, total_cleaned, total_scanned, ai_used if len(group_memories) < 2: return False url, model, headers = resolve_endpoint("utility", owner=group_owner or None) if not url or not model: url, model, headers = resolve_endpoint("default", owner=group_owner or None) if not url or not model: return False try: items = [ { "id": m.get("id"), "category": m.get("category", "fact"), "text": (m.get("text") or "").strip()[:text_limit], "truncated": len((m.get("text") or "").strip()) > text_limit, } for m in group_memories if m.get("id") and (m.get("text") or "").strip() ] if len(items) < 2: return False truncated_ids = {item["id"] for item in items if item.get("truncated")} prompt = ( "You are tidying a user's saved personal memories. Return ONLY raw JSON, no markdown.\n" "Remove memories that are empty, broken, trivial conversation filler, duplicates, or obsolete " "because a clearer newer memory replaces them. Preserve useful personal facts, preferences, " "contacts, project context, and instructions. If memories conflict, keep the clearest/latest " "one and drop the obsolete one.\n\n" "JSON shape:\n" "{\"keep\":[{\"id\":\"existing id\",\"text\":\"cleaned text\",\"category\":\"fact|preference|identity|event|contact|project|instruction\"}]," "\"drop\":[{\"id\":\"existing id\",\"reason\":\"short reason\"}]}\n\n" f"MEMORIES:\n{json.dumps(items, ensure_ascii=False)}" ) raw = await llm_call_async( url=url, model=model, messages=[{"role": "user", "content": prompt}], temperature=0.0, max_tokens=4096, headers=headers, timeout=120, ) from src.text_helpers import strip_think raw = strip_think(raw or "", prose=False, prompt_echo=False).strip() raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw, flags=re.MULTILINE).strip() start = raw.find("{") end = raw.rfind("}") if start != -1 and end != -1 and end > start: decision = json.loads(raw[start:end + 1]) keep_items = decision.get("keep") if isinstance(decision, dict) else None drop_items = decision.get("drop") if isinstance(decision, dict) else None if isinstance(keep_items, list) and isinstance(drop_items, list): by_id = {m.get("id"): m for m in group_memories if m.get("id")} cleaned_by_id = {} for item in keep_items: if not isinstance(item, dict): continue mid = item.get("id") if mid not in by_id: continue text = (item.get("text") or "").strip() if not text: continue cleaned = { "category": (item.get("category") or by_id[mid].get("category") or "fact").strip(), } original_text = (by_id[mid].get("text") or "").strip() if len(original_text) <= text_limit: cleaned["text"] = text cleaned_by_id[mid] = cleaned # Delete only memories the model EXPLICITLY dropped, never # ones it merely omitted from `keep`. Treating the # complement of `keep` as deletions meant a model that # forgot to re-list an id (common) silently destroyed that # memory. Honor the explicit `drop` set instead. drop_ids = { d.get("id") for d in drop_items if isinstance(d, dict) and d.get("id") in by_id } # Never delete a memory the model only saw truncated. drop_ids -= truncated_ids if drop_ids or cleaned_by_id: changed_text = 0 group_ref_ids = {id(m) for m in group_memories} kept_all = [] for mem in all_memories: if id(mem) not in group_ref_ids: kept_all.append(mem) continue mid = mem.get("id") if mid in drop_ids: continue cleaned = cleaned_by_id.get(mid) or {} if mid in truncated_ids: cleaned.pop("text", None) if cleaned.get("text") and cleaned["text"] != mem.get("text"): mem["text"] = cleaned["text"] changed_text += 1 if cleaned.get("category"): mem["category"] = cleaned["category"] kept_all.append(mem) removed = sum(1 for m in group_memories if m.get("id") in drop_ids) total_scanned += len(group_memories) if removed or changed_text: all_memories = kept_all total_removed += removed total_cleaned += changed_text ai_used = True ai_reasons.extend([ (d.get("reason") or "").strip() for d in drop_items if isinstance(d, dict) and (d.get("reason") or "").strip() ]) return True except Exception as ai_err: logger.warning("AI memory tidy failed; falling back to duplicate cleanup: %s", ai_err) return False for group_owner, group_memories in memory_groups.items(): if await _try_ai_tidy_group(group_owner, group_memories): continue seen = {} keep_refs = set() total_scanned += len(group_memories) for mem in group_memories: text = (mem.get("text") or "").strip() key = " ".join(text.lower().split()) if not key: if len(removed_examples) < 3: removed_examples.append("(empty)") continue if key in seen: if len(removed_examples) < 3: removed_examples.append(text[:60] + ("..." if len(text) > 60 else "")) continue seen[key] = mem keep_refs.add(id(mem)) group_removed = len(group_memories) - len(keep_refs) if group_removed == 0: continue group_ref_ids = {id(m) for m in group_memories} all_memories = [ m for m in all_memories if id(m) not in group_ref_ids or id(m) in keep_refs ] total_removed += group_removed if total_removed or total_cleaned: manager.save(all_memories) if ai_used: reasons = ai_reasons[:3] reason_text = f": {'; '.join(reasons)}" if reasons else "" return ( f"AI tidied {total_scanned} memories: " f"removed {total_removed}, cleaned {total_cleaned}{reason_text}", True, ) preview = "; ".join(removed_examples) extra = f" (+{total_removed - len(removed_examples)} more)" if total_removed > len(removed_examples) else "" return f"Removed {total_removed} duplicate(s) of {total_scanned}: {preview}{extra}", True raise TaskNoop(f"scanned {total_scanned} memories, no duplicates") except Exception as e: logger.error(f"consolidate_memory action failed: {e}") return str(e), False # Registry: action name -> async function(owner, **kwargs) -> (result_str, success_bool) async def _run_subprocess(argv, *, shell: bool = False, timeout: int = 120, label: str = "Command") -> Tuple[str, bool]: """Shared subprocess runner. Wraps the blocking subprocess.run in asyncio.to_thread so the event loop stays responsive.""" import asyncio import subprocess try: result = await asyncio.to_thread( subprocess.run, argv, shell=shell, capture_output=True, text=True, timeout=timeout, ) output = (result.stdout or "").strip() if result.returncode != 0 and result.stderr: output += "\nSTDERR: " + result.stderr.strip() return output or "(no output)", result.returncode == 0 except subprocess.TimeoutExpired: return f"{label} timed out ({timeout}s)", False except Exception as e: return str(e), False async def action_ssh_command(owner: str, command: str = "", host: str = "localhost", **kwargs) -> Tuple[str, bool]: """Run a shell command locally or on a remote host via SSH.""" if not command: return "No command specified", False if host in ("localhost", "127.0.0.1", "local"): if IS_WINDOWS: bash = find_bash() if bash: return await _run_subprocess([bash, "-c", command], timeout=120, label="Command") return await _run_subprocess(command, shell=True, timeout=120, label="Command") return await _run_subprocess(["bash", "-c", command], timeout=120, label="Command") return await _run_subprocess( ["ssh", "-o", "ConnectTimeout=10", host, command], timeout=120, label="Command", ) async def action_run_script(owner: str, script: str = "", host: str = "", **kwargs) -> Tuple[str, bool]: """Run a script locally, or via SSH when a host is configured.""" if not script: return "No script specified", False target_host = (host or os.getenv("ODYSSEUS_SCRIPT_HOST", "localhost")).strip() if target_host in ("", "localhost", "127.0.0.1", "local"): if IS_WINDOWS and find_bash(): return await _run_subprocess([find_bash(), "-c", script], timeout=300, label="Script") return await _run_subprocess(script, shell=True, timeout=300, label="Script") return await _run_subprocess(["ssh", target_host, script], timeout=300, label="Script") async def action_run_local(owner: str, script: str = "", **kwargs) -> Tuple[str, bool]: """Run a script locally (no SSH).""" if not script: return "No script specified", False if IS_WINDOWS and find_bash(): return await _run_subprocess([find_bash(), "-c", script], timeout=300, label="Script") return await _run_subprocess(script, shell=True, timeout=300, label="Script") async def action_tidy_research(owner: str, **kwargs) -> Tuple[str, bool]: """Remove only broken research files (empty or unparseable JSON). Research history lives entirely in data/deep_research/.json and is NOT backed by chat-session rows — so a file must never be deleted just because no chat session matches its id. Only prune files that fail to load.""" try: from pathlib import Path import json as _json research_dir = Path(DEEP_RESEARCH_DIR) if not research_dir.exists(): raise TaskNoop("no research directory") files = list(research_dir.glob("*.json")) removed = [] for p in files: try: txt = p.read_text(encoding="utf-8").strip() if not txt: raise ValueError("empty file") _json.loads(txt) # valid JSON → keep except Exception: p.unlink(missing_ok=True) removed.append(p.stem[:8]) if not removed: raise TaskNoop(f"scanned {len(files)} research file(s), none broken") return f"Removed {len(removed)} broken research file(s) of {len(files)}", True except Exception as e: logger.error(f"tidy_research action failed: {e}") return str(e), False async def action_tidy_calendar(owner: str, **kwargs) -> Tuple[str, bool]: """Find duplicate calendar events (same title + start time) and DELETE the dups, keeping the oldest (first-seen) instance. Incremental: remembers the newest `created_at` already scanned in data/tidy_calendar_state.json. If no events have been added since then, short-circuits. Otherwise only events newer than the watermark are candidates for deletion, but they're checked against the FULL existing set so a new duplicate of an old event still gets caught. """ try: import json from pathlib import Path from core.database import SessionLocal, CalendarEvent from sqlalchemy import func STATE_FILE = Path(TIDY_CALENDAR_STATE_FILE) last_watermark = None try: if STATE_FILE.exists(): saved = json.loads(STATE_FILE.read_text(encoding="utf-8")) if saved.get("last_created_at"): last_watermark = datetime.fromisoformat(saved["last_created_at"]) except Exception: last_watermark = None db = SessionLocal() try: newest = db.query(func.max(CalendarEvent.created_at)).scalar() db.query(CalendarEvent).count() # Short-circuit: nothing new since last run if last_watermark is not None and newest is not None and newest <= last_watermark: raise TaskNoop(f"no new events since watermark {last_watermark.strftime('%Y-%m-%d %H:%M')}") events = db.query(CalendarEvent).order_by(CalendarEvent.dtstart).all() # Build full seen-set from events at or before the watermark (known-clean). # Events after the watermark are candidates for deletion. seen = {} candidates = [] no_title = 0 for e in events: title = (e.summary or "").strip() if not title: no_title += 1 continue if last_watermark is None or (e.created_at and e.created_at <= last_watermark): # Known-clean region: first occurrence wins key = (title.lower(), e.dtstart) if key not in seen: seen[key] = e # If a dup exists in the known-clean region (first run, or imported later # with the same created_at), still remove it — fall through to candidate check. else: candidates.append(e) else: candidates.append(e) removed = [] for e in candidates: title = (e.summary or "").strip() key = (title.lower(), e.dtstart) if key in seen: when = e.dtstart.strftime('%Y-%m-%d %H:%M') if e.dtstart else '?' removed.append(f"{title} @ {when}") db.delete(e) else: seen[key] = e if removed: db.commit() # Persist the new watermark (newest created_at among events that survive) try: STATE_FILE.parent.mkdir(parents=True, exist_ok=True) if newest is not None: STATE_FILE.write_text(json.dumps({ "last_created_at": newest.isoformat(), "last_run_at": datetime.utcnow().isoformat(), "scanned": len(events), "removed": len(removed), }, indent=2), encoding="utf-8") except Exception as se: logger.warning(f"tidy_calendar watermark save failed: {se}") new_since = len(candidates) parts = [f"Scanned {len(events)} event(s), {new_since} new since last run"] if removed: preview = "; ".join(removed[:5]) if len(removed) > 5: preview += f" (+{len(removed) - 5} more)" parts.append(f"removed {len(removed)} duplicate(s): {preview}") if no_title: parts.append(f"{no_title} untitled (kept)") if not removed and not no_title: parts.append("no duplicates") return " · ".join(parts), True finally: db.close() except Exception as e: logger.error(f"tidy_calendar action failed: {e}") return str(e), False def _result_has_work(result: str | None) -> bool: """Heuristic: did the email pass actually process anything? `_run_auto_summarize_once` returns strings like 'Processed 0 emails', 'No new emails to summarize', 'Tagged 0 / Moved 0', etc. when nothing was done. Used to decide whether to record the run or noop it. """ if not isinstance(result, str) or not result: return False low = result.lower() if "processed 0" in low or "no new" in low or "nothing to" in low: return False # "Tagged 0 / Moved 0" or similar zero-count summaries if low.count(" 0") >= 2 and ("tagged" in low or "moved" in low or "drafted" in low): return False return True async def action_summarize_emails(owner: str, **kwargs) -> Tuple[str, bool]: """Run one pass of email summary background processing.""" try: from routes.email_pollers import _run_auto_summarize_once result = await _run_auto_summarize_once(do_summary=True, do_reply=False) if not _result_has_work(result): raise TaskNoop(f"summarize: {result or 'no new emails'}") return result, True except Exception as e: logger.error(f"summarize_emails action failed: {e}") return str(e), False async def action_draft_email_replies(owner: str, **kwargs) -> Tuple[str, bool]: """Run one pass of AI reply drafting.""" try: from routes.email_pollers import _run_auto_summarize_once result = await _run_auto_summarize_once( do_summary=False, do_reply=True, days_back=7, progress_cb=kwargs.get("progress_cb"), ) if not _result_has_work(result): raise TaskNoop(f"draft replies: {result or 'no new emails'}") return result, True except Exception as e: logger.error(f"draft_email_replies action failed: {e}") return str(e), False _TYPE_COLORS = { "work": "#5b8abf", # blue "personal": "#a07ae0", # purple "health": "#e06c75", # red "travel": "#e5a33a", # orange "meal": "#d8b974", # tan "social": "#82c882", # green "admin": "#888888", # gray "other": "#6b9cb5", # default } _HEURISTIC_TYPES = { "health": ["doctor", "dentist", "clinic", "hospital", "appointment", "checkup", "therapy", "physio", "chiropract", "vaccine", "blood test", "xray", "scan", "surgery"], "travel": ["flight", "airport", "train", "shinkansen", "boarding", "uber", "taxi", "trip", "hotel", "airbnb", "depart", "arrival", "check-in", "checkout"], "meal": ["lunch", "dinner", "breakfast", "brunch", "coffee", "drinks", "restaurant", "reservation", "bar", "cafe"], "social": ["birthday", "party", "hangout", "wedding", "date with", "drinks with", "anniversary", "baby shower", "graduation", "picnic", "bbq"], "admin": ["bill", "renewal", "tax", "deadline", "filing", "submit", "due date", "registration", "license", "passport", "visa", "form"], "work": ["meeting", "standup", "sync", "1:1", "1on1", "review", "interview", "demo", "presentation", "kickoff", "retro", "all-hands", "town hall", "call with", "client", "deck"], } _HEURISTIC_HIGH = ["flight", "interview", "wedding", "surgery", "exam", "deadline", "court", "presentation", "demo", "kickoff", "launch"] _HEURISTIC_CRITICAL = ["surgery", "court", "wedding day", "funeral", "delivery date"] def _classify_event_heuristic(summary: str) -> tuple: """Quick heuristic classification — returns (event_type, importance) or (None, None) if unclear.""" s = (summary if isinstance(summary, str) else "").lower() etype = None for t, kws in _HEURISTIC_TYPES.items(): if any(k in s for k in kws): etype = t break if any(k in s for k in _HEURISTIC_CRITICAL): return etype, "critical" if any(k in s for k in _HEURISTIC_HIGH): return etype, "high" return etype, None def _memory_context_lines(mems, limit: int = 40) -> list: """Render Memory rows into short personal-context bullets for event classify. Reads the Memory ORM `text` column. The previous inline code read a non-existent `content` attribute, so it raised AttributeError on the first row, the surrounding except swallowed it, and the classifier ran with no personal context at all. getattr keeps it robust to future schema drift. """ lines: list = [] for m in mems: c = (getattr(m, "text", "") or "").strip() if c: lines.append(f"- {c[:200]}") if len(lines) >= limit: break return lines async def action_classify_events(owner: str, **kwargs) -> Tuple[str, bool]: """Hybrid classification of upcoming calendar events: fast heuristic for obvious cases, LLM fallback for ambiguous ones. Assigns event_type + importance + color. Re-classifies anything not already set.""" try: from datetime import timedelta from core.database import SessionLocal, CalendarEvent from src.endpoint_resolver import resolve_endpoint from src.llm_core import llm_call_async import re as _re, json as _json db = SessionLocal() try: now = datetime.utcnow() horizon = now + timedelta(days=30) events = db.query(CalendarEvent).filter( CalendarEvent.dtstart >= now, CalendarEvent.dtstart <= horizon, CalendarEvent.status != "cancelled", ).all() if not events: return "No upcoming events to classify", True llm_url, llm_model, llm_headers = resolve_endpoint("utility", owner=owner) if not llm_url: llm_url, llm_model, llm_headers = resolve_endpoint("default", owner=owner) llm_available = bool(llm_url and llm_model) # Pull user memories so the LLM has personal context (relationships, # job, hobbies). Helps it know e.g. " is your spouse" so their # events are personal/social, not work. _memory_context = "" try: from core.database import Memory as _Mem _mems = db.query(_Mem).filter(_Mem.owner == owner).limit(60).all() if owner else [] _lines = _memory_context_lines(_mems) if _lines: _memory_context = "USER CONTEXT (relationships, work, life):\n" + "\n".join(_lines) + "\n\n" except Exception as _me: logger.warning(f"Could not load memory for classify: {_me}") classified_h = 0 classified_llm = 0 failed = 0 unchanged = 0 # Pass 1: heuristic for obvious cases, collect ambiguous for LLM batch llm_queue = [] # list of CalendarEvent objects needing LLM for ev in events: if ev.event_type and ev.importance and ev.importance != "normal": unchanged += 1 continue etype, importance = _classify_event_heuristic(ev.summary or "") if etype and importance: ev.event_type = etype ev.color = _TYPE_COLORS.get(etype) ev.importance = importance classified_h += 1 continue # Apply partial heuristic; queue for LLM to fill missing if etype: ev.event_type = etype ev.color = _TYPE_COLORS.get(etype) if llm_available: llm_queue.append(ev) elif etype: classified_h += 1 # Persist heuristic results before LLM pass (in case LLM is slow/unavailable) try: db.commit() except Exception: pass # Pass 2: batch LLM classification (10 events per call) BATCH = 10 for i in range(0, len(llm_queue), BATCH): batch = llm_queue[i:i+BATCH] items = [ {"i": idx, "title": (ev.summary or "")[:120], "when": ev.dtstart.isoformat() if ev.dtstart else "", "loc": (ev.location or "")[:80]} for idx, ev in enumerate(batch) ] prompt = ( _memory_context + "Classify these calendar events using the USER CONTEXT above (people they know, " "their job, hobbies). Return ONLY a raw JSON array, no prose, no markdown.\n" "Each item: {\"i\": , \"type\": \"work|personal|health|travel|meal|social|admin|other\", " "\"importance\": \"low|normal|high|critical\"}\n\n" "Type guidance:\n" "- personal = family, partner, kids, pets, errands, home stuff\n" "- social = friends, parties, birthdays, hangouts\n" "- work = the user's own job/career commitments only (not their partner's)\n" "- health = doctor, gym, therapy\n" "- travel = flights, trips, hotels\n" "- meal = lunch/dinner/coffee specifically\n" "- admin = bills, taxes, paperwork\n" "- other = anything else\n\n" "Importance guide: critical = surgery/court/wedding day; high = flight/interview/big presentation/exam; " "normal = regular meetings/appointments; low = recurring routine.\n\n" f"EVENTS: {_json.dumps(items)}" ) try: raw = await llm_call_async( url=llm_url, model=llm_model, messages=[{"role": "user", "content": prompt}], temperature=0.1, max_tokens=16384, headers=llm_headers, timeout=180, ) from src.text_helpers import strip_think as _st raw = _st(raw or "", prose=False, prompt_echo=False) raw = _re.sub(r"^```(?:json)?\s*|\s*```$", "", raw, flags=_re.MULTILINE).strip() m = _re.search(r"\[.*\]", raw, _re.DOTALL) if not m: logger.warning(f"[classify-llm] no JSON array in response: {raw[:300]!r}") failed += len(batch) continue arr = _json.loads(m.group()) by_idx = {x.get("i"): x for x in arr if isinstance(x, dict)} for idx, ev in enumerate(batch): x = by_idx.get(idx) if not x: failed += 1 continue t = (x.get("type") or "other").lower() imp = (x.get("importance") or "normal").lower() if t in _TYPE_COLORS: ev.event_type = t ev.color = _TYPE_COLORS[t] if imp in ("low", "normal", "high", "critical"): ev.importance = imp classified_llm += 1 logger.info(f"[classify-llm] '{ev.summary}' → type={t} importance={imp}") except Exception as e: logger.warning(f"[classify-llm] batch failed: {e}") failed += len(batch) # Commit after each batch so partial progress persists try: db.commit() except Exception as ce: logger.warning(f"[classify-llm] commit failed: {ce}") # Final commit covers heuristic-only updates from pass 1 db.commit() parts = [f"Scanned {len(events)} upcoming event(s)"] if classified_h: parts.append(f"{classified_h} via heuristic") if classified_llm: parts.append(f"{classified_llm} via LLM") if unchanged: parts.append(f"{unchanged} already set (skipped)") if failed: parts.append(f"{failed} LLM failed") return " · ".join(parts), True finally: db.close() except Exception as e: logger.error(f"classify_events action failed: {e}") return str(e), False async def action_ping_events(owner: str, **kwargs) -> Tuple[str, bool]: """Calendar event reminders are now dispatched by Notes.""" raise TaskNoop("calendar event reminders are handled by Notes") async def action_extract_email_events(owner: str, **kwargs) -> Tuple[str, bool]: """Scan recent emails for booking confirmations / meetings / events and auto-add them to the calendar.""" import asyncio as _aio try: from routes.email_pollers import _run_auto_summarize_once try: # Hard wall-clock budget: 5 min total. Per-LLM call already has its own timeout. result = await _aio.wait_for( _run_auto_summarize_once( do_summary=False, do_reply=False, do_calendar=True, days_back=3, ), timeout=300, ) if not _result_has_work(result): raise TaskNoop(f"email→calendar: {result or 'no new emails'}") return f"{result} (3d window)", True except _aio.TimeoutError: return "Email→calendar pass exceeded 5 min budget — try fewer emails or a faster model", False except Exception as e: logger.error(f"extract_email_events action failed: {e}") return str(e), False # Sender local-parts (matched exactly or by prefix) whose mail never carries a # personal signature worth learning. These compare against the local-part # (before "@"), so role names must NOT include a trailing "@" — "support@" etc. # could never match a local-part of "support" and were silently dead. _SIG_SKIP_PREFIXES = ( "noreply", "no-reply", "donotreply", "do-not-reply", "mailer-daemon", "notifications", "notification", "bounce", "newsletter", "support", "info", "admin", ) async def action_learn_sender_signatures(owner: str, **kwargs) -> Tuple[str, bool]: """For each sender with ≥3 recent inbox emails, ask the LLM to extract the common signature block across their messages. The cached sig is served on the `/read` endpoint so the renderer can fold signatures consistently from that address (no more heuristic regex juggling). Caps at 20 senders per pass; re-runs after 30 days per sender.""" try: import sqlite3 as _sql3 import re as _re import email as _email_mod import asyncio as _aio from datetime import datetime as _dt, timedelta as _td from routes.email_helpers import _imap_connect, SCHEDULED_DB from src.endpoint_resolver import resolve_endpoint from src.llm_core import llm_call_async # 1. Pull recent UIDs + From headers cheaply (header-only fetch). def _pull_headers(): results = [] conn = _imap_connect(None) try: conn.select("INBOX", readonly=True) status, data = conn.search(None, "ALL") if status != "OK" or not data or not data[0]: return results uids = data[0].split()[-300:][::-1] # newest 300 for uid in uids: try: st, msg_data = conn.fetch( uid, "(BODY.PEEK[HEADER.FIELDS (FROM)])" ) if st != "OK" or not msg_data or not msg_data[0]: continue raw = msg_data[0][1] if isinstance(msg_data[0], tuple) else None if not raw: continue msg = _email_mod.message_from_bytes(raw) from_raw = msg.get("From", "") from_addr = _email_mod.utils.parseaddr(from_raw)[1].lower().strip() if not from_addr or "@" not in from_addr: continue results.append({ "uid": uid.decode() if isinstance(uid, bytes) else str(uid), "from_address": from_addr, }) except Exception: continue finally: try: conn.logout() except Exception: pass return results mails = await _aio.to_thread(_pull_headers) if not mails: return "No emails to scan", True # 2. Group by sender; drop addresses that don't carry useful sigs. by_sender: dict[str, list[dict]] = {} for m in mails: addr = m["from_address"] local = addr.split("@", 1)[0] if any(local == p or local.startswith(p) for p in _SIG_SKIP_PREFIXES): continue # Skip plus-aliases / list-style addresses too. if "+" in local or "-noreply" in addr or "-bounces" in addr: continue by_sender.setdefault(addr, []).append(m) # 3. Eligibility: ≥3 emails AND (no cache OR cache > 30 days old). try: conn = _sql3.connect(SCHEDULED_DB) cached = { r[0]: r[1] for r in conn.execute( "SELECT from_address, last_built_at FROM sender_signatures" ).fetchall() } conn.close() except Exception: cached = {} cutoff_iso = (_dt.utcnow() - _td(days=30)).isoformat() eligible: list[tuple[str, list[dict]]] = [] for addr, msgs in by_sender.items(): if len(msgs) < 3: continue if cached.get(addr, "") > cutoff_iso: continue eligible.append((addr, msgs[:5])) # use up to last 5 emails if not eligible: return "All sender sigs already cached (or no eligible senders)", True url, model, headers = resolve_endpoint("utility", owner=owner) if not url or not model: url, model, headers = resolve_endpoint("default", owner=owner) if not url or not model: return "No LLM endpoint available", False analyzed = 0 no_sig = 0 for addr, msgs in eligible[:20]: # cost cap per run def _fetch_bodies(_msgs): bodies = [] conn2 = _imap_connect(None) try: conn2.select("INBOX", readonly=True) for mm in _msgs: try: st, data = conn2.fetch(mm["uid"], "(BODY.PEEK[TEXT])") if st != "OK" or not data or not data[0]: continue raw = data[0][1] if isinstance(data[0], tuple) else None if not raw: continue text = raw.decode("utf-8", errors="replace") bodies.append(text[:4000]) except Exception: continue finally: try: conn2.logout() except Exception: pass return bodies try: bodies = await _aio.to_thread(_fetch_bodies, msgs) except Exception as e: logger.warning(f"sig learner: fetch bodies failed for {addr}: {e}") continue if len(bodies) < 2: continue joined = "\n\n---NEXT EMAIL---\n\n".join(bodies[:5]) prompt = ( "You are extracting the literal common SIGNATURE block that " "appears at the END of multiple emails from the same sender.\n\n" "Return ONLY the exact signature text, verbatim, with original " "line breaks preserved. If there is no clear common signature " "block across these emails, respond with the single token: " "NONE\n\n" "INCLUDE: title, company, address, phone, email/url lines, " "legal disclaimer block.\n" "EXCLUDE: greetings ('Hi', 'Dear'), closing phrases on their " "own ('Best regards'), the sender's name on its own line, the " "body content, quoted/forwarded threads (lines starting with " "'>' or 'On ... wrote:' or 'From: ... Sent:').\n\n" f"EMAILS FROM {addr}:\n{joined}" ) try: raw = await llm_call_async( url=url, model=model, messages=[{"role": "user", "content": prompt}], temperature=0.0, max_tokens=600, headers=headers, timeout=60, ) from src.text_helpers import strip_think as _st sig = _st(raw or "", prose=False, prompt_echo=False).strip() # Strip surrounding code fences if the LLM added them. sig = _re.sub(r"^```[\w]*\n?", "", sig) sig = _re.sub(r"\n?```\s*$", "", sig) sig = sig.strip() except Exception as e: logger.warning(f"sig LLM call failed for {addr}: {e}") continue # NONE sentinel or out-of-bounds → cache a NULL row so we don't # re-try for 30 days, then move on. if ( not sig or sig.upper().strip().strip(".") == "NONE" or len(sig) < 15 or len(sig) > 3000 ): cached_sig: str | None = None no_sig += 1 else: cached_sig = sig try: conn = _sql3.connect(SCHEDULED_DB) conn.execute( "INSERT OR REPLACE INTO sender_signatures " "(from_address, signature_text, sample_count, last_built_at, model_used, source) " "VALUES (?, ?, ?, ?, ?, ?)", (addr, cached_sig, len(bodies), _dt.utcnow().isoformat(), model, "llm"), ) conn.commit() conn.close() analyzed += 1 except Exception as e: logger.warning(f"sig cache write failed for {addr}: {e}") return f"Learned sigs: {analyzed - no_sig} found, {no_sig} no-sig, of {len(eligible)} eligible", True except Exception as e: logger.error(f"learn_sender_signatures failed: {e}") return str(e), False async def action_daily_brief(owner: str, **kwargs) -> Tuple[str, bool]: """Build a short morning digest: today's calendar events, unread email count + top-N senders/subjects, active todos.""" try: from datetime import datetime as _dt, timedelta as _td import json as _json from core.database import SessionLocal, CalendarEvent, CalendarCal, Note from routes.email_helpers import _imap_connect, _decode_header # ----- Calendar: today's events ----- today = _dt.now().replace(hour=0, minute=0, second=0, microsecond=0) tomorrow = today + _td(days=1) # v2 review HIGH-12: gate the OR-null branch on single-user # (unconfigured) deploys only. In a multi-user deploy, one # user's daily brief must not include another user's notes or # events that happen to be stored with owner=None. try: from core.auth import AuthManager _allow_null = not AuthManager().is_configured except Exception: _allow_null = False db = SessionLocal() try: ev_q = db.query(CalendarEvent).join(CalendarCal).filter( CalendarEvent.dtstart < tomorrow, CalendarEvent.dtend > today, CalendarEvent.status != "cancelled", ) if owner: ev_q = owner_filter(ev_q, CalendarCal, owner, include_shared=_allow_null) events = ev_q.order_by(CalendarEvent.dtstart).all() # ----- Notes: pinned + non-archived todos with at least one undone item ----- n_q = db.query(Note).filter(Note.archived == False) # noqa: E712 if owner: n_q = owner_filter(n_q, Note, owner, include_shared=_allow_null) notes = n_q.all() finally: db.close() # ----- Email: unread count + top 5 inbox subjects (best-effort) ----- # Direct IMAP: cheaper than the full _list_emails_sync helper and # avoids the module/import coupling that broke this once already. unread_count = 0 recent_subjects: list[tuple[str, str]] = [] try: import email as _email conn = _imap_connect(None) try: conn.select("INBOX", readonly=True) status, data = conn.search(None, "UNSEEN") uids = (data[0].split() if status == "OK" and data and data[0] else []) unread_count = len(uids) # Grab headers for the most recent 5 unread (UIDs increase with arrival) for uid in uids[-5:][::-1]: try: _, msg_data = conn.fetch(uid, "(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT)])") if not msg_data or not msg_data[0]: continue hdr = msg_data[0][1] if isinstance(msg_data[0], tuple) else msg_data[0] parsed = _email.message_from_bytes(hdr) subject = _decode_header(parsed.get("Subject") or "") or "(no subject)" from_raw = _decode_header(parsed.get("From") or "") or "?" # Extract just the display name if "Name " form if "<" in from_raw: name = from_raw.split("<", 1)[0].strip().strip('"') or from_raw else: name = from_raw recent_subjects.append((name, subject)) except Exception as fe: logger.debug(f"daily_brief: header fetch for uid {uid} failed: {fe}") finally: try: conn.logout() except Exception: pass except Exception as ee: logger.debug(f"daily_brief: email fetch failed: {ee}") # Pull active todo items from notes todo_lines: list[str] = [] for n in notes: if n.note_type == "checklist" and n.items: try: items = _json.loads(n.items) pending = [it.get("text", "") for it in items if not it.get("done")] for t in pending[:3]: if t: todo_lines.append(f"{n.title or 'Checklist'}: {t}") except Exception: continue elif n.pinned and n.title: todo_lines.append(n.title) # ----- Compose ----- # %-d is GNU-only; format the day with str() so the brief works on # Windows / non-glibc Python builds too. date_label = today.strftime(f"%A, %B {today.day}, %Y") plain = [f"Daily brief — {date_label}", ""] if events: plain.append("Calendar:") for e in events: t = e.dtstart.strftime("%H:%M") if not e.all_day else "all day" loc = f" @ {e.location}" if e.location else "" plain.append(f" {t} {e.summary}{loc}") plain.append("") else: plain.append("Calendar: nothing scheduled.") plain.append("") plain.append(f"Email: {unread_count} unread") for sender, subj in recent_subjects: plain.append(f" · {sender} — {subj}") plain.append("") if todo_lines: plain.append("Todos:") for t in todo_lines[:10]: plain.append(f" · {t}") else: plain.append("Todos: none active.") plain_body = "\n".join(plain) return plain_body, True except Exception as e: logger.error(f"daily_brief action failed: {e}") return str(e), False async def action_test_skills(owner: str, **kwargs) -> Tuple[str, bool]: """Run the per-skill Test on every skill: agent runs the procedure in a sandbox, LLM judges the transcript, verdict is recorded on the skill. ADVISORY ONLY — only writes set_audit (never rewrites SKILL.md, never demotes status, never overrides confidence).""" try: from services.memory.skills import SkillsManager from src.constants import DATA_DIR from routes.skills_routes import _run_skill_test_once, _skill_test_task from src.endpoint_resolver import resolve_endpoint # #3 SCOPE GUARD: refuse to run on a None/empty owner — otherwise # `sm.load(owner=None)` returns every user's skills and we'd cross- # test (and write audit verdicts to) other users' data in a # multi-user deployment. if not owner: return "test_skills requires an owner on the task — refusing to run without scope.", False sm = SkillsManager(DATA_DIR) skills = sm.load(owner=owner) names = [s.get("name") for s in skills if s.get("name")] if not names: raise TaskNoop("no skills to test") url, model, headers = resolve_endpoint("default", owner=owner) if not url or not model: return "No Default/Utility model configured — set one in Settings.", False # #2 NO SILENT MODEL SWAP: if the configured model isn't served by the # endpoint, try a basename match — but fail loudly instead of grabbing # `avail[0]` which could be an embedding-only model and produce 36 # garbage transcripts → 36 'unknown' verdicts with no hint why. try: from src.llm_core import list_model_ids avail = list_model_ids(url, headers=headers) if avail and model not in avail: import os as _os base = _os.path.basename((model or "").rstrip("/")) m = next((a for a in avail if _os.path.basename(a.rstrip("/")) == base), None) if m: model = m else: return (f"Default model '{model}' not served by endpoint {url}. " f"Available: {', '.join(avail[:8])}{'…' if len(avail) > 8 else ''}. " "Set a valid Default model in Settings."), False except Exception as _e: logger.warning(f"test_skills model resolve check failed (continuing): {_e}") logger.info(f"test_skills: starting on {len(names)} skills, model={model}, owner={owner!r}") from collections import Counter tally = Counter() per_skill_log = [] for skill in skills: name = skill.get("name") if not name: continue md = sm.read_skill_md(name, owner=owner) or "" if not md: tally["skipped"] += 1 per_skill_log.append(f"{name}: skipped (no SKILL.md)") continue task = _skill_test_task(skill) try: transcript, verdict = await _run_skill_test_once(md, task, url, model, headers, owner) v = (verdict or {}).get("verdict") or "unknown" tally[v] += 1 summary = (verdict or {}).get("summary") or "" tlen = len(transcript or "") detail = "" if v in ("unknown", "inconclusive", "fail", "needs_work"): bits = [] if summary: bits.append(summary[:160]) if tlen < 200: bits.append(f"transcript {tlen}b") if bits: detail = " — " + "; ".join(bits) per_skill_log.append(f"{name}: {v}{detail}") # #4 + #8 + #12: ONLY persist a real verdict (pass / needs_work / # fail / inconclusive). Skip 'unknown' — that's the judge's # "couldn't parse" sentinel, not a real result, and persisting # it pollutes the verified-badge UI. Also skip the confidence # rewrite entirely — update_skill() re-serialises SKILL.md # (contradicts "advisory only" docstring) and overwriting a # user-set value (e.g. 1.0 → 0.95) is destructive. if v in ("pass", "needs_work", "fail", "inconclusive"): try: sm.set_audit(name, v, by_teacher=False, worker_model=model, owner=owner) except Exception as _e: logger.warning(f"test_skills set_audit({name}) failed: {_e}") if v == "unknown": logger.warning(f"test_skills: {name} → unknown — {summary[:200]}; transcript_len={tlen}") except Exception as e: logger.exception(f"test_skills: {name} errored") tally["error"] += 1 per_skill_log.append(f"{name}: error — {str(e)[:200]}") parts = [] for k in ("pass", "needs_work", "fail", "inconclusive", "unknown", "skipped", "error"): if tally.get(k): parts.append(f"{tally[k]} {k}") header = f"Tested {len(names)} skill(s): " + (" · ".join(parts) or "0") # Multi-line result: summary first, then per-skill detail. The Tasks # Activity feed renders this verbatim, so the user can see per-skill # outcomes + the judge's "why" without checking uvicorn stdout. body = "\n".join(per_skill_log) return f"{header}\nmodel={model}\n\n{body}", True except TaskNoop: raise except Exception as e: logger.error(f"test_skills action failed: {e}") return str(e), False async def action_audit_skills(owner: str, **kwargs) -> Tuple[str, bool]: """Run the real skills audit pipeline for skills that have not been audited. Unlike test_skills, this uses the same audit logic as the UI Audit all flow: metadata narrowing, self-edit/retry, optional teacher rewrite, necessity tagging, and publish/draft finalization from the user's confidence threshold. """ try: from services.memory.skills import SkillsManager from src.constants import DATA_DIR from routes.skills_routes import ( _resolve_audit_models, _run_audit_all_job, _skill_audit_jobs, ) if not owner: return "audit_skills requires an owner — refusing to run without scope.", False key = (owner or "",) existing = _skill_audit_jobs.get(key) if existing and existing.get("status") == "running": raise TaskNoop("skill audit already running") sm = SkillsManager(DATA_DIR) skills = sm.load(owner=owner) names = [ s.get("name") for s in skills if s.get("name") and not s.get("audit_verdict") ] if not names: raise TaskNoop("no unaudited skills") url, model, headers, teacher = _resolve_audit_models() try: from src.llm_core import seconds_since_model_activity recent = seconds_since_model_activity(url, model) except Exception: recent = None if recent is not None and recent < (20 * 60): raise TaskDeferred( f"audit model {model} was used {int(recent)}s ago; waiting for quiet window", delay_seconds=20 * 60, ) import time as _time _skill_audit_jobs[key] = { "status": "running", "scope": "scheduled-unchecked", "model": model, "teacher": teacher[1] if teacher else None, "total": len(names), "done": 0, "current": None, "results": [], "log": [ f"Scheduled audit of {len(names)} unaudited skill(s) with {model}" + (f"; teacher {teacher[1]}" if teacher else "") ], "started": _time.time(), "cancel": False, } await _run_audit_all_job(key, sm, names, url, model, headers, teacher, owner) job = _skill_audit_jobs.get(key, {}) counts = {} for r in job.get("results", []): k = r.get("result") or "unknown" counts[k] = counts.get(k, 0) + 1 summary = " · ".join(f"{v} {k}" for k, v in sorted(counts.items())) or "0 results" return f"Audited {job.get('done', 0)}/{len(names)} unaudited skill(s): {summary}", True except TaskNoop: raise except Exception as e: logger.error(f"audit_skills action failed: {e}") return str(e), False async def action_ping_notes(owner: str, **kwargs) -> Tuple[str, bool]: """Background note-due scanner. Fires a reminder for any note whose `due_date` falls in the current ±5-minute window and hasn't been pinged within the last 25 minutes. Mirrors `action_ping_events` for calendar. State (`data/note_pings.json`): {note_id: iso_ts_of_last_ping}. Pruned on each run by dropping entries for notes that are gone/archived/replied. """ try: import json as _json import time as _time from datetime import datetime as _dt, timezone as _tz, timedelta as _td from pathlib import Path as _P from core.database import SessionLocal as _SL, Note as _N # Per-owner state file so cache-pruning doesn't cross-delete other # users' entries (review C4). Legacy path kept as fallback so a # single-user install (empty owner) doesn't lose its history. _owner_slug = "".join(c if (c.isalnum() or c in "-_.@") else "_" for c in (owner or "default")) STATE = _P(DATA_DIR) / f"note_pings_{_owner_slug}.json" STATE.parent.mkdir(parents=True, exist_ok=True) # One-time migration: if legacy global file exists and per-owner file # doesn't, seed from global (entries for OTHER owners still get pruned # on their first run — acceptable, prevents silent loss). _legacy = _P(DATA_DIR) / "note_pings.json" if _legacy.exists() and not STATE.exists(): try: STATE.write_text(_legacy.read_text(encoding="utf-8"), encoding="utf-8") except Exception: pass # Scanner ticks every 60s in _note_pings_loop. 90s window guarantees # every note's due time lands inside at least one tick's window. WINDOW_SEC = 90 REPING_MIN = 25 # don't re-ping same note more often than this def _parse_due(s: str): """Accept '2026-05-29T16:31' (local) or '...Z' (UTC). Returns UTC datetime.""" if not s: return None try: # Handle the JS-style 'Z' suffix. if s.endswith("Z"): return _dt.fromisoformat(s[:-1]).replace(tzinfo=_tz.utc) # Naive → assume local server time. d = _dt.fromisoformat(s) if d.tzinfo is None: d = d.astimezone().astimezone(_tz.utc) return d.astimezone(_tz.utc) except Exception: return None try: cache = _json.loads(STATE.read_text(encoding="utf-8")) if STATE.exists() else {} except Exception: cache = {} db = _SL() try: q = db.query(_N).filter(_N.archived == False) # noqa: E712 q = q.filter(_N.due_date.isnot(None), _N.due_date != "") if owner: # Match owner OR legacy null-owner notes (single-user installs). q = owner_filter(q, _N, owner) notes = q.all() if not notes: raise TaskNoop("no notes with due dates") now = _dt.now(_tz.utc) window = _td(seconds=WINDOW_SEC) reping_cutoff = now - _td(minutes=REPING_MIN) seen_ids = set() sent = [] for n in notes: seen_ids.add(n.id) due = _parse_due(n.due_date) if not due: continue # Inside the ±5min window? if abs((due - now).total_seconds()) > window.total_seconds(): continue # Recently pinged? Skip. last = cache.get(n.id) if last: try: if isinstance(last, dict): last = last.get("at") last_dt = _dt.fromisoformat(str(last)) if last_dt.tzinfo is None: last_dt = last_dt.replace(tzinfo=_tz.utc) if last_dt >= reping_cutoff: continue except Exception: pass # Compose + dispatch. title = (n.title or "Reminder").strip() or "Reminder" body_parts = [] if n.content: body_parts.append(n.content[:400]) # Items: list pending checklist entries inline. if n.items: try: items = _json.loads(n.items) pending = [ it.get("text", "") for it in items if not it.get("done") and not it.get("checked") ] if pending: body_parts.append("Pending:\n" + "\n".join(f"- {t}" for t in pending[:8])) except Exception: pass body = "\n\n".join(p for p in body_parts if p) or title try: from routes.note_routes import dispatch_reminder await dispatch_reminder( title=title, note_body=body, note_id=n.id, owner=n.owner or owner or "", ) cache[n.id] = now.isoformat() sent.append(title) except Exception as e: logger.warning(f"ping_notes: dispatch failed for {n.id}: {e}") # Prune cache entries for notes that no longer exist. for stale in [k for k in cache if k not in seen_ids]: cache.pop(stale, None) try: STATE.write_text(_json.dumps(cache), encoding="utf-8") except Exception as e: logger.warning(f"ping_notes: cache write failed: {e}") if not sent: raise TaskNoop(f"scanned {len(notes)} note(s), none due in ±{WINDOW_SEC}s") preview = "; ".join(sent[:3]) extra = f" (+{len(sent) - 3} more)" if len(sent) > 3 else "" return f"Pinged {len(sent)} note(s): {preview}{extra}", True finally: db.close() except TaskNoop: raise except Exception as e: logger.exception("ping_notes action failed") return str(e), False async def action_check_email_urgency(owner: str, **kwargs) -> Tuple[str, bool]: """Scan unread emails across all accounts, LLM-triage new ones, cache per-UID verdicts, tag the inbox, and fire a reminder when a previously unseen UID scores reply-soon/urgent (>=2). State persists under data/email_urgency_state_* so the UI can color the unread dot by tier. Design notes: - Only classifies emails newer than 7 days (first-run scale guard). - Cache key = `:` so the same UID across accounts doesn't collide. - Re-notify gate: only when at least one UID NEW to `notified_uids` scores ≥2. Repeat scans where the set is unchanged stay silent. """ from src.settings import load_settings try: settings = load_settings() import json as _json import email as _email_mod import asyncio as _aio import os as _os import re as _re import time as _time import httpx from datetime import datetime as _dt, timedelta as _td from pathlib import Path as _P from core.database import SessionLocal as _SL, EmailAccount as _EA from routes.email_helpers import _imap_connect, _decode_header from src.endpoint_resolver import resolve_endpoint, resolve_utility_fallback_candidates from src.llm_core import llm_call_async_with_fallback # Per-owner state file so multi-user runs don't clobber each other's # notified_uids / urgency counts. Empty owner falls back to a generic # filename for single-user installs (matches prior behaviour). _owner_slug = "".join(c if (c.isalnum() or c in "-_.@") else "_" for c in (owner or "default")) STATE_PATH = _P(DATA_DIR) / f"email_urgency_state_{_owner_slug}.json" CACHE_DIR = _P(EMAIL_URGENCY_CACHE_DIR) CACHE_DIR.mkdir(parents=True, exist_ok=True) STATE_PATH.parent.mkdir(parents=True, exist_ok=True) AGE_CUTOFF = _dt.utcnow() - _td(days=7) TRIAGE_VERSION = 3 CATEGORY_TAGS = { "newsletter", "marketing", "notification", "finance", "bills", "receipt", "travel", "security", "shopping", "social", "work", "personal", "calendar", } MANAGED_TAGS = CATEGORY_TAGS | {"urgent", "reply-soon", "promo"} # ── 1. Resolve LLM candidates (utility primary + utility fallbacks; fall # through to default chat as a last resort). url, model, headers = resolve_endpoint("utility", owner=owner) if not url or not model: url, model, headers = resolve_endpoint("default", owner=owner) if not url or not model: return "No LLM endpoint available", False candidates = [(url, model, headers)] + resolve_utility_fallback_candidates(owner=owner) # ── 2. Enumerate enabled accounts. Match this task's owner AND fall # back to the legacy "unowned account whose imap_user / from_address # == this owner" pattern — same rule `_get_email_config` uses, so a # pre-multi-user account row still gets picked up for the seeded task. db = _SL() try: from sqlalchemy import and_ as _and, or_ as _or q = db.query(_EA).filter(_EA.enabled == True) # noqa: E712 if owner: unowned = _or(_EA.owner == None, _EA.owner == "") # noqa: E711 same_mailbox = _or(_EA.imap_user == owner, _EA.from_address == owner) q = q.filter(_or(_EA.owner == owner, _and(unowned, same_mailbox))) accounts = q.all() finally: db.close() if not accounts: raise TaskNoop("no email accounts configured") urgency_prompt = settings.get("urgent_email_prompt", "") per_uid_scores = {} # key = ":" → {"score": 0-3, "reason": "..."} all_unread_keys = set() # for cache pruning llm_attempts = 0 saved_classifications = 0 failed_classifications = [] scanned = 0 # ── 3. Per-account scan: pull headers + lightweight body for new UIDs # since 7 days ago, score via LLM, cache the verdict. for acc in accounts: cache_file = CACHE_DIR / f"{acc.id}.json" try: cache = _json.loads(cache_file.read_text(encoding="utf-8")) if cache_file.exists() else {"uids": {}} except Exception: cache = {"uids": {}} def _scan_one(account=acc, cache_uids=cache.get("uids", {})): """Sync IMAP work runs in a thread.""" results = [] conn = _imap_connect(account.id) try: conn.select("INBOX", readonly=True) # IMAP date is the only practical pre-filter — UNSEEN AND # SINCE 7-days-ago. Date format is DD-Mon-YYYY. since_str = AGE_CUTOFF.strftime("%d-%b-%Y") status, data = conn.search(None, f'(UNSEEN SINCE {since_str})') if status != "OK" or not data or not data[0]: return results uids = data[0].split() for uid_b in uids: uid = uid_b.decode() if isinstance(uid_b, bytes) else str(uid_b) key = f"{account.id}:{uid}" cached = cache_uids.get(uid) cached_ok = isinstance(cached, dict) and cached.get("triage_version") == TRIAGE_VERSION results.append({"key": key, "uid": uid, "cached": cached if cached_ok else None}) if cached_ok: # Already classified — skip the fetch. continue # Pull headers + first ~800 chars of plaintext body. try: st, msg_data = conn.fetch(uid_b, "(RFC822.HEADER BODY.PEEK[TEXT]<0.800>)") if st != "OK" or not msg_data: continue # Headers + body land in different tuples in the # response — concatenate the bytes for parsing. raw = b"" for part in msg_data: if isinstance(part, tuple) and part[1]: raw += part[1] + b"\n\n" if not raw: continue msg = _email_mod.message_from_bytes(raw) # Skip Odysseus-generated reminders so the scanner # doesn't classify its own emails as urgent and # trigger a feedback loop. Match on either the # stamped headers OR the subject prefix. _ody_origin = (msg.get("X-Odysseus-Origin") or "").strip().lower() _ody_kind = (msg.get("X-Odysseus-Kind") or "").strip().lower() _raw_subj = (msg.get("Subject") or "").lower() # MCP path drops custom headers (email_server's # schema doesn't accept them), so we ALSO match the # `[Task]` subject prefix that `_deliver_via_mcp` # always stamps. Anything that looks self-generated # is dropped before classification to prevent the # scanner from labelling its own emails "urgent". if (_ody_origin == "odysseus-ui" or _ody_kind == "reminder" or _raw_subj.startswith("reminder (odysseus):") or _raw_subj.startswith("reminder:") or _raw_subj.startswith("[task]")): # Drop this candidate entirely — don't list it # in results so its UID never enters the cache # nor counts toward `scanned`. results.pop() continue subject = _decode_header(msg.get("Subject") or "") from_raw = _decode_header(msg.get("From") or "") header_blob = "\n".join( f"{name}: {msg.get(name, '')}" for name in ( "From", "Subject", "List-Unsubscribe", "List-ID", "Precedence", "X-Mailchimp-Campaign-Id", "X-Campaign", "X-MC-User", ) if msg.get(name) ) body_snippet = "" try: if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": body_snippet = part.get_payload(decode=True).decode("utf-8", errors="ignore")[:1600] break else: body_snippet = (msg.get_payload(decode=True) or b"").decode("utf-8", errors="ignore")[:1600] except Exception: body_snippet = "" results[-1].update({ "subject": subject, "from": from_raw, "headers": header_blob, "body": body_snippet.strip(), "message_id": (msg.get("Message-ID") or "").strip(), }) except Exception as _fe: logger.debug(f"urgency: header fetch for uid {uid} failed: {_fe}") finally: try: conn.logout() except Exception: pass return results try: items = await _aio.to_thread(_scan_one) except Exception as e: logger.warning(f"urgency: IMAP scan failed for account {acc.id}: {e}") continue for item in items: scanned += 1 key = item["key"] all_unread_keys.add(key) if item.get("cached"): per_uid_scores[key] = item["cached"] continue # Skip uids we couldn't fetch (no subject/from/body). if not item.get("subject") and not item.get("from"): continue # ── LLM-classify. JSON-only response; bullet-proof parse. llm_attempts += 1 prompt = ( "You are triaging ONE unread email. Return ONLY JSON: " "{\"score\":0|1|2|3,\"tags\":[\"...\"],\"spam\":false," "\"reason\":\"one short phrase\"}.\n" "0 = trivial / promotional · 1 = informational, no reply needed · " "2 = should reply within a day · 3 = urgent, reply now (deadline, blocker).\n\n" "Allowed tags: newsletter, marketing, notification, finance, bills, receipt, " "travel, security, shopping, social, work, personal, calendar.\n" "Use marketing for ads, promos, sales, offers, and cold sales. Use newsletter " "for newsletters, digests, and recurring content. spam=true for scams, phishing, " "junk, cold sales, generic ads, or no-personal-action bulk mail.\n" "Important: 'I'm outside', 'I am outside', 'waiting outside', 'at the door', " "'locked out', or 'can't get in' means score 3 unless clearly historical.\n\n" f"User's rules:\n{urgency_prompt}\n\n" f"Email:\nFrom: {item.get('from','')}\nSubject: {item.get('subject','')}\n" f"Snippet:\n{item.get('body','')}\n" ) try: raw = await llm_call_async_with_fallback( candidates, [{"role": "user", "content": prompt}], temperature=0.1, max_tokens=220, timeout=30, ) # Tolerant JSON-parse: strip code fences if present. txt = (raw or "").strip() if txt.startswith("```"): txt = txt.strip("`") # Drop a leading "json\n" or any tag. nl = txt.find("\n") if nl >= 0: txt = txt[nl + 1:] # Find first { ... } in the response. s = txt.find("{") e = txt.rfind("}") if s < 0 or e <= s: failed_classifications.append({ "subject": item.get("subject") or "(no subject)", "from": item.get("from") or "", "reason": "model returned no JSON", }) continue obj = _json.loads(txt[s:e + 1]) score = int(obj.get("score", 0)) reason = str(obj.get("reason", ""))[:200] raw_tags = obj.get("tags") or [] if isinstance(raw_tags, str): raw_tags = [raw_tags] tags = [] for t in raw_tags: if not isinstance(t, str): continue tag = t.strip().lower().replace("_", "-") if tag == "promo": tag = "marketing" if tag in CATEGORY_TAGS and tag not in tags: tags.append(tag) _spam_raw = obj.get("spam") if isinstance(_spam_raw, bool): spam = _spam_raw elif isinstance(_spam_raw, (int, float)): spam = bool(_spam_raw) else: spam = str(_spam_raw or "").strip().lower() in {"1", "true", "yes", "y"} _blob = f"{item.get('headers','')}\n{item.get('subject','')}\n{item.get('body','')}".lower() if _re.search(r"\b(i'?m|i am|im|we'?re|we are)\s+outside\b", _blob) or _re.search( r"\b(waiting outside|at the door|locked out|can'?t get in|cannot get in)\b", _blob ): if score < 3: reason = "person is waiting outside" score = max(score, 3) bulkish = bool(_re.search( r"\b(list-unsubscribe|list-id|mailchimp|mailchimpapp|view this email in your browser|unsubscribe|newsletter|digest|precedence:\s*bulk)\b", _blob, )) marketingish = bool(_re.search( r"\b(advertisement|sponsored|promo|promotion|sale|discount|offer|limited time|deal|tickets?|tour|merch|stream|purchase|sold out|low tickets|coupon|shop now|buy now)\b", _blob, )) if "newsletter" not in tags and bulkish: tags.append("newsletter") if "marketing" not in tags and marketingish: tags.append("marketing") if (bulkish or marketingish) and score < 2: score = 0 if not reason or "urgent" in reason.lower(): reason = "Bulk marketing/newsletter; no personal reply needed" # Strip "Name " to bare display name for compact summary. _from_raw = item.get("from", "") or "" if "<" in _from_raw: _from_short = _from_raw.split("<", 1)[0].strip().strip('"') or _from_raw else: _from_short = _from_raw verdict = { "score": max(0, min(3, score)), "tags": tags[:4], "spam": spam, "reason": reason, "subject": (item.get("subject") or "")[:200], "from": _from_short[:120], "triage_version": TRIAGE_VERSION, # Cache the message_id too so re-scans of already-cached # UIDs can still write the inbox tag without re-LLM'ing. "message_id": (item.get("message_id") or "").strip(), "ts": _time.time(), } cache.setdefault("uids", {})[item["uid"]] = verdict per_uid_scores[key] = verdict saved_classifications += 1 except Exception as e: failed_classifications.append({ "subject": item.get("subject") or "(no subject)", "from": item.get("from") or "", "reason": str(e)[:120] or "classification failed", }) logger.debug(f"urgency: LLM classify failed for {key}: {e}") continue # ── Prune cache entries for UIDs that are no longer unread (replied # / archived / deleted). Compare against `items` (everything UNSEEN # in this scan window). seen_uids = {it["uid"] for it in items} cache_uids = cache.get("uids", {}) for stale in [u for u in cache_uids if u not in seen_uids]: cache_uids.pop(stale, None) try: cache_file.write_text(_json.dumps(cache), encoding="utf-8") except Exception as e: logger.warning(f"urgency: cache write failed for {acc.id}: {e}") # ── 3.5 Mirror triage verdicts into email_tags so inbox filters and # pills show urgency + category tags. Runs for BOTH cached and freshly # classified items; message_id lives on the cached verdict so this is cheap. try: import sqlite3 as _sql3 from routes.email_helpers import SCHEDULED_DB, _init_scheduled_db from datetime import datetime as _dt2 _init_scheduled_db() _conn = _sql3.connect(SCHEDULED_DB) try: for _key, _v in per_uid_scores.items(): _msg_id = (_v.get("message_id") or "").strip() _score = _v.get("score", 0) if not _msg_id: continue _new_tags = [] if _score >= 3: _new_tags.append("urgent") elif _score >= 2: _new_tags.append("reply-soon") for _tag in (_v.get("tags") or []): _tag = str(_tag).strip().lower().replace("_", "-") if _tag == "promo": _tag = "marketing" if _tag in CATEGORY_TAGS and _tag not in _new_tags: _new_tags.append(_tag) _spam = 1 if _v.get("spam") else 0 # _key is ":" — extract uid for the row. _uid_only = _key.split(":", 1)[-1] _owner_key = owner or "" _row = _conn.execute( "SELECT tags FROM email_tags WHERE message_id=? AND owner=?", (_msg_id, _owner_key), ).fetchone() if _row: try: _existing = _json.loads(_row[0] or "[]") if not isinstance(_existing, list): _existing = [] except Exception: _existing = [] # Drop previous triage-owned tags so re-classification # can upgrade/downgrade/clear without touching manual tags. _existing = [ str(t).strip().lower().replace("_", "-") for t in _existing if str(t).strip().lower().replace("_", "-") not in MANAGED_TAGS ] for _tag in _new_tags: if _tag not in _existing: _existing.append(_tag) _conn.execute( "UPDATE email_tags SET tags=?, spam_verdict=?, spam_reason=?, uid=?, folder=?, subject=?, sender=? " "WHERE message_id=? AND owner=?", (_json.dumps(_existing), _spam, _v.get("reason", ""), _uid_only, "INBOX", _v.get("subject", ""), _v.get("from", ""), _msg_id, _owner_key), ) else: if not _new_tags and not _spam: continue _conn.execute( "INSERT INTO email_tags " "(message_id, owner, uid, folder, subject, sender, tags, spam_verdict, spam_reason, created_at) " "VALUES (?, ?, ?, 'INBOX', ?, ?, ?, ?, ?, ?)", (_msg_id, _owner_key, _uid_only, _v.get("subject", ""), _v.get("from", ""), _json.dumps(_new_tags), _spam, _v.get("reason", ""), _dt2.utcnow().isoformat()), ) _conn.commit() finally: _conn.close() except Exception as _te: logger.warning(f"urgency: bulk tag write failed: {_te}") # ── 4. Aggregate state. urgent = score ≥ 2. urgent_keys = [k for k, v in per_uid_scores.items() if v.get("score", 0) >= 2] max_score = max((v.get("score", 0) for v in per_uid_scores.values()), default=0) total_urgent = len(urgent_keys) # Load prior state to know which urgent UIDs we've already notified. try: prior = _json.loads(STATE_PATH.read_text(encoding="utf-8")) if STATE_PATH.exists() else {} except Exception: prior = {} notified_uids = set(prior.get("notified_uids", [])) # ── 5. Fire reminder ONLY when a previously-unnotified UID scores urgent. new_urgent = [k for k in urgent_keys if k not in notified_uids] newly_notified = set() notify_failed = set() if new_urgent: title = "Urgent email" if total_urgent == 1 else f"{total_urgent} urgent emails" # Build a real listing — subject · sender · reason for each urgent # one — so the reminder email tells you which messages to act on, # not just "4 needing reply". Optional deep-link when the user has # `app_public_url` configured in Settings (so the email row links # straight into the Odysseus Email tab). # Sort: highest-scored UIDs first; cap at 10 to keep the email tidy. sorted_urgent = sorted( ((k, per_uid_scores[k]) for k in urgent_keys), key=lambda kv: kv[1].get("score", 0), reverse=True, )[:10] _pub = (settings.get("app_public_url") or "").strip().rstrip("/") from urllib.parse import quote as _quote lines = [f"{total_urgent} email" + ("" if total_urgent == 1 else "s") + " need an urgent reply:", ""] for i, (k, v) in enumerate(sorted_urgent, 1): subj = (v.get("subject") or "(no subject)")[:160] frm = v.get("from") or "" why = v.get("reason") or "" uid_for_link = str(k).split(":", 1)[-1] hash_link = f"#email={_quote('INBOX', safe='')}:{uid_for_link}" open_link = f"{_pub}/{hash_link}" if _pub else hash_link line = f"{i}. {subj}" if frm: line += f" — {frm}" if why: line += f" · {why}" lines.append(line) lines.append(f" Open email: {open_link}") if total_urgent > len(sorted_urgent): lines.append("") lines.append(f"…and {total_urgent - len(sorted_urgent)} more.") body = "\n".join(lines) try: # Call dispatch_reminder DIRECTLY (no HTTP/auth roundtrip — the # endpoint version 401's the background scheduler because it # has no session cookie). from routes.note_routes import dispatch_reminder dispatch_result = await dispatch_reminder( title=title, note_body=body, note_id="urgent-email", owner=owner or "", ) channel = (settings.get("reminder_channel") or "browser").strip().lower() delivered = bool(dispatch_result.get("browser_sent")) if channel == "email": delivered = bool(dispatch_result.get("email_sent")) elif channel == "ntfy": delivered = bool(dispatch_result.get("ntfy_sent")) elif channel == "webhook": delivered = bool(dispatch_result.get("webhook_sent")) if delivered: newly_notified.update(new_urgent) else: notify_failed.update(new_urgent) logger.warning(f"urgency: reminder dispatch returned no successful delivery path: {dispatch_result}") except Exception as e: logger.warning(f"urgency: reminder dispatch failed: {e}") notify_failed.update(new_urgent) # Mark only successfully delivered UIDs as notified so a transient # SMTP/ntfy/browser failure retries instead of lying forever. notified_uids.update(newly_notified) # Prune notified_uids that aren't unread anymore (so a future re-urgent # message with the same UID — rare but possible after archive→unarchive # — can re-notify). Keep only UIDs still in `all_unread_keys`. notified_uids = {u for u in notified_uids if u in all_unread_keys} state = { "ts": _time.time(), "owner": owner or "", "total_unread": len(all_unread_keys), "total_urgent": total_urgent, "max_score": max_score, "per_uid": per_uid_scores, "notified_uids": sorted(notified_uids), } try: STATE_PATH.write_text(_json.dumps(state), encoding="utf-8") except Exception as e: logger.warning(f"urgency: state write failed: {e}") # ── 6. Activity-log summary — counts line on top, then per-tier # bulleted breakdown so the user can see WHICH emails ranked where # (subject · sender · reason) and which ones triggered notifications. tier_counts = {0: 0, 1: 0, 2: 0, 3: 0} for v in per_uid_scores.values(): tier_counts[v.get("score", 0)] = tier_counts.get(v.get("score", 0), 0) + 1 if scanned == 0: raise TaskNoop("no unread emails in last 7 days") head = ( f"scanned {scanned} · urgent {tier_counts[3]} · " f"reply-soon {tier_counts[2]} · info {tier_counts[1]} · trivial {tier_counts[0]} · " f"{saved_classifications} saved classifications" ) if llm_attempts != saved_classifications: head += f" · {llm_attempts - saved_classifications} failed" if newly_notified: head += f" · notified {len(newly_notified)}" if notify_failed: head += f" · notify failed {len(notify_failed)}" def _fmt_one(v, newly_notified_set, failed_set, key): subj = (v.get("subject") or "(no subject)")[:80] frm = v.get("from") or "" why = v.get("reason") or "" tag = " · *notified now*" if key in newly_notified_set else (" · *notify failed*" if key in failed_set else "") line = f"- **{subj}**" + (f" — _{frm}_" if frm else "") if why: line += f" — {why}" return line + tag # Sort each tier by reason length (longest reason first → most info). by_tier = {3: [], 2: [], 1: [], 0: []} for k, v in per_uid_scores.items(): by_tier.setdefault(v.get("score", 0), []).append((k, v)) lines = [head] tier_labels = {3: "Urgent", 2: "Reply soon", 1: "Informational", 0: "Trivial"} for tier in (3, 2, 1, 0): items_t = by_tier.get(tier, []) if not items_t: continue lines.append("") lines.append(f"**{tier_labels[tier]} ({len(items_t)}):**") # Cap each tier at 8 rows to keep the activity entry readable. for k, v in items_t[:8]: lines.append(_fmt_one(v, newly_notified, notify_failed, k)) if len(items_t) > 8: lines.append(f"…and {len(items_t) - 8} more") if failed_classifications: lines.append("") lines.append(f"**Unclassified ({len(failed_classifications)}):**") for v in failed_classifications[:8]: subj = (v.get("subject") or "(no subject)")[:80] frm = v.get("from") or "" why = v.get("reason") or "" line = f"- **{subj}**" + (f" — _{frm}_" if frm else "") if why: line += f" — {why}" lines.append(line) if len(failed_classifications) > 8: lines.append(f"…and {len(failed_classifications) - 8} more") return "\n".join(lines), True except TaskNoop: raise except Exception as e: logger.exception("check_email_urgency action failed") return str(e), False async def action_cookbook_serve( owner: str, task_name: str = "", progress_cb=None, command: str = "", **kwargs, ) -> Tuple[str, bool]: """Launch a Cookbook model serve as a scheduled task. `command` is the JSON config string the task carries in `prompt`, of shape: {"preset": "name"} OR {"repo_id": "...", "cmd": "...", "host": "..."}. Optional `end_after_min: N` schedules a hard-stop N minutes after launch (handled by cookbook_serve_lifecycle_loop in src/cookbook_serve_lifecycle.py). """ import json import time as _time import httpx from pathlib import Path from core.middleware import INTERNAL_TOOL_HEADER, INTERNAL_TOOL_TOKEN from core.atomic_io import atomic_write_json headers = {INTERNAL_TOOL_HEADER: INTERNAL_TOOL_TOKEN} try: cfg = json.loads(command or "{}") except Exception: return f"Invalid JSON config: {command!r}", False if not isinstance(cfg, dict): return "Config must be a JSON object", False # Resolve the preset (if named) OR fall through with explicit fields. preset_name = (cfg.get("preset") or "").strip() repo_id = (cfg.get("repo_id") or "").strip() cmd = (cfg.get("cmd") or "").strip() host = (cfg.get("host") or cfg.get("remote_host") or "").strip() try: end_after_min = int(cfg.get("end_after_min") or 0) except Exception: end_after_min = 0 state_path = Path(COOKBOOK_STATE_FILE) try: state = json.loads(state_path.read_text(encoding="utf-8")) if state_path.exists() else {} except Exception: state = {} # Preset lookup. Try three matching strategies in order so the # schedule still works even when the user's preset is named # differently from the model's short name: # # 1. Exact preset.name == preset_name (case-insensitive) # 2. preset.model / preset.modelId == repo_id (caller knows the repo) # 3. preset.model's short name (after final /) == preset_name # # Without #2 and #3, scheduling "Qwen3.5-397B-A17B-AWQ" failed when # the saved preset was named "vllm-qwen-397b" or had the model field # populated with the full HF repo path. Either should resolve. def _short(name: str) -> str: return (name or "").rsplit("/", 1)[-1].lower() if not cmd or not repo_id: presets = state.get("presets") or [] chosen = None # Strategy 1: exact name match. if preset_name: chosen = next( (p for p in presets if isinstance(p, dict) and (p.get("name") or "").lower() == preset_name.lower()), None, ) # Strategy 2: repo_id matches the preset's model field. if chosen is None and repo_id: chosen = next( (p for p in presets if isinstance(p, dict) and (p.get("model") or p.get("modelId") or "").lower() == repo_id.lower()), None, ) # Strategy 3: model's short name matches the preset_name. if chosen is None and preset_name: chosen = next( (p for p in presets if isinstance(p, dict) and _short(p.get("model") or p.get("modelId") or "") == preset_name.lower()), None, ) if chosen is not None: repo_id = repo_id or chosen.get("model") or chosen.get("modelId") or "" cmd = cmd or (chosen.get("cmd") or "").strip() host = host or chosen.get("host") or chosen.get("remoteHost") or "" if not repo_id or not cmd or cmd.startswith("(adopted"): # Surface what we tried so the user can name their preset to match. preset_names = [(p.get("name") or "") for p in (state.get("presets") or []) if isinstance(p, dict)] hint = f" Saved presets: {preset_names!r}" if preset_names else "" return (f"No launchable config for {preset_name!r} (repo_id={repo_id!r}). " f"Check Cookbook → Presets has a real cmd, not 'adopted'.{hint}", False) # Resolve env_prefix etc. from the host's saved cookbook server entry, # matching the chat agent's serve_model path. body = {"repo_id": repo_id, "cmd": cmd} if host: body["remote_host"] = host env = (state.get("env") or {}) srv = next( (s for s in (env.get("servers") or []) if isinstance(s, dict) and (s.get("host") == host or s.get("name") == host)), {}, ) if srv.get("env") == "venv" and srv.get("envPath"): body["env_prefix"] = f"source {srv['envPath']}/bin/activate" elif srv.get("env") == "conda" and srv.get("envPath"): body["env_prefix"] = f"conda activate {srv['envPath']}" if srv.get("hfToken"): body["hf_token"] = srv["hfToken"] if srv.get("port"): body["ssh_port"] = str(srv["port"]) if srv.get("platform"): body["platform"] = srv["platform"] try: async with httpx.AsyncClient(timeout=30) as client: r = await client.post(f"{internal_api_base()}/api/model/serve", json=body, headers=headers) data = r.json() if r.content else {} except Exception as e: return f"Launch HTTP failed: {e}", False if not data.get("ok"): return f"Launch rejected: {data.get('error') or data.get('detail') or 'unknown'}", False sid = data.get("session_id") or "" # Register the new task in cookbook_state.json + stamp it with our # scheduler-owner markers. /api/model/serve spawns the tmux session # but leaves the state-write to the UI — when a scheduled action # launches a serve from server-side, NOBODY writes the task into # state, so the Cookbook tab never shows it. We do the write here. if sid: try: # Re-read fresh (the route may have updated state already). try: fresh = json.loads(state_path.read_text(encoding="utf-8")) except Exception: fresh = {} if not isinstance(fresh, dict): fresh = {} tasks = fresh.get("tasks") if isinstance(fresh.get("tasks"), list) else [] existing = next( (t for t in tasks if isinstance(t, dict) and t.get("sessionId") == sid), None, ) if existing is None: display_name = repo_id.split("/")[-1] if "/" in repo_id else repo_id placeholder = ( f"Launched by scheduled task {task_name!r} — waiting for tmux output…\n" f" session: {sid}\n" f" target: {host or 'local'}\n" f" cmd: {cmd[:200]}{'…' if len(cmd) > 200 else ''}" ) existing = { "id": sid, "sessionId": sid, "name": display_name, "modelId": repo_id, "type": "serve", "status": "running", "output": placeholder, "ts": int(_time.time() * 1000), "payload": {"repo_id": repo_id, "remote_host": host or "", "_cmd": cmd}, "remoteHost": host or "", "sshPort": "", "platform": "linux", "_serveReady": False, "_endpointAdded": False, } tasks.append(existing) # Stamp ownership + end-at on the task entry. existing["_scheduledByTask"] = task_name or "" existing["_scheduledByOwner"] = owner or "" if end_after_min > 0: existing["_scheduledStopAtMs"] = int(_time.time() * 1000) + end_after_min * 60 * 1000 fresh["tasks"] = tasks atomic_write_json(state_path, fresh) except Exception as e: logger.warning(f"cookbook_serve: state register/stamp failed: {e}") # Don't try to render absolute clock time in the message — the # server runs in UTC (Docker default), the user reads it as local, # and the offset depends on the user's TZ which the action doesn't # have a reliable handle on. The Tasks UI already shows the RUN # timestamp in the user's local time right above this message, so # "stops 8 min after that" gives the user everything they need. if end_after_min: return ( f"Launched {repo_id} (session {sid}); stops {end_after_min} min after this ran", True, ) return f"Launched {repo_id} (session {sid})", True BUILTIN_ACTIONS = { "tidy_sessions": action_tidy_sessions, "tidy_documents": action_tidy_documents, "consolidate_memory": action_consolidate_memory, "tidy_research": action_tidy_research, "summarize_emails": action_summarize_emails, "draft_email_replies": action_draft_email_replies, "extract_email_events": action_extract_email_events, "classify_events": action_classify_events, # ping_events removed from the user-facing registry. Calendar reminders # are represented as Notes, so note pings are the single dispatch path. "daily_brief": action_daily_brief, "learn_sender_signatures": action_learn_sender_signatures, "ssh_command": action_ssh_command, "run_script": action_run_script, "run_local": action_run_local, "test_skills": action_test_skills, "audit_skills": action_audit_skills, "check_email_urgency": action_check_email_urgency, "cookbook_serve": action_cookbook_serve, # ping_notes removed from the registry — runs only inside `_note_pings_loop`. } # Descriptions for the UI/API BUILTIN_ACTION_INFO = { "tidy_sessions": "Clean up empty chat sessions and auto-sort into folders", "tidy_documents": "Remove junk/empty documents", "consolidate_memory": "Remove duplicate memories", "tidy_research": "Remove orphaned research files (sessions that were deleted)", "summarize_emails": "Pre-generate AI summaries for new inbox emails", "draft_email_replies": "Pre-draft AI reply suggestions for new inbox emails", "extract_email_events": "Scan emails for booking/meeting confirmations and auto-add to calendar", "classify_events": "Tag upcoming events with importance (low/normal/high/critical) and type (work/health/travel/etc.); colors them too", "daily_brief": "Build a morning digest: today's calendar, unread email count + top senders, active todos", "learn_sender_signatures": "LLM learns each sender's signature from 3+ of their recent emails; cached per address so future renders fold sigs reliably without heuristics", "ssh_command": "Run a shell command on a local or remote host", "run_script": "Run a script locally or on ODYSSEUS_SCRIPT_HOST", "test_skills": "Run the per-skill Test on every skill: agent run + LLM judge → records verdict on the skill (pass/needs_work/fail/inconclusive). Advisory only — never rewrites or demotes anything.", "audit_skills": "Audit unaudited skills after enough new skills are added: test, narrow metadata, self-edit/retry, optional teacher rewrite, tag duplicates/trivial skills, and publish/draft using the auto-approve threshold.", "check_email_urgency": "Scan unread emails hourly, tag urgent/reply-soon/newsletter/marketing/spam, and send a reminder when a new email needs a fast reply.", }