diff --git a/src/brain.py b/src/brain.py index 876fb98..eed7b16 100644 --- a/src/brain.py +++ b/src/brain.py @@ -1,15 +1,27 @@ -"""Core brain operations — capture, search, recent, stats.""" +"""Core brain operations — full CRUD, search, recent, stats.""" import json +import uuid from src import db, embeddings -from src.models import CaptureRequest, SearchRequest, SearchResult, Stats, Thought +from src.models import CaptureRequest, SearchRequest, SearchResult, Stats, Thought, UpdateRequest def _meta(raw) -> dict: return json.loads(raw) if isinstance(raw, str) else dict(raw) +def _row_to_thought(row) -> Thought: + return Thought( + id=row["id"], + content=row["content"], + source=row["source"], + metadata=_meta(row["metadata"]), + created_at=row["created_at"], + embedded=row["embedded"], + ) + + async def capture(req: CaptureRequest) -> Thought: pool = await db.get_pool() embedding = await embeddings.embed(req.content) @@ -42,14 +54,174 @@ async def capture(req: CaptureRequest) -> Thought: json.dumps(req.metadata), ) - return Thought( - id=row["id"], - content=row["content"], - source=row["source"], - metadata=_meta(row["metadata"]), - created_at=row["created_at"], - embedded=row["embedded"], - ) + return _row_to_thought(row) + + +async def get_by_id(thought_id: str) -> Thought | None: + try: + uid = uuid.UUID(thought_id) + except ValueError: + return None + + pool = await db.get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT id::text, content, source, metadata, created_at, + embedding IS NOT NULL AS embedded + FROM thoughts WHERE id = $1 + """, + uid, + ) + return _row_to_thought(row) if row else None + + +async def update(thought_id: str, req: UpdateRequest) -> Thought | None: + try: + uid = uuid.UUID(thought_id) + except ValueError: + return None + + pool = await db.get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT id::text, content, source, metadata, created_at, + embedding IS NOT NULL AS embedded + FROM thoughts WHERE id = $1 + """, + uid, + ) + if not row: + return None + + new_content = req.content if req.content is not None else row["content"] + new_source = req.source if req.source is not None else row["source"] + new_metadata = json.dumps(req.metadata) if req.metadata is not None else row["metadata"] + + if req.content is not None and req.content != row["content"]: + embedding = await embeddings.embed(new_content) + if embedding is not None: + vec = f"[{','.join(str(v) for v in embedding)}]" + updated = await conn.fetchrow( + """ + UPDATE thoughts + SET content = $1, source = $2, metadata = $3::jsonb, embedding = $4::vector + WHERE id = $5 + RETURNING id::text, content, source, metadata, created_at, + embedding IS NOT NULL AS embedded + """, + new_content, new_source, new_metadata, vec, uid, + ) + else: + updated = await conn.fetchrow( + """ + UPDATE thoughts + SET content = $1, source = $2, metadata = $3::jsonb, embedding = NULL + WHERE id = $4 + RETURNING id::text, content, source, metadata, created_at, + embedding IS NOT NULL AS embedded + """, + new_content, new_source, new_metadata, uid, + ) + else: + updated = await conn.fetchrow( + """ + UPDATE thoughts + SET source = $1, metadata = $2::jsonb + WHERE id = $3 + RETURNING id::text, content, source, metadata, created_at, + embedding IS NOT NULL AS embedded + """, + new_source, new_metadata, uid, + ) + + return _row_to_thought(updated) + + +async def delete(thought_id: str) -> bool: + try: + uid = uuid.UUID(thought_id) + except ValueError: + return False + + pool = await db.get_pool() + async with pool.acquire() as conn: + result = await conn.execute("DELETE FROM thoughts WHERE id = $1", uid) + return result == "DELETE 1" + + +async def delete_by_filter(source: str | None, metadata_id: str | None) -> int: + """Bulk delete by source and/or metadata->>'id'. Returns count deleted.""" + if not source and not metadata_id: + return 0 + + pool = await db.get_pool() + async with pool.acquire() as conn: + if source and metadata_id: + result = await conn.execute( + "DELETE FROM thoughts WHERE source = $1 AND metadata->>'id' = $2", + source, metadata_id, + ) + elif source: + result = await conn.execute("DELETE FROM thoughts WHERE source = $1", source) + else: + result = await conn.execute( + "DELETE FROM thoughts WHERE metadata->>'id' = $1", metadata_id + ) + return int(result.split()[-1]) + + +async def list_thoughts( + source: str | None = None, + metadata_id: str | None = None, + limit: int = 50, + offset: int = 0, +) -> list[Thought]: + pool = await db.get_pool() + async with pool.acquire() as conn: + if source and metadata_id: + rows = await conn.fetch( + """ + SELECT id::text, content, source, metadata, created_at, + embedding IS NOT NULL AS embedded + FROM thoughts + WHERE source = $1 AND metadata->>'id' = $2 + ORDER BY created_at DESC LIMIT $3 OFFSET $4 + """, + source, metadata_id, limit, offset, + ) + elif source: + rows = await conn.fetch( + """ + SELECT id::text, content, source, metadata, created_at, + embedding IS NOT NULL AS embedded + FROM thoughts WHERE source = $1 + ORDER BY created_at DESC LIMIT $2 OFFSET $3 + """, + source, limit, offset, + ) + elif metadata_id: + rows = await conn.fetch( + """ + SELECT id::text, content, source, metadata, created_at, + embedding IS NOT NULL AS embedded + FROM thoughts WHERE metadata->>'id' = $1 + ORDER BY created_at DESC LIMIT $2 OFFSET $3 + """, + metadata_id, limit, offset, + ) + else: + rows = await conn.fetch( + """ + SELECT id::text, content, source, metadata, created_at, + embedding IS NOT NULL AS embedded + FROM thoughts + ORDER BY created_at DESC LIMIT $1 OFFSET $2 + """, + limit, offset, + ) + return [_row_to_thought(r) for r in rows] async def search(req: SearchRequest) -> list[SearchResult]: diff --git a/src/main.py b/src/main.py index 94f7999..5f09134 100644 --- a/src/main.py +++ b/src/main.py @@ -3,14 +3,14 @@ import contextlib import logging -from fastapi import Depends, FastAPI, HTTPException, Security +from fastapi import Depends, FastAPI, HTTPException, Query, Security from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from mcp.server.fastmcp import FastMCP from mcp.server.transport_security import TransportSecuritySettings from src import brain, db from src.config import settings -from src.models import CaptureRequest, SearchRequest, SearchResult, Stats, Thought +from src.models import CaptureRequest, SearchRequest, SearchResult, Stats, Thought, UpdateRequest logging.basicConfig(level=settings.log_level.upper()) logger = logging.getLogger("openbrain") @@ -83,6 +83,76 @@ async def stats() -> dict: return s.model_dump(mode="json") +@mcp.tool() +async def get(thought_id: str) -> dict | None: + """Retrieve a single thought by its UUID. + + Args: + thought_id: The UUID of the thought to retrieve + """ + thought = await brain.get_by_id(thought_id) + return thought.model_dump(mode="json") if thought else None + + +@mcp.tool() +async def update( + thought_id: str, content: str | None = None, metadata: dict | None = None +) -> dict | None: + """Update an existing thought. Re-embeds if content changes. + + Args: + thought_id: UUID of the thought to update + content: New content (omit to keep existing) + metadata: New metadata dict (omit to keep existing) + """ + req = UpdateRequest(content=content, metadata=metadata) + thought = await brain.update(thought_id, req) + return thought.model_dump(mode="json") if thought else None + + +@mcp.tool() +async def delete(thought_id: str) -> bool: + """Delete a thought by UUID. + + Args: + thought_id: UUID of the thought to delete + """ + return await brain.delete(thought_id) + + +@mcp.tool() +async def delete_where(source: str | None = None, metadata_id: str | None = None) -> int: + """Bulk delete thoughts by source and/or metadata id field. + + Useful for cleaning up stale captures before re-importing. + At least one filter must be provided. + + Args: + source: Delete all thoughts from this source (e.g. 'jarvis-brain/project') + metadata_id: Delete all thoughts where metadata.id equals this value + """ + return await brain.delete_by_filter(source, metadata_id) + + +@mcp.tool() +async def list_thoughts( + source: str | None = None, + metadata_id: str | None = None, + limit: int = 50, + offset: int = 0, +) -> list[dict]: + """List thoughts with optional filtering. Returns newest first. + + Args: + source: Filter by source (e.g. 'jarvis-brain/project') + metadata_id: Filter by metadata.id field value + limit: Max results (default 50) + offset: Pagination offset + """ + thoughts = await brain.list_thoughts(source, metadata_id, limit, offset) + return [t.model_dump(mode="json") for t in thoughts] + + # Initialize the MCP sub-app early so session_manager is available for lifespan. # streamable_http_app() creates a sub-app with a route at /mcp internally. # Mounted at "/" (last in the route list), FastAPI routes take priority and @@ -134,6 +204,56 @@ async def api_recent(limit: int = 20, _: str = Depends(require_api_key)) -> list return await brain.recent(limit=limit) +@app.get("/v1/thoughts", response_model=list[Thought]) +async def api_list( + source: str | None = Query(default=None), + metadata_id: str | None = Query(default=None), + limit: int = Query(default=50, le=500), + offset: int = Query(default=0, ge=0), + _: str = Depends(require_api_key), +) -> list[Thought]: + return await brain.list_thoughts(source, metadata_id, limit, offset) + + +@app.get("/v1/thoughts/{thought_id}", response_model=Thought) +async def api_get(thought_id: str, _: str = Depends(require_api_key)) -> Thought: + thought = await brain.get_by_id(thought_id) + if not thought: + raise HTTPException(status_code=404, detail="Thought not found") + return thought + + +@app.patch("/v1/thoughts/{thought_id}", response_model=Thought) +async def api_update( + thought_id: str, req: UpdateRequest, _: str = Depends(require_api_key) +) -> Thought: + thought = await brain.update(thought_id, req) + if not thought: + raise HTTPException(status_code=404, detail="Thought not found") + return thought + + +@app.delete("/v1/thoughts", status_code=200) +async def api_delete_where( + source: str | None = Query(default=None), + metadata_id: str | None = Query(default=None), + _: str = Depends(require_api_key), +) -> dict: + if not source and not metadata_id: + raise HTTPException( + status_code=422, detail="At least one filter (source or metadata_id) is required" + ) + count = await brain.delete_by_filter(source, metadata_id) + return {"deleted": count} + + +@app.delete("/v1/thoughts/{thought_id}", status_code=204) +async def api_delete(thought_id: str, _: str = Depends(require_api_key)) -> None: + deleted = await brain.delete(thought_id) + if not deleted: + raise HTTPException(status_code=404, detail="Thought not found") + + @app.get("/v1/stats", response_model=Stats) async def api_stats(_: str = Depends(require_api_key)) -> Stats: return await brain.stats() diff --git a/src/models.py b/src/models.py index d8deeb6..9cc3b4b 100644 --- a/src/models.py +++ b/src/models.py @@ -10,6 +10,12 @@ class CaptureRequest(BaseModel): metadata: dict[str, Any] = {} +class UpdateRequest(BaseModel): + content: str | None = None + source: str | None = None + metadata: dict[str, Any] | None = None + + class Thought(BaseModel): id: str content: str