From f2a79aaf5c424a547330ce1d8f60230d8806317c Mon Sep 17 00:00:00 2001 From: Vykos Date: Sun, 7 Jun 2026 12:50:10 +0200 Subject: [PATCH] Tighten manage notes owner checks (#3002) --- src/tool_implementations.py | 28 ++++-- tests/test_manage_notes_owner_gate.py | 120 ++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 6 deletions(-) create mode 100644 tests/test_manage_notes_owner_gate.py diff --git a/src/tool_implementations.py b/src/tool_implementations.py index 62ac23a08..e589652f0 100644 --- a/src/tool_implementations.py +++ b/src/tool_implementations.py @@ -1828,6 +1828,22 @@ async def do_manage_notes(content: str, owner: Optional[str] = None) -> Dict: text = re.sub(r"^\s*reminder\s*:\s*", "", text) return re.sub(r"\s+", " ", text) + def _note_visible_to_owner(note, owner_value: Optional[str]) -> bool: + # Empty owner_value is single-user / auth-disabled mode. A real + # authenticated owner must match exactly; null/empty legacy rows are not + # shared between accounts. + if not owner_value: + return True + return getattr(note, "owner", None) == owner_value + + def _note_by_prefix(note_id: str): + if not note_id: + return None + q = db.query(Note).filter(Note.id.startswith(note_id)) + if owner: + q = q.filter(Note.owner == owner) + return q.first() + try: if action == "list": q = db.query(Note) @@ -1947,10 +1963,10 @@ async def do_manage_notes(content: str, owner: Optional[str] = None) -> Dict: elif action == "update": note_id = args.get("id", "") - note = db.query(Note).filter(Note.id.startswith(note_id)).first() if note_id else None + note = _note_by_prefix(note_id) if not note: return {"error": f"Note '{note_id}' not found", "exit_code": 1} - if owner is not None and note.owner and note.owner != owner: + if not _note_visible_to_owner(note, owner): return {"error": "Note not found", "exit_code": 1} for field in ("title", "content", "note_type", "color", "label"): if field in args and args[field] is not None: @@ -1983,10 +1999,10 @@ async def do_manage_notes(content: str, owner: Optional[str] = None) -> Dict: elif action == "delete": note_id = args.get("id", "") - note = db.query(Note).filter(Note.id.startswith(note_id)).first() if note_id else None + note = _note_by_prefix(note_id) if not note: return {"error": f"Note '{note_id}' not found", "exit_code": 1} - if owner is not None and note.owner and note.owner != owner: + if not _note_visible_to_owner(note, owner): return {"error": "Note not found", "exit_code": 1} title = note.title db.delete(note) @@ -1996,10 +2012,10 @@ async def do_manage_notes(content: str, owner: Optional[str] = None) -> Dict: elif action == "toggle_item": note_id = args.get("id", "") index = args.get("index", 0) - note = db.query(Note).filter(Note.id.startswith(note_id)).first() if note_id else None + note = _note_by_prefix(note_id) if not note: return {"error": f"Note '{note_id}' not found", "exit_code": 1} - if owner is not None and note.owner and note.owner != owner: + if not _note_visible_to_owner(note, owner): return {"error": "Note not found", "exit_code": 1} if not note.items: return {"error": "Note has no checklist items", "exit_code": 1} diff --git a/tests/test_manage_notes_owner_gate.py b/tests/test_manage_notes_owner_gate.py new file mode 100644 index 000000000..37329b9c1 --- /dev/null +++ b/tests/test_manage_notes_owner_gate.py @@ -0,0 +1,120 @@ +import asyncio +import json +import sys +import types +from types import SimpleNamespace +from unittest.mock import MagicMock + +from src import tool_implementations + + +class _Query: + def __init__(self, note): + self.note = note + + def filter(self, *args, **kwargs): + return self + + def first(self): + return self.note + + +class _Db: + def __init__(self, note): + self.note = note + self.deleted = [] + self.commits = 0 + + def query(self, *args, **kwargs): + return _Query(self.note) + + def delete(self, note): + self.deleted.append(note) + + def commit(self): + self.commits += 1 + + def rollback(self): + pass + + def close(self): + pass + + +def _install_fakes(monkeypatch, note): + fake_sa_attrs = types.ModuleType("sqlalchemy.orm.attributes") + fake_sa_attrs.flag_modified = lambda *args, **kwargs: None + monkeypatch.setitem(sys.modules, "sqlalchemy.orm.attributes", fake_sa_attrs) + + db = _Db(note) + fake_core_db = types.ModuleType("core.database") + fake_core_db.SessionLocal = lambda: db + fake_core_db.Note = MagicMock() + monkeypatch.setitem(sys.modules, "core.database", fake_core_db) + return db + + +def _run(args, owner="alice"): + return asyncio.run(tool_implementations.do_manage_notes(json.dumps(args), owner=owner)) + + +def _note(owner=None, **overrides): + data = { + "id": "abc12345-existing", + "owner": owner, + "title": "Original", + "content": "", + "note_type": "note", + "color": None, + "label": None, + "items": '[{"text":"item","done":false}]', + "pinned": False, + "archived": False, + "due_date": None, + } + data.update(overrides) + return SimpleNamespace(**data) + + +def test_update_rejects_legacy_null_owner_for_authenticated_owner(monkeypatch): + note = _note(owner=None) + db = _install_fakes(monkeypatch, note) + + result = _run({"action": "update", "id": "abc12345", "title": "Changed"}) + + assert result == {"error": "Note not found", "exit_code": 1} + assert note.title == "Original" + assert db.commits == 0 + + +def test_delete_rejects_legacy_empty_owner_for_authenticated_owner(monkeypatch): + note = _note(owner="") + db = _install_fakes(monkeypatch, note) + + result = _run({"action": "delete", "id": "abc12345"}) + + assert result == {"error": "Note not found", "exit_code": 1} + assert db.deleted == [] + assert db.commits == 0 + + +def test_toggle_rejects_other_owner(monkeypatch): + note = _note(owner="bob") + db = _install_fakes(monkeypatch, note) + + result = _run({"action": "toggle_item", "id": "abc12345", "index": 0}) + + assert result == {"error": "Note not found", "exit_code": 1} + assert json.loads(note.items)[0]["done"] is False + assert db.commits == 0 + + +def test_update_allows_matching_owner(monkeypatch): + note = _note(owner="alice") + db = _install_fakes(monkeypatch, note) + + result = _run({"action": "update", "id": "abc12345", "title": "Changed"}) + + assert result["exit_code"] == 0 + assert note.title == "Changed" + assert db.commits == 1