feat: full CRUD API + MCP tools for thoughts
Some checks failed
ci/woodpecker/push/build Pipeline failed

REST endpoints:
  GET    /v1/thoughts              — list with ?source=, ?metadata_id=, ?limit=, ?offset=
  GET    /v1/thoughts/{id}         — get by UUID (404 if not found)
  PATCH  /v1/thoughts/{id}         — update content/metadata, re-embeds if content changes
  DELETE /v1/thoughts/{id}         — delete by UUID (204 / 404)
  DELETE /v1/thoughts              — bulk delete by ?source= and/or ?metadata_id=

MCP tools (mirrors REST):
  get(thought_id)
  update(thought_id, content, metadata)
  delete(thought_id)
  delete_where(source, metadata_id)
  list_thoughts(source, metadata_id, limit, offset)

Internal refactor:
  - _row_to_thought() helper eliminates repeated Thought construction
  - UpdateRequest model with all-optional fields (PATCH semantics)
  - UUID validation on all by-id operations returns 404 cleanly

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-02 20:36:42 -06:00
parent c02d1d8974
commit 8a90e78016
3 changed files with 310 additions and 12 deletions

View File

@@ -1,15 +1,27 @@
"""Core brain operations — capture, search, recent, stats."""
"""Core brain operations — full CRUD, search, recent, stats."""
import json
import uuid
from src import db, embeddings
from src.models import CaptureRequest, SearchRequest, SearchResult, Stats, Thought
from src.models import CaptureRequest, SearchRequest, SearchResult, Stats, Thought, UpdateRequest
def _meta(raw) -> dict:
return json.loads(raw) if isinstance(raw, str) else dict(raw)
def _row_to_thought(row) -> Thought:
return Thought(
id=row["id"],
content=row["content"],
source=row["source"],
metadata=_meta(row["metadata"]),
created_at=row["created_at"],
embedded=row["embedded"],
)
async def capture(req: CaptureRequest) -> Thought:
pool = await db.get_pool()
embedding = await embeddings.embed(req.content)
@@ -42,14 +54,174 @@ async def capture(req: CaptureRequest) -> Thought:
json.dumps(req.metadata),
)
return Thought(
id=row["id"],
content=row["content"],
source=row["source"],
metadata=_meta(row["metadata"]),
created_at=row["created_at"],
embedded=row["embedded"],
)
return _row_to_thought(row)
async def get_by_id(thought_id: str) -> Thought | None:
try:
uid = uuid.UUID(thought_id)
except ValueError:
return None
pool = await db.get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT id::text, content, source, metadata, created_at,
embedding IS NOT NULL AS embedded
FROM thoughts WHERE id = $1
""",
uid,
)
return _row_to_thought(row) if row else None
async def update(thought_id: str, req: UpdateRequest) -> Thought | None:
try:
uid = uuid.UUID(thought_id)
except ValueError:
return None
pool = await db.get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT id::text, content, source, metadata, created_at,
embedding IS NOT NULL AS embedded
FROM thoughts WHERE id = $1
""",
uid,
)
if not row:
return None
new_content = req.content if req.content is not None else row["content"]
new_source = req.source if req.source is not None else row["source"]
new_metadata = json.dumps(req.metadata) if req.metadata is not None else row["metadata"]
if req.content is not None and req.content != row["content"]:
embedding = await embeddings.embed(new_content)
if embedding is not None:
vec = f"[{','.join(str(v) for v in embedding)}]"
updated = await conn.fetchrow(
"""
UPDATE thoughts
SET content = $1, source = $2, metadata = $3::jsonb, embedding = $4::vector
WHERE id = $5
RETURNING id::text, content, source, metadata, created_at,
embedding IS NOT NULL AS embedded
""",
new_content, new_source, new_metadata, vec, uid,
)
else:
updated = await conn.fetchrow(
"""
UPDATE thoughts
SET content = $1, source = $2, metadata = $3::jsonb, embedding = NULL
WHERE id = $4
RETURNING id::text, content, source, metadata, created_at,
embedding IS NOT NULL AS embedded
""",
new_content, new_source, new_metadata, uid,
)
else:
updated = await conn.fetchrow(
"""
UPDATE thoughts
SET source = $1, metadata = $2::jsonb
WHERE id = $3
RETURNING id::text, content, source, metadata, created_at,
embedding IS NOT NULL AS embedded
""",
new_source, new_metadata, uid,
)
return _row_to_thought(updated)
async def delete(thought_id: str) -> bool:
try:
uid = uuid.UUID(thought_id)
except ValueError:
return False
pool = await db.get_pool()
async with pool.acquire() as conn:
result = await conn.execute("DELETE FROM thoughts WHERE id = $1", uid)
return result == "DELETE 1"
async def delete_by_filter(source: str | None, metadata_id: str | None) -> int:
"""Bulk delete by source and/or metadata->>'id'. Returns count deleted."""
if not source and not metadata_id:
return 0
pool = await db.get_pool()
async with pool.acquire() as conn:
if source and metadata_id:
result = await conn.execute(
"DELETE FROM thoughts WHERE source = $1 AND metadata->>'id' = $2",
source, metadata_id,
)
elif source:
result = await conn.execute("DELETE FROM thoughts WHERE source = $1", source)
else:
result = await conn.execute(
"DELETE FROM thoughts WHERE metadata->>'id' = $1", metadata_id
)
return int(result.split()[-1])
async def list_thoughts(
source: str | None = None,
metadata_id: str | None = None,
limit: int = 50,
offset: int = 0,
) -> list[Thought]:
pool = await db.get_pool()
async with pool.acquire() as conn:
if source and metadata_id:
rows = await conn.fetch(
"""
SELECT id::text, content, source, metadata, created_at,
embedding IS NOT NULL AS embedded
FROM thoughts
WHERE source = $1 AND metadata->>'id' = $2
ORDER BY created_at DESC LIMIT $3 OFFSET $4
""",
source, metadata_id, limit, offset,
)
elif source:
rows = await conn.fetch(
"""
SELECT id::text, content, source, metadata, created_at,
embedding IS NOT NULL AS embedded
FROM thoughts WHERE source = $1
ORDER BY created_at DESC LIMIT $2 OFFSET $3
""",
source, limit, offset,
)
elif metadata_id:
rows = await conn.fetch(
"""
SELECT id::text, content, source, metadata, created_at,
embedding IS NOT NULL AS embedded
FROM thoughts WHERE metadata->>'id' = $1
ORDER BY created_at DESC LIMIT $2 OFFSET $3
""",
metadata_id, limit, offset,
)
else:
rows = await conn.fetch(
"""
SELECT id::text, content, source, metadata, created_at,
embedding IS NOT NULL AS embedded
FROM thoughts
ORDER BY created_at DESC LIMIT $1 OFFSET $2
""",
limit, offset,
)
return [_row_to_thought(r) for r in rows]
async def search(req: SearchRequest) -> list[SearchResult]:

View File

@@ -3,14 +3,14 @@
import contextlib
import logging
from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi import Depends, FastAPI, HTTPException, Query, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings
from src import brain, db
from src.config import settings
from src.models import CaptureRequest, SearchRequest, SearchResult, Stats, Thought
from src.models import CaptureRequest, SearchRequest, SearchResult, Stats, Thought, UpdateRequest
logging.basicConfig(level=settings.log_level.upper())
logger = logging.getLogger("openbrain")
@@ -83,6 +83,76 @@ async def stats() -> dict:
return s.model_dump(mode="json")
@mcp.tool()
async def get(thought_id: str) -> dict | None:
"""Retrieve a single thought by its UUID.
Args:
thought_id: The UUID of the thought to retrieve
"""
thought = await brain.get_by_id(thought_id)
return thought.model_dump(mode="json") if thought else None
@mcp.tool()
async def update(
thought_id: str, content: str | None = None, metadata: dict | None = None
) -> dict | None:
"""Update an existing thought. Re-embeds if content changes.
Args:
thought_id: UUID of the thought to update
content: New content (omit to keep existing)
metadata: New metadata dict (omit to keep existing)
"""
req = UpdateRequest(content=content, metadata=metadata)
thought = await brain.update(thought_id, req)
return thought.model_dump(mode="json") if thought else None
@mcp.tool()
async def delete(thought_id: str) -> bool:
"""Delete a thought by UUID.
Args:
thought_id: UUID of the thought to delete
"""
return await brain.delete(thought_id)
@mcp.tool()
async def delete_where(source: str | None = None, metadata_id: str | None = None) -> int:
"""Bulk delete thoughts by source and/or metadata id field.
Useful for cleaning up stale captures before re-importing.
At least one filter must be provided.
Args:
source: Delete all thoughts from this source (e.g. 'jarvis-brain/project')
metadata_id: Delete all thoughts where metadata.id equals this value
"""
return await brain.delete_by_filter(source, metadata_id)
@mcp.tool()
async def list_thoughts(
source: str | None = None,
metadata_id: str | None = None,
limit: int = 50,
offset: int = 0,
) -> list[dict]:
"""List thoughts with optional filtering. Returns newest first.
Args:
source: Filter by source (e.g. 'jarvis-brain/project')
metadata_id: Filter by metadata.id field value
limit: Max results (default 50)
offset: Pagination offset
"""
thoughts = await brain.list_thoughts(source, metadata_id, limit, offset)
return [t.model_dump(mode="json") for t in thoughts]
# Initialize the MCP sub-app early so session_manager is available for lifespan.
# streamable_http_app() creates a sub-app with a route at /mcp internally.
# Mounted at "/" (last in the route list), FastAPI routes take priority and
@@ -134,6 +204,56 @@ async def api_recent(limit: int = 20, _: str = Depends(require_api_key)) -> list
return await brain.recent(limit=limit)
@app.get("/v1/thoughts", response_model=list[Thought])
async def api_list(
source: str | None = Query(default=None),
metadata_id: str | None = Query(default=None),
limit: int = Query(default=50, le=500),
offset: int = Query(default=0, ge=0),
_: str = Depends(require_api_key),
) -> list[Thought]:
return await brain.list_thoughts(source, metadata_id, limit, offset)
@app.get("/v1/thoughts/{thought_id}", response_model=Thought)
async def api_get(thought_id: str, _: str = Depends(require_api_key)) -> Thought:
thought = await brain.get_by_id(thought_id)
if not thought:
raise HTTPException(status_code=404, detail="Thought not found")
return thought
@app.patch("/v1/thoughts/{thought_id}", response_model=Thought)
async def api_update(
thought_id: str, req: UpdateRequest, _: str = Depends(require_api_key)
) -> Thought:
thought = await brain.update(thought_id, req)
if not thought:
raise HTTPException(status_code=404, detail="Thought not found")
return thought
@app.delete("/v1/thoughts", status_code=200)
async def api_delete_where(
source: str | None = Query(default=None),
metadata_id: str | None = Query(default=None),
_: str = Depends(require_api_key),
) -> dict:
if not source and not metadata_id:
raise HTTPException(
status_code=422, detail="At least one filter (source or metadata_id) is required"
)
count = await brain.delete_by_filter(source, metadata_id)
return {"deleted": count}
@app.delete("/v1/thoughts/{thought_id}", status_code=204)
async def api_delete(thought_id: str, _: str = Depends(require_api_key)) -> None:
deleted = await brain.delete(thought_id)
if not deleted:
raise HTTPException(status_code=404, detail="Thought not found")
@app.get("/v1/stats", response_model=Stats)
async def api_stats(_: str = Depends(require_api_key)) -> Stats:
return await brain.stats()

View File

@@ -10,6 +10,12 @@ class CaptureRequest(BaseModel):
metadata: dict[str, Any] = {}
class UpdateRequest(BaseModel):
content: str | None = None
source: str | None = None
metadata: dict[str, Any] | None = None
class Thought(BaseModel):
id: str
content: str