mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-15 17:25:26 -04:00
Tighten manage notes owner checks (#3002)
This commit is contained in:
@@ -1828,6 +1828,22 @@ async def do_manage_notes(content: str, owner: Optional[str] = None) -> Dict:
|
||||
text = re.sub(r"^\s*reminder\s*:\s*", "", text)
|
||||
return re.sub(r"\s+", " ", text)
|
||||
|
||||
def _note_visible_to_owner(note, owner_value: Optional[str]) -> bool:
|
||||
# Empty owner_value is single-user / auth-disabled mode. A real
|
||||
# authenticated owner must match exactly; null/empty legacy rows are not
|
||||
# shared between accounts.
|
||||
if not owner_value:
|
||||
return True
|
||||
return getattr(note, "owner", None) == owner_value
|
||||
|
||||
def _note_by_prefix(note_id: str):
|
||||
if not note_id:
|
||||
return None
|
||||
q = db.query(Note).filter(Note.id.startswith(note_id))
|
||||
if owner:
|
||||
q = q.filter(Note.owner == owner)
|
||||
return q.first()
|
||||
|
||||
try:
|
||||
if action == "list":
|
||||
q = db.query(Note)
|
||||
@@ -1947,10 +1963,10 @@ async def do_manage_notes(content: str, owner: Optional[str] = None) -> Dict:
|
||||
|
||||
elif action == "update":
|
||||
note_id = args.get("id", "")
|
||||
note = db.query(Note).filter(Note.id.startswith(note_id)).first() if note_id else None
|
||||
note = _note_by_prefix(note_id)
|
||||
if not note:
|
||||
return {"error": f"Note '{note_id}' not found", "exit_code": 1}
|
||||
if owner is not None and note.owner and note.owner != owner:
|
||||
if not _note_visible_to_owner(note, owner):
|
||||
return {"error": "Note not found", "exit_code": 1}
|
||||
for field in ("title", "content", "note_type", "color", "label"):
|
||||
if field in args and args[field] is not None:
|
||||
@@ -1983,10 +1999,10 @@ async def do_manage_notes(content: str, owner: Optional[str] = None) -> Dict:
|
||||
|
||||
elif action == "delete":
|
||||
note_id = args.get("id", "")
|
||||
note = db.query(Note).filter(Note.id.startswith(note_id)).first() if note_id else None
|
||||
note = _note_by_prefix(note_id)
|
||||
if not note:
|
||||
return {"error": f"Note '{note_id}' not found", "exit_code": 1}
|
||||
if owner is not None and note.owner and note.owner != owner:
|
||||
if not _note_visible_to_owner(note, owner):
|
||||
return {"error": "Note not found", "exit_code": 1}
|
||||
title = note.title
|
||||
db.delete(note)
|
||||
@@ -1996,10 +2012,10 @@ async def do_manage_notes(content: str, owner: Optional[str] = None) -> Dict:
|
||||
elif action == "toggle_item":
|
||||
note_id = args.get("id", "")
|
||||
index = args.get("index", 0)
|
||||
note = db.query(Note).filter(Note.id.startswith(note_id)).first() if note_id else None
|
||||
note = _note_by_prefix(note_id)
|
||||
if not note:
|
||||
return {"error": f"Note '{note_id}' not found", "exit_code": 1}
|
||||
if owner is not None and note.owner and note.owner != owner:
|
||||
if not _note_visible_to_owner(note, owner):
|
||||
return {"error": "Note not found", "exit_code": 1}
|
||||
if not note.items:
|
||||
return {"error": "Note has no checklist items", "exit_code": 1}
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
import asyncio
|
||||
import json
|
||||
import sys
|
||||
import types
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from src import tool_implementations
|
||||
|
||||
|
||||
class _Query:
|
||||
def __init__(self, note):
|
||||
self.note = note
|
||||
|
||||
def filter(self, *args, **kwargs):
|
||||
return self
|
||||
|
||||
def first(self):
|
||||
return self.note
|
||||
|
||||
|
||||
class _Db:
|
||||
def __init__(self, note):
|
||||
self.note = note
|
||||
self.deleted = []
|
||||
self.commits = 0
|
||||
|
||||
def query(self, *args, **kwargs):
|
||||
return _Query(self.note)
|
||||
|
||||
def delete(self, note):
|
||||
self.deleted.append(note)
|
||||
|
||||
def commit(self):
|
||||
self.commits += 1
|
||||
|
||||
def rollback(self):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
def _install_fakes(monkeypatch, note):
|
||||
fake_sa_attrs = types.ModuleType("sqlalchemy.orm.attributes")
|
||||
fake_sa_attrs.flag_modified = lambda *args, **kwargs: None
|
||||
monkeypatch.setitem(sys.modules, "sqlalchemy.orm.attributes", fake_sa_attrs)
|
||||
|
||||
db = _Db(note)
|
||||
fake_core_db = types.ModuleType("core.database")
|
||||
fake_core_db.SessionLocal = lambda: db
|
||||
fake_core_db.Note = MagicMock()
|
||||
monkeypatch.setitem(sys.modules, "core.database", fake_core_db)
|
||||
return db
|
||||
|
||||
|
||||
def _run(args, owner="alice"):
|
||||
return asyncio.run(tool_implementations.do_manage_notes(json.dumps(args), owner=owner))
|
||||
|
||||
|
||||
def _note(owner=None, **overrides):
|
||||
data = {
|
||||
"id": "abc12345-existing",
|
||||
"owner": owner,
|
||||
"title": "Original",
|
||||
"content": "",
|
||||
"note_type": "note",
|
||||
"color": None,
|
||||
"label": None,
|
||||
"items": '[{"text":"item","done":false}]',
|
||||
"pinned": False,
|
||||
"archived": False,
|
||||
"due_date": None,
|
||||
}
|
||||
data.update(overrides)
|
||||
return SimpleNamespace(**data)
|
||||
|
||||
|
||||
def test_update_rejects_legacy_null_owner_for_authenticated_owner(monkeypatch):
|
||||
note = _note(owner=None)
|
||||
db = _install_fakes(monkeypatch, note)
|
||||
|
||||
result = _run({"action": "update", "id": "abc12345", "title": "Changed"})
|
||||
|
||||
assert result == {"error": "Note not found", "exit_code": 1}
|
||||
assert note.title == "Original"
|
||||
assert db.commits == 0
|
||||
|
||||
|
||||
def test_delete_rejects_legacy_empty_owner_for_authenticated_owner(monkeypatch):
|
||||
note = _note(owner="")
|
||||
db = _install_fakes(monkeypatch, note)
|
||||
|
||||
result = _run({"action": "delete", "id": "abc12345"})
|
||||
|
||||
assert result == {"error": "Note not found", "exit_code": 1}
|
||||
assert db.deleted == []
|
||||
assert db.commits == 0
|
||||
|
||||
|
||||
def test_toggle_rejects_other_owner(monkeypatch):
|
||||
note = _note(owner="bob")
|
||||
db = _install_fakes(monkeypatch, note)
|
||||
|
||||
result = _run({"action": "toggle_item", "id": "abc12345", "index": 0})
|
||||
|
||||
assert result == {"error": "Note not found", "exit_code": 1}
|
||||
assert json.loads(note.items)[0]["done"] is False
|
||||
assert db.commits == 0
|
||||
|
||||
|
||||
def test_update_allows_matching_owner(monkeypatch):
|
||||
note = _note(owner="alice")
|
||||
db = _install_fakes(monkeypatch, note)
|
||||
|
||||
result = _run({"action": "update", "id": "abc12345", "title": "Changed"})
|
||||
|
||||
assert result["exit_code"] == 0
|
||||
assert note.title == "Changed"
|
||||
assert db.commits == 1
|
||||
Reference in New Issue
Block a user