feat: initial MALS implementation — Phase 1

This commit is contained in:
2026-03-20 07:59:43 -05:00
commit dc1258a5cc
28 changed files with 2972 additions and 0 deletions

31
.env.example Normal file
View File

@@ -0,0 +1,31 @@
# =============================================================================
# MALS — Mosaic Agent Log System — Environment Configuration
# Copy this file to .env and fill in the required values.
# =============================================================================
# --- Required ---
# PostgreSQL password (no default — must be set)
POSTGRES_PASSWORD=change-me-in-production
# API bearer token — all clients must send: Authorization: Bearer <MALS_API_KEY>
MALS_API_KEY=change-me-in-production
# --- Optional (defaults shown) ---
# Database name and user
POSTGRES_DB=mals
POSTGRES_USER=mals
# Port the PostgreSQL container exposes on the host
PG_PORT=5434
# Port the API container exposes on the host
MALS_PORT=8421
# Log verbosity: DEBUG | INFO | WARNING | ERROR | CRITICAL
LOG_LEVEL=INFO
# For Portainer/Swarm only: your domain and Docker image tag
# MALS_DOMAIN=mals.yourdomain.com
# MALS_IMAGE=registry.yourdomain.com/mals:latest

10
.gitignore vendored Normal file
View File

@@ -0,0 +1,10 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.12

28
Dockerfile Normal file
View File

@@ -0,0 +1,28 @@
FROM python:3.12-slim
# Install curl for healthcheck
RUN apt-get update && apt-get install -y --no-install-recommends curl && rm -rf /var/lib/apt/lists/*
# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
WORKDIR /app
# Copy dependency files first (layer caching)
COPY pyproject.toml uv.lock* ./
# Install dependencies (no dev extras)
RUN uv sync --frozen --no-dev 2>/dev/null || uv sync --no-dev
# Copy source
COPY src/ ./src/
COPY alembic/ ./alembic/
COPY alembic.ini ./
# Run as non-root
RUN adduser --disabled-password --gecos "" appuser && chown -R appuser /app
USER appuser
EXPOSE 8421
CMD ["uv", "run", "uvicorn", "mals.main:app", "--host", "0.0.0.0", "--port", "8421"]

204
README.md Normal file
View File

@@ -0,0 +1,204 @@
# MALS — Mosaic Agent Log System
**Phase 1 — Standalone Service**
MALS is a lightweight, centralized structured logging service for AI agents. It provides a FastAPI REST API backed by PostgreSQL, allowing any agent or bot to ship structured log entries (with levels, categories, metadata, and trace IDs) and query them later for debugging, alerting, and health summaries. It is designed to be embedded into [Mosaic Stack](https://git.mosaicstack.dev/jason.woltje/mosaic-mono-v1) as a future integration, but is fully functional as a standalone service.
---
## Quick Start
### 1. Clone and configure
```bash
git clone https://git.mosaicstack.dev/jason.woltje/mals.git
cd mals
cp .env.example .env
```
Edit `.env` and fill in **at minimum**:
```
POSTGRES_PASSWORD=your-strong-password-here
MALS_API_KEY=your-secret-api-key-here
```
### 2. Start services
```bash
docker compose up -d
```
### 3. Run migrations
```bash
docker compose exec api alembic upgrade head
```
### 4. Verify health
```bash
curl http://localhost:8421/health
# → {"status":"ok","db":"connected","version":"0.1.0"}
```
---
## Using the Python Client
Install the package (once published, or from source):
```bash
pip install mals
# or from source:
pip install -e ./path/to/mals
```
### Async usage
```python
import asyncio
import os
from mals.client import MALSClient
client = MALSClient(
base_url=os.environ["MALS_URL"], # e.g. http://10.1.1.45:8421
api_key=os.environ["MALS_API_KEY"],
agent_id="crypto", # identifies your service
session_key="agent:crypto:discord:...", # optional — for session correlation
source="openclaw", # optional — default "api"
)
async def main():
# Info log
await client.log("Bot started", level="info", category="lifecycle")
# Error with traceback capture
try:
raise RuntimeError("Flash loan failed")
except Exception as exc:
await client.error("Unexpected error in main loop", exc=exc, category="trading")
# Batch log
await client.batch([
{"message": "Order placed", "level": "info", "metadata": {"pair": "ETH/USDC"}},
{"message": "Order filled", "level": "info", "metadata": {"pair": "ETH/USDC"}},
])
# Summary for this agent
summary = await client.summary(since_hours=24)
print(summary["by_agent"])
asyncio.run(main())
```
### Synchronous usage (from non-async code)
```python
client.sync_log("Bot restarted", level="warn", category="lifecycle")
client.sync_error("DB connection lost", exc=exc)
```
---
## API Reference
All routes require `Authorization: Bearer <MALS_API_KEY>` except `GET /health`.
### `GET /health`
No auth. Returns service status.
### `POST /logs`
Ingest a single log entry. Returns `{"id": "uuid", "created_at": "..."}`.
**Body fields:**
| Field | Type | Required | Description |
|---|---|---|---|
| `agent_id` | string | ✅ | Agent or service name (max 64 chars) |
| `message` | string | ✅ | Log message |
| `level` | string | — | `debug\|info\|warn\|error\|critical` (default: `info`) |
| `category` | string | — | Freeform tag (e.g. `deploy`, `trading`) |
| `session_key` | string | — | Agent session identifier |
| `source` | string | — | Source system (default: `api`) |
| `metadata` | object | — | Arbitrary JSON metadata |
| `trace_id` | UUID | — | Distributed tracing ID |
| `parent_id` | UUID | — | Parent log entry ID for hierarchical traces |
### `POST /logs/batch`
Array of log entries. Returns `{"inserted": N}`.
### `GET /logs`
List log entries. Query params: `agent_id`, `level`, `category`, `source`, `resolved` (bool), `since` (ISO datetime), `until`, `search` (text in message), `limit` (1–1000, default 100), `offset`.
Returns `{"total": N, "items": [...]}`.
### `GET /logs/summary`
Params: `since` (ISO datetime, default 24h ago), `agent_id` (optional filter).
Returns error counts by agent, unresolved errors, new errors (not seen in previous period), and recurring errors (same message 3+ times).
### `GET /logs/agents`
List all distinct agents with last activity timestamp and 24h error count.
### `PATCH /logs/{id}/resolve`
Body: `{"resolved_by": "jarvis"}`. Marks entry resolved.
### `POST /logs/resolve-batch`
Body: `{"ids": ["uuid1", ...], "resolved_by": "jarvis"}`. Bulk resolve.
---
## Portainer Deployment (w-docker0)
1. Build and push the image to your registry (or build on the host):
```bash
docker build -t mals:latest .
```
2. In Portainer → Stacks → Add Stack, paste `docker-compose.portainer.yml`.
3. Set environment variables in the Portainer UI (or via a `.env` secrets file):
- `POSTGRES_PASSWORD`
- `MALS_API_KEY`
- `MALS_DOMAIN` (if using Traefik)
4. Deploy. After the first start, run migrations:
```bash
docker exec <api-container-id> alembic upgrade head
```
---
## Development
```bash
# Install with dev dependencies
uv sync --dev
# Run tests
uv run pytest tests/ -v
# Lint
uv run ruff check src/
# Type check
uv run mypy src/
# Run locally (needs a running Postgres)
DATABASE_URL=postgresql+asyncpg://mals:mals@localhost:5434/mals \
MALS_API_KEY=dev-key \
uv run uvicorn mals.main:app --reload
```
---
## Future: Mosaic Stack Integration
MALS is designed for eventual integration into the Mosaic Stack platform:
- The Python client will become an internal SDK used by all Mosaic agents
- The FastAPI service will be mounted as a sub-application in the Mosaic NestJS gateway (via a sidecar or internal network call)
- The `/logs/summary` endpoint will feed the Mosaic Mission Control dashboard
- Log entries will be linked to Mosaic sessions via `session_key`
No changes to the schema or API contract are expected for this integration — the standalone Phase 1 design is integration-ready.

38
alembic.ini Normal file
View File

@@ -0,0 +1,38 @@
[alembic]
script_location = alembic
prepend_sys_path = .
sqlalchemy.url = %(DATABASE_URL)s
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

59
alembic/env.py Normal file
View File

@@ -0,0 +1,59 @@
"""Alembic environment — uses sync psycopg2 driver for migrations."""
from __future__ import annotations
import os
from logging.config import fileConfig
from alembic import context
from sqlalchemy import create_engine
# this is the Alembic Config object
config = context.config
# Interpret the config file for Python logging.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Read DATABASE_URL from environment (preferred) or alembic.ini
database_url = os.environ.get("DATABASE_URL") or config.get_main_option("sqlalchemy.url")
if not database_url:
raise RuntimeError(
"mals/alembic: DATABASE_URL is required. "
"Set it as an environment variable or in alembic.ini."
)
# Convert asyncpg URL to psycopg2 (sync) for Alembic
sync_url = (
database_url.replace("postgresql+asyncpg://", "postgresql://")
.replace("asyncpg://", "postgresql://")
)
target_metadata = None
def run_migrations_offline() -> None:
    """Emit migration SQL to stdout without opening a database connection."""
    offline_opts = {
        "url": sync_url,
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_opts)
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online() -> None:
    """Apply migrations directly against a live database."""
    engine = create_engine(sync_url)
    with engine.connect() as conn:
        context.configure(connection=conn, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,78 @@
"""Initial schema — agent_logs table with indexes.
Revision ID: 001
Revises:
Create Date: 2026-03-20
"""
from __future__ import annotations
from alembic import op
revision = "001"
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create the agent_logs table plus indexes matching the API's query patterns."""
    # Raw SQL (rather than SQLAlchemy metadata) keeps the schema explicit.
    # gen_random_uuid() is built into PostgreSQL 13+; older servers need pgcrypto.
    op.execute("""
    CREATE TABLE IF NOT EXISTS agent_logs (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
        -- Source
        agent_id VARCHAR(64) NOT NULL,
        session_key VARCHAR(256),
        source VARCHAR(64) NOT NULL DEFAULT 'api',
        -- Classification
        level VARCHAR(16) NOT NULL DEFAULT 'info',
        category VARCHAR(64),
        -- Content
        message TEXT NOT NULL,
        metadata JSONB DEFAULT '{}',
        -- Correlation
        trace_id UUID,
        parent_id UUID REFERENCES agent_logs(id),
        -- Resolution tracking
        resolved BOOLEAN NOT NULL DEFAULT false,
        resolved_at TIMESTAMPTZ,
        resolved_by VARCHAR(64)
    )
    """)
    # Hot path: GET /logs?agent_id=... ordered newest-first.
    op.execute("""
    CREATE INDEX IF NOT EXISTS idx_agent_logs_agent_time
    ON agent_logs (agent_id, created_at DESC)
    """)
    # Partial index: only problem levels are filtered by level+time.
    op.execute("""
    CREATE INDEX IF NOT EXISTS idx_agent_logs_level_time
    ON agent_logs (level, created_at DESC)
    WHERE level IN ('warn', 'error', 'critical')
    """)
    # Serves the unresolved-errors listing (summary endpoint).
    op.execute("""
    CREATE INDEX IF NOT EXISTS idx_agent_logs_unresolved
    ON agent_logs (created_at DESC)
    WHERE resolved = false AND level IN ('warn', 'error', 'critical')
    """)
    # GIN index enables containment/key queries on the JSONB metadata.
    op.execute("""
    CREATE INDEX IF NOT EXISTS idx_agent_logs_metadata
    ON agent_logs USING GIN (metadata)
    """)
    # Partial index: trace lookups without indexing the (common) NULLs.
    op.execute("""
    CREATE INDEX IF NOT EXISTS idx_agent_logs_trace
    ON agent_logs (trace_id)
    WHERE trace_id IS NOT NULL
    """)
def downgrade() -> None:
    """Drop the agent_logs table; CASCADE removes dependent objects (indexes, FKs)."""
    op.execute("DROP TABLE IF EXISTS agent_logs CASCADE")

View File

@@ -0,0 +1,57 @@
# Portainer / Docker Swarm deployment variant.
# Use this stack file in Portainer's "Add Stack" UI.
# Adjust Traefik labels for your domain and router names.
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: ${POSTGRES_DB:-mals}
POSTGRES_USER: ${POSTGRES_USER:-mals}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-mals}"]
interval: 10s
timeout: 5s
retries: 5
networks:
- mals_internal
deploy:
replicas: 1
resources:
limits:
memory: 512M
api:
image: ${MALS_IMAGE:-mals:latest}
environment:
DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-mals}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB:-mals}
MALS_API_KEY: ${MALS_API_KEY}
MALS_PORT: ${MALS_PORT:-8421}
LOG_LEVEL: ${LOG_LEVEL:-INFO}
networks:
- mals_internal
- traefik_public
depends_on:
- postgres
deploy:
replicas: 1
resources:
limits:
memory: 512M
labels:
- "traefik.enable=true"
- "traefik.http.routers.mals.rule=Host(`${MALS_DOMAIN:-mals.internal}`)"
- "traefik.http.routers.mals.entrypoints=websecure"
- "traefik.http.routers.mals.tls=true"
- "traefik.http.services.mals.loadbalancer.server.port=8421"
networks:
mals_internal:
traefik_public:
external: true
volumes:
postgres_data:

37
docker-compose.yml Normal file
View File

@@ -0,0 +1,37 @@
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: ${POSTGRES_DB:-mals}
POSTGRES_USER: ${POSTGRES_USER:-mals}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-mals}"]
interval: 10s
timeout: 5s
retries: 5
ports:
- "${PG_PORT:-5434}:5432"
api:
build: .
environment:
DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-mals}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB:-mals}
MALS_API_KEY: ${MALS_API_KEY}
MALS_PORT: ${MALS_PORT:-8421}
LOG_LEVEL: ${LOG_LEVEL:-INFO}
ports:
- "${MALS_PORT:-8421}:8421"
depends_on:
postgres:
condition: service_healthy
healthcheck:
# The container always listens on 8421 (Dockerfile CMD); MALS_PORT only remaps the host port.
test: ["CMD-SHELL", "curl -sf http://localhost:8421/health || exit 1"]
interval: 30s
timeout: 5s
retries: 3
volumes:
postgres_data:

54
pyproject.toml Normal file
View File

@@ -0,0 +1,54 @@
[project]
name = "mals"
version = "0.1.0"
description = "Mosaic Agent Log System — standalone Phase 1"
requires-python = ">=3.12"
dependencies = [
"fastapi>=0.115.0",
"uvicorn[standard]>=0.32.0",
"asyncpg>=0.30.0",
"alembic>=1.14.0",
"psycopg2-binary>=2.9.0",
"pydantic-settings>=2.6.0",
"pydantic>=2.9.0",
"httpx>=0.28.0",
"python-dotenv>=1.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.3.0",
"pytest-asyncio>=0.24.0",
"ruff>=0.8.0",
"mypy>=1.13.0",
"pytest-httpx>=0.35.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/mals"]
[tool.ruff]
line-length = 100
target-version = "py312"
[tool.ruff.lint]
select = ["E", "F", "I", "UP"]
ignore = []
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
[tool.mypy]
python_version = "3.12"
warn_return_any = true
warn_unused_configs = true
[dependency-groups]
dev = [
"ruff>=0.15.7",
]

3
src/mals/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""Mosaic Agent Log System (MALS) — Phase 1."""
__version__ = "0.1.0"

20
src/mals/auth.py Normal file
View File

@@ -0,0 +1,20 @@
"""Bearer token authentication dependency."""
from fastapi import HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from .config import settings
_bearer = HTTPBearer(auto_error=False)
async def require_api_key(
    credentials: HTTPAuthorizationCredentials | None = Security(_bearer),
) -> None:
    """FastAPI dependency: verify the Bearer token matches MALS_API_KEY.

    Raises:
        HTTPException: 401 (with ``WWW-Authenticate: Bearer``) when the
            Authorization header is missing or the token does not match.
    """
    import secrets  # stdlib; local import keeps this fix self-contained

    # Constant-time comparison: a plain `!=` short-circuits on the first
    # differing byte and can leak the key prefix through response timing.
    supplied = credentials.credentials.encode("utf-8") if credentials else b""
    expected = settings.mals_api_key.encode("utf-8")
    if credentials is None or not secrets.compare_digest(supplied, expected):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API key",
            headers={"WWW-Authenticate": "Bearer"},
        )

View File

@@ -0,0 +1,5 @@
"""MALS Python client."""
from .client import MALSClient
__all__ = ["MALSClient"]

180
src/mals/client/client.py Normal file
View File

@@ -0,0 +1,180 @@
"""MALS Python client — usable by any external service (crypto bot, etc.)."""
from __future__ import annotations
import asyncio
import traceback
from datetime import UTC, datetime, timedelta
from typing import Any
from uuid import UUID
import httpx
class MALSClient:
"""Async client for the Mosaic Agent Log System API.
Usage::
client = MALSClient(
base_url=os.environ["MALS_URL"],
api_key=os.environ["MALS_API_KEY"],
agent_id="crypto",
)
await client.log("Bot started", level="info", category="lifecycle")
await client.error("Unexpected exit", exc=exc, category="deploy")
For synchronous code::
client.sync_log("Bot started")
"""
def __init__(
self,
base_url: str,
api_key: str,
agent_id: str,
session_key: str | None = None,
source: str = "api",
timeout: float = 10.0,
) -> None:
if not base_url:
raise ValueError("MALSClient: base_url is required")
if not api_key:
raise ValueError("MALSClient: api_key is required")
if not agent_id:
raise ValueError("MALSClient: agent_id is required")
self._base_url = base_url.rstrip("/")
self._agent_id = agent_id
self._session_key = session_key
self._source = source
self._headers = {"Authorization": f"Bearer {api_key}"}
self._timeout = timeout
# ------------------------------------------------------------------
# Core methods
# ------------------------------------------------------------------
async def log(
self,
message: str,
level: str = "info",
category: str | None = None,
metadata: dict[str, Any] | None = None,
trace_id: str | UUID | None = None,
) -> dict[str, Any]:
"""Send a single log entry. Returns the created record's id + created_at."""
payload: dict[str, Any] = {
"agent_id": self._agent_id,
"message": message,
"level": level,
"source": self._source,
}
if self._session_key:
payload["session_key"] = self._session_key
if category:
payload["category"] = category
if metadata:
payload["metadata"] = metadata
if trace_id:
payload["trace_id"] = str(trace_id)
async with httpx.AsyncClient(
base_url=self._base_url, headers=self._headers, timeout=self._timeout
) as client:
resp = client.build_request("POST", "/logs", json=payload)
response = await client.send(resp)
response.raise_for_status()
return response.json()
async def error(
self,
message: str,
exc: Exception | None = None,
category: str | None = None,
metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Convenience method for error-level logs.
If *exc* is provided, its traceback is included in metadata.
"""
meta: dict[str, Any] = dict(metadata or {})
if exc is not None:
meta["exception"] = type(exc).__name__
meta["traceback"] = traceback.format_exc()
return await self.log(
message=message,
level="error",
category=category,
metadata=meta,
)
async def batch(self, entries: list[dict[str, Any]]) -> dict[str, Any]:
"""Send multiple log entries at once.
Each entry dict follows the same schema as the single-log payload,
but ``agent_id`` and ``source`` default to the client's own values.
"""
filled: list[dict[str, Any]] = []
for e in entries:
entry = dict(e)
entry.setdefault("agent_id", self._agent_id)
entry.setdefault("source", self._source)
if self._session_key:
entry.setdefault("session_key", self._session_key)
filled.append(entry)
async with httpx.AsyncClient(
base_url=self._base_url, headers=self._headers, timeout=self._timeout
) as client:
response = await client.post("/logs/batch", json=filled)
response.raise_for_status()
return response.json()
async def summary(self, since_hours: int = 24) -> dict[str, Any]:
"""Get summary for this agent over the last *since_hours* hours."""
since = datetime.now(tz=UTC) - timedelta(hours=since_hours)
params: dict[str, Any] = {
"since": since.isoformat(),
"agent_id": self._agent_id,
}
async with httpx.AsyncClient(
base_url=self._base_url, headers=self._headers, timeout=self._timeout
) as client:
response = await client.get("/logs/summary", params=params)
response.raise_for_status()
return response.json()
# ------------------------------------------------------------------
# Synchronous helpers
# ------------------------------------------------------------------
def sync_log(self, message: str, level: str = "info", **kwargs: Any) -> None:
"""Synchronous wrapper — safe to call from non-async code.
Uses :func:`asyncio.run` when no loop is running, or schedules a
fire-and-forget task when called from within a running loop.
"""
coro = self.log(message, level=level, **kwargs)
try:
loop = asyncio.get_running_loop()
# Inside an async context — schedule as a background task
loop.create_task(coro)
except RuntimeError:
# No running loop — safe to call asyncio.run
asyncio.run(coro)
def sync_error(
self,
message: str,
exc: Exception | None = None,
**kwargs: Any,
) -> None:
"""Synchronous convenience wrapper for :meth:`error`."""
coro = self.error(message, exc=exc, **kwargs)
try:
loop = asyncio.get_running_loop()
loop.create_task(coro)
except RuntimeError:
asyncio.run(coro)

37
src/mals/config.py Normal file
View File

@@ -0,0 +1,37 @@
"""Configuration — all values from environment variables. No defaults for secrets."""
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
    """Application settings, loaded from the environment (and .env when present)."""

    # Required — must be set in environment; no default
    database_url: str
    mals_api_key: str
    # Optional with safe defaults
    mals_host: str = "0.0.0.0"
    mals_port: int = 8421
    log_level: str = "INFO"

    # extra="ignore" lets unrelated variables in a shared .env pass through.
    model_config = {"env_file": ".env", "env_file_encoding": "utf-8", "extra": "ignore"}

    def __init__(self, **kwargs):  # type: ignore[override]
        """Reject explicitly-empty secrets after loading.

        pydantic already errors on *missing* required fields; this guards
        against empty-string values (e.g. ``DATABASE_URL=`` in a .env file).
        """
        super().__init__(**kwargs)
        if not self.database_url:
            raise ValueError(
                "mals: DATABASE_URL is required. "
                "Set it via the DATABASE_URL environment variable or .env file."
            )
        if not self.mals_api_key:
            raise ValueError(
                "mals: MALS_API_KEY is required. "
                "Set it via the MALS_API_KEY environment variable or .env file."
            )
def get_settings() -> Settings:
    """Build a fresh Settings instance (useful for tests / dependency overrides)."""
    return Settings()  # type: ignore[call-arg]


# Singleton for use across the app; constructing it here surfaces missing
# required env vars at import time rather than on first request.
settings = Settings()  # type: ignore[call-arg]

38
src/mals/db.py Normal file
View File

@@ -0,0 +1,38 @@
"""Database connection pool using asyncpg."""
from __future__ import annotations
import asyncpg
from .config import settings
_pool: asyncpg.Pool | None = None
async def get_pool() -> asyncpg.Pool:
    """Return the shared asyncpg pool, creating it lazily on first use."""
    global _pool
    if _pool is None:
        dsn = _asyncpg_dsn(settings.database_url)
        _pool = await asyncpg.create_pool(dsn=dsn, min_size=2, max_size=10)
    return _pool
async def close_pool() -> None:
    """Dispose of the pool on shutdown; a no-op when no pool was ever created."""
    global _pool
    if _pool is None:
        return
    await _pool.close()
    _pool = None
def _asyncpg_dsn(url: str) -> str:
"""Convert SQLAlchemy-style URL to asyncpg DSN.
asyncpg expects: postgresql://user:pass@host:port/db
SQLAlchemy uses: postgresql+asyncpg://...
"""
return url.replace("postgresql+asyncpg://", "postgresql://")

73
src/mals/main.py Normal file
View File

@@ -0,0 +1,73 @@
"""FastAPI application entrypoint."""
from __future__ import annotations
import logging
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from . import __version__
from .config import settings
from .db import close_pool, get_pool
from .routes.ingest import router as ingest_router
from .routes.query import router as query_router
logging.basicConfig(level=settings.log_level.upper())
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """App lifespan: warm the DB pool at startup, close it at shutdown.

    Creating the pool eagerly surfaces a bad DATABASE_URL or unreachable
    database at startup instead of on the first request.
    """
    logger.info("MALS %s starting up", __version__)
    await get_pool()
    logger.info("Database connection pool established")
    yield
    await close_pool()
    logger.info("Database connection pool closed")
app = FastAPI(
    title="Mosaic Agent Log System",
    description="Centralized structured logging for AI agents.",
    version=__version__,
    lifespan=lifespan,
)
# NOTE(review): wildcard origins combined with allow_credentials=True is
# refused by browsers for credentialed requests (the CORS spec forbids
# `Access-Control-Allow-Origin: *` with credentials). Confirm whether
# browser-based credentialed clients are expected; if so, list origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Both routers mount under the /logs prefix (see routes/); all of their
# endpoints require the Bearer API key.
app.include_router(ingest_router)
app.include_router(query_router)
@app.get("/health", tags=["health"])
async def health() -> dict[str, str]:
    """Liveness/readiness probe — the only unauthenticated route."""
    db_status = "connected"
    try:
        pool = await get_pool()
        async with pool.acquire() as conn:
            await conn.fetchval("SELECT 1")
    except Exception as exc:
        logger.warning("DB health check failed: %s", exc)
        db_status = "error"
    return {"status": "ok", "db": db_status, "version": __version__}
# Direct execution for local development; production uses the Dockerfile's
# `uv run uvicorn mals.main:app ...` CMD instead.
if __name__ == "__main__":
    uvicorn.run(
        "mals.main:app",
        host=settings.mals_host,
        port=settings.mals_port,
        log_level=settings.log_level.lower(),
    )

107
src/mals/models.py Normal file
View File

@@ -0,0 +1,107 @@
"""Pydantic models for request/response validation."""
from __future__ import annotations
from datetime import datetime
from typing import Any
from uuid import UUID
from pydantic import BaseModel, Field, field_validator
# Accepted values for LogEntry.level (validated case-insensitively below).
VALID_LEVELS = {"debug", "info", "warn", "error", "critical"}


class LogEntry(BaseModel):
    """Inbound payload for POST /logs and each item of POST /logs/batch."""

    agent_id: str = Field(..., max_length=64)
    level: str = Field("info", max_length=16)
    message: str
    category: str | None = Field(None, max_length=64)
    session_key: str | None = Field(None, max_length=256)
    source: str = Field("api", max_length=64)
    metadata: dict[str, Any] = Field(default_factory=dict)
    trace_id: UUID | None = None
    parent_id: UUID | None = None

    @field_validator("level")
    @classmethod
    def validate_level(cls, v: str) -> str:
        """Normalize level to lowercase and reject values outside VALID_LEVELS."""
        v = v.lower()
        if v not in VALID_LEVELS:
            raise ValueError(f"level must be one of {sorted(VALID_LEVELS)}, got '{v}'")
        return v
class LogEntryResponse(BaseModel):
    """Response body for POST /logs — identifier of the stored entry."""

    id: UUID
    created_at: datetime


class BatchResponse(BaseModel):
    """Response body for POST /logs/batch."""

    inserted: int


class LogRecord(BaseModel):
    """A full stored log row, as returned by GET /logs."""

    id: UUID
    created_at: datetime
    agent_id: str
    session_key: str | None
    source: str
    level: str
    category: str | None
    message: str
    metadata: dict[str, Any]
    trace_id: UUID | None
    parent_id: UUID | None
    resolved: bool
    resolved_at: datetime | None
    resolved_by: str | None


class PaginatedLogs(BaseModel):
    """GET /logs response: total match count plus the current page of items."""

    total: int
    items: list[LogRecord]


class AgentStats(BaseModel):
    """Per-agent log counts by level over the summary period."""

    total: int
    error: int = 0
    warn: int = 0
    info: int = 0
    debug: int = 0
    critical: int = 0


class UnresolvedError(BaseModel):
    """Compact error reference used in the summary's error lists."""

    id: UUID
    agent_id: str
    message: str
    created_at: datetime
    level: str


class SummaryResponse(BaseModel):
    """GET /logs/summary response."""

    # presumably maps period-boundary names to timestamps — confirm the
    # exact keys against the summary route implementation
    period: dict[str, datetime]
    by_agent: dict[str, AgentStats]
    unresolved_errors: list[UnresolvedError]
    new_errors: list[UnresolvedError]
    recurring_errors: list[UnresolvedError]


class AgentActivity(BaseModel):
    """GET /logs/agents item: agent liveness and 24h activity counts."""

    agent_id: str
    last_seen: datetime
    error_count_24h: int
    total_24h: int


class ResolveRequest(BaseModel):
    """PATCH /logs/{id}/resolve request body."""

    resolved_by: str = Field(..., max_length=64)


class BatchResolveRequest(BaseModel):
    """POST /logs/resolve-batch request body."""

    ids: list[UUID]
    resolved_by: str = Field(..., max_length=64)


class BatchResolveResponse(BaseModel):
    """POST /logs/resolve-batch response."""

    resolved: int

View File

@@ -0,0 +1 @@
"""Route modules."""

81
src/mals/routes/ingest.py Normal file
View File

@@ -0,0 +1,81 @@
"""Log ingest routes: POST /logs, POST /logs/batch."""
from __future__ import annotations
import json
from fastapi import APIRouter, Depends, HTTPException
from ..auth import require_api_key
from ..db import get_pool
from ..models import BatchResponse, LogEntry, LogEntryResponse
router = APIRouter(prefix="/logs", tags=["ingest"])
@router.post("", response_model=LogEntryResponse, status_code=201)
async def ingest_log(
    entry: LogEntry,
    _: None = Depends(require_api_key),
) -> LogEntryResponse:
    """Ingest a single log entry.

    Returns the generated id and created_at timestamp of the stored row.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        # metadata is serialized here and cast with ::jsonb because asyncpg
        # does not encode Python dicts to jsonb without a custom codec.
        row = await conn.fetchrow(
            """
            INSERT INTO agent_logs
            (agent_id, session_key, source, level, category, message,
             metadata, trace_id, parent_id)
            VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9)
            RETURNING id, created_at
            """,
            entry.agent_id,
            entry.session_key,
            entry.source,
            entry.level,
            entry.category,
            entry.message,
            json.dumps(entry.metadata),
            entry.trace_id,
            entry.parent_id,
        )
    # INSERT ... RETURNING should always yield a row; guard defensively so a
    # driver anomaly surfaces as a clean 500 rather than an AttributeError.
    if row is None:
        raise HTTPException(status_code=500, detail="Failed to insert log entry")
    return LogEntryResponse(id=row["id"], created_at=row["created_at"])
@router.post("/batch", response_model=BatchResponse, status_code=201)
async def ingest_batch(
    entries: list[LogEntry],
    _: None = Depends(require_api_key),
) -> BatchResponse:
    """Ingest multiple log entries in one call.

    Returns the number of rows inserted. An empty list short-circuits
    (still responding 201) without touching the database.
    """
    if not entries:
        return BatchResponse(inserted=0)
    pool = await get_pool()
    async with pool.acquire() as conn:
        # Pre-serialize metadata for the ::jsonb cast (same as single ingest).
        rows = [
            (
                e.agent_id,
                e.session_key,
                e.source,
                e.level,
                e.category,
                e.message,
                json.dumps(e.metadata),
                e.trace_id,
                e.parent_id,
            )
            for e in entries
        ]
        # NOTE(review): asyncpg's executemany runs atomically — either all
        # rows insert or none. Confirm all-or-nothing is the intended batch
        # semantics before clients rely on partial-success behavior.
        await conn.executemany(
            """
            INSERT INTO agent_logs
            (agent_id, session_key, source, level, category, message,
             metadata, trace_id, parent_id)
            VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9)
            """,
            rows,
        )
    return BatchResponse(inserted=len(entries))

326
src/mals/routes/query.py Normal file
View File

@@ -0,0 +1,326 @@
"""Query routes: GET /logs, GET /logs/summary, GET /logs/agents, PATCH /logs/{id}/resolve."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query
from ..auth import require_api_key
from ..db import get_pool
from ..models import (
AgentActivity,
AgentStats,
BatchResolveRequest,
BatchResolveResponse,
LogRecord,
PaginatedLogs,
ResolveRequest,
SummaryResponse,
UnresolvedError,
)
router = APIRouter(prefix="/logs", tags=["query"])
@router.get("", response_model=PaginatedLogs)
async def list_logs(
    agent_id: str | None = Query(None),
    level: str | None = Query(None),
    category: str | None = Query(None),
    source: str | None = Query(None),
    resolved: bool | None = Query(None),
    since: datetime | None = Query(None),
    until: datetime | None = Query(None),
    search: str | None = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    _: None = Depends(require_api_key),
) -> PaginatedLogs:
    """List log entries with optional filters.

    Filters are ANDed together; results are newest-first and paginated via
    limit/offset. ``search`` is a case-insensitive substring match on message.
    """
    pool = await get_pool()
    conditions: list[str] = []
    params: list[object] = []
    idx = 1

    def add(cond: str, val: object) -> None:
        """Register one WHERE fragment, assigning it the next $N placeholder."""
        nonlocal idx
        conditions.append(cond.format(idx))
        params.append(val)
        idx += 1

    if agent_id is not None:
        add("agent_id = ${}", agent_id)
    if level is not None:
        # Levels are stored lowercase (normalized on ingest).
        add("level = ${}", level.lower())
    if category is not None:
        add("category = ${}", category)
    if source is not None:
        add("source = ${}", source)
    if resolved is not None:
        add("resolved = ${}", resolved)
    if since is not None:
        add("created_at >= ${}", since)
    if until is not None:
        add("created_at <= ${}", until)
    if search is not None:
        add("message ILIKE ${}", f"%{search}%")
    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    async with pool.acquire() as conn:
        # The fragments interpolated via f-string are fixed SQL containing
        # only $N placeholders; user input travels exclusively through
        # `params`, so this is not injectable.
        count_row = await conn.fetchrow(f"SELECT COUNT(*) AS n FROM agent_logs {where}", *params)
        total: int = count_row["n"] if count_row else 0
        # LIMIT/OFFSET take the next two placeholder slots after the filters.
        rows = await conn.fetch(
            f"""
            SELECT id, created_at, agent_id, session_key, source, level, category,
                   message, metadata, trace_id, parent_id, resolved, resolved_at, resolved_by
            FROM agent_logs
            {where}
            ORDER BY created_at DESC
            LIMIT ${idx} OFFSET ${idx + 1}
            """,
            *params,
            limit,
            offset,
        )
    items = [
        LogRecord(
            id=r["id"],
            created_at=r["created_at"],
            agent_id=r["agent_id"],
            session_key=r["session_key"],
            source=r["source"],
            level=r["level"],
            category=r["category"],
            message=r["message"],
            metadata=_decode_metadata(r["metadata"]),
            trace_id=r["trace_id"],
            parent_id=r["parent_id"],
            resolved=r["resolved"],
            resolved_at=r["resolved_at"],
            resolved_by=r["resolved_by"],
        )
        for r in rows
    ]
    return PaginatedLogs(total=total, items=items)


def _decode_metadata(raw: object) -> dict:
    """Decode a jsonb column value into a dict.

    asyncpg returns json/jsonb columns as *strings* unless a type codec is
    registered on the pool (none is — see db.get_pool). The previous
    ``dict(raw)`` therefore raised ValueError on every non-NULL value,
    including the schema default ``'{}'``. Handle str, mapping, and NULL.
    """
    import json  # stdlib; local import keeps this fix self-contained

    if raw is None:
        return {}
    if isinstance(raw, str):
        return json.loads(raw) if raw else {}
    return dict(raw)  # already decoded (e.g. a codec was registered)
@router.get("/agents", response_model=list[AgentActivity])
async def list_agents(
    _: None = Depends(require_api_key),
) -> list[AgentActivity]:
    """Return distinct agents with last activity and 24h error count."""
    pool = await get_pool()
    cutoff = datetime.now(tz=UTC) - timedelta(hours=24)
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT
                agent_id,
                MAX(created_at) AS last_seen,
                COUNT(*) FILTER (WHERE created_at >= $1) AS total_24h,
                COUNT(*) FILTER (
                    WHERE level IN ('error','critical') AND created_at >= $1
                ) AS error_count_24h
            FROM agent_logs
            GROUP BY agent_id
            ORDER BY last_seen DESC
            """,
            cutoff,
        )
    activities: list[AgentActivity] = []
    for row in rows:
        activities.append(
            AgentActivity(
                agent_id=row["agent_id"],
                last_seen=row["last_seen"],
                error_count_24h=row["error_count_24h"],
                total_24h=row["total_24h"],
            )
        )
    return activities
@router.get("/summary", response_model=SummaryResponse)
async def get_summary(
    since: datetime | None = Query(None),
    agent_id: str | None = Query(None),
    _: None = Depends(require_api_key),
) -> SummaryResponse:
    """Summarize log activity over a period.

    Args:
        since: Start of the reporting window; defaults to 24 hours ago.
        agent_id: Optional filter restricting all stats to one agent.

    Returns:
        SummaryResponse with per-agent level counts, unresolved
        warnings/errors, errors new to this period, and recurring errors.
    """
    now = datetime.now(tz=UTC)
    since_dt = since if since is not None else (now - timedelta(hours=24))
    pool = await get_pool()
    # Build the shared WHERE clause: $1 is always the window start and
    # $2 (when present) the agent filter. The same *params list is reused
    # by every query below — including the nested subquery — since
    # positional placeholders may repeat.
    conditions = ["created_at >= $1"]
    params: list[object] = [since_dt]
    idx = 2
    if agent_id is not None:
        conditions.append(f"agent_id = ${idx}")
        params.append(agent_id)
        idx += 1
    where = "WHERE " + " AND ".join(conditions)
    async with pool.acquire() as conn:
        # Per-agent, per-level counts within the window.
        agent_rows = await conn.fetch(
            f"""
            SELECT agent_id, level, COUNT(*) AS cnt
            FROM agent_logs
            {where}
            GROUP BY agent_id, level
            """,
            *params,
        )
        # Unresolved errors/warnings in period (most recent 50).
        unresolved_rows = await conn.fetch(
            f"""
            SELECT id, agent_id, message, created_at, level
            FROM agent_logs
            {where} AND resolved = false AND level IN ('warn','error','critical')
            ORDER BY created_at DESC
            LIMIT 50
            """,
            *params,
        )
        # Recurring errors: same message seen 3+ times in period.
        # DISTINCT ON keeps one representative (latest) row per message.
        recurring_rows = await conn.fetch(
            f"""
            SELECT DISTINCT ON (message) id, agent_id, message, created_at, level
            FROM agent_logs
            {where} AND level IN ('error','critical')
            AND message IN (
                SELECT message FROM agent_logs
                {where} AND level IN ('error','critical')
                GROUP BY message HAVING COUNT(*) >= 3
            )
            ORDER BY message, created_at DESC
            LIMIT 20
            """,
            *params,
        )
        # New errors: seen in this period but NOT in the previous same-length period
        period_len = now - since_dt
        prev_since = since_dt - period_len
        prev_rows = await conn.fetch(
            """
            SELECT DISTINCT message FROM agent_logs
            WHERE created_at >= $1 AND created_at < $2
            AND level IN ('error','critical')
            """,
            prev_since,
            since_dt,
        )
        prev_messages = {r["message"] for r in prev_rows}
        # Candidates for "new" errors: latest row per distinct message in
        # this period. NOTE(review): LIMIT 20 applies BEFORE the Python-side
        # prev_messages filter below, so genuinely-new messages beyond the
        # first 20 distinct ones can be dropped — confirm this is acceptable.
        new_error_rows = await conn.fetch(
            f"""
            SELECT DISTINCT ON (message) id, agent_id, message, created_at, level
            FROM agent_logs
            {where} AND level IN ('error','critical')
            ORDER BY message, created_at DESC
            LIMIT 20
            """,
            *params,
        )
    # Fold the per-(agent, level) count rows into AgentStats keyed by agent.
    by_agent: dict[str, AgentStats] = {}
    for r in agent_rows:
        aid = r["agent_id"]
        if aid not in by_agent:
            by_agent[aid] = AgentStats(total=0)
        stats = by_agent[aid]
        cnt = r["cnt"]
        stats.total += cnt
        lvl = r["level"]
        if lvl == "error":
            stats.error += cnt
        elif lvl == "warn":
            stats.warn += cnt
        elif lvl == "info":
            stats.info += cnt
        elif lvl == "debug":
            stats.debug += cnt
        elif lvl == "critical":
            stats.critical += cnt

    def to_unresolved(r: object) -> UnresolvedError:
        # Adapter from a fetched row to the response model; typed `object`
        # because asyncpg Records are untyped, hence the ignores.
        return UnresolvedError(
            id=r["id"],  # type: ignore[index]
            agent_id=r["agent_id"],  # type: ignore[index]
            message=r["message"],  # type: ignore[index]
            created_at=r["created_at"],  # type: ignore[index]
            level=r["level"],  # type: ignore[index]
        )

    new_errors = [to_unresolved(r) for r in new_error_rows if r["message"] not in prev_messages]
    return SummaryResponse(
        period={"since": since_dt, "until": now},
        by_agent=by_agent,
        unresolved_errors=[to_unresolved(r) for r in unresolved_rows],
        new_errors=new_errors,
        recurring_errors=[to_unresolved(r) for r in recurring_rows],
    )
@router.patch("/{log_id}/resolve", status_code=200)
async def resolve_log(
    log_id: UUID,
    body: ResolveRequest,
    _: None = Depends(require_api_key),
) -> dict[str, str]:
    """Flag a single log row as resolved; 404 when absent or already resolved."""
    sql = """
            UPDATE agent_logs
            SET resolved = true,
                resolved_at = now(),
                resolved_by = $2
            WHERE id = $1 AND resolved = false
            """
    pool = await get_pool()
    async with pool.acquire() as conn:
        status = await conn.execute(sql, log_id, body.resolved_by)
    # asyncpg returns the command tag; "UPDATE 0" means no row matched.
    if status == "UPDATE 0":
        raise HTTPException(status_code=404, detail="Log entry not found or already resolved")
    return {"status": "resolved"}
@router.post("/resolve-batch", response_model=BatchResolveResponse)
async def resolve_batch(
    body: BatchResolveRequest,
    _: None = Depends(require_api_key),
) -> BatchResolveResponse:
    """Mark every listed log entry as resolved in one statement."""
    # Nothing to do — skip the DB round-trip entirely.
    if not body.ids:
        return BatchResolveResponse(resolved=0)
    sql = """
            UPDATE agent_logs
            SET resolved = true,
                resolved_at = now(),
                resolved_by = $2
            WHERE id = ANY($1::uuid[]) AND resolved = false
            """
    # UUIDs are sent as text and cast server-side via ::uuid[].
    id_strings = list(map(str, body.ids))
    pool = await get_pool()
    async with pool.acquire() as conn:
        tag = await conn.execute(sql, id_strings, body.resolved_by)
    # Command tag looks like "UPDATE <n>"; its last token is the row count.
    if tag:
        touched = int(tag.split()[-1])
    else:
        touched = 0
    return BatchResolveResponse(resolved=touched)

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""MALS test suite."""

50
tests/conftest.py Normal file
View File

@@ -0,0 +1,50 @@
"""Test configuration and fixtures.
Tests run against a real PostgreSQL instance (docker-compose test stack).
The DATABASE_URL and MALS_API_KEY are read from environment / .env.test.
If not set, pytest will skip DB-dependent tests gracefully.
"""
from __future__ import annotations
import asyncio
import os
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from httpx import ASGITransport, AsyncClient
# Set required env vars before importing the app
os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://mals:mals@localhost:5434/mals")
os.environ.setdefault("MALS_API_KEY", "test-secret-key")
from mals.main import app # noqa: E402
TEST_API_KEY = os.environ["MALS_API_KEY"]
AUTH_HEADERS = {"Authorization": f"Bearer {TEST_API_KEY}"}
# ---------------------------------------------------------------------------
# Async HTTP client fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
async def async_client() -> AsyncIterator[AsyncClient]:
    """Async HTTPX client backed by the ASGI app — no real DB needed for unit tests."""
    # NOTE(review): this is an async-generator fixture, so the accurate
    # return annotation is AsyncIterator[AsyncClient], not AsyncClient.
    # Thanks to `from __future__ import annotations` at the top of this file
    # the annotation stays a lazy string, so no extra import is required.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client
# ---------------------------------------------------------------------------
# Mocked DB pool fixture
# ---------------------------------------------------------------------------
@pytest.fixture
def mock_pool():
    """Return a (pool, connection) mock pair mimicking asyncpg's async API."""
    fake_conn = AsyncMock()
    fake_pool = MagicMock()
    # `pool.acquire()` is used as an async context manager by the routes,
    # so wire __aenter__/__aexit__ on its return value.
    acquire_cm = fake_pool.acquire.return_value
    acquire_cm.__aenter__ = AsyncMock(return_value=fake_conn)
    acquire_cm.__aexit__ = AsyncMock(return_value=False)
    return fake_pool, fake_conn

53
tests/test_auth.py Normal file
View File

@@ -0,0 +1,53 @@
"""Authentication tests."""
from __future__ import annotations
import pytest
from httpx import AsyncClient
from tests.conftest import AUTH_HEADERS
@pytest.mark.asyncio
async def test_auth_valid_token_passes(async_client: AsyncClient):
    """A correct Bearer token clears the auth layer.

    No DB is required: an empty JSON body fails request validation with 422,
    which can only happen after authentication has already succeeded.
    """
    resp = await async_client.post("/logs", json={}, headers=AUTH_HEADERS)
    assert resp.status_code == 422
@pytest.mark.asyncio
async def test_auth_missing_header(async_client: AsyncClient):
    """Omitting the Authorization header is rejected with 401 or 403."""
    resp = await async_client.post("/logs", json={})
    assert resp.status_code in {401, 403}
@pytest.mark.asyncio
async def test_auth_wrong_token(async_client: AsyncClient):
    """An incorrect Bearer token yields 401."""
    bad_headers = {"Authorization": "Bearer totally-wrong"}
    payload = {"agent_id": "x", "message": "x"}
    resp = await async_client.post("/logs", json=payload, headers=bad_headers)
    assert resp.status_code == 401
@pytest.mark.asyncio
async def test_auth_not_required_for_health(async_client: AsyncClient):
    """GET /health answers 200 without any Authorization header."""
    from unittest.mock import AsyncMock, MagicMock, patch

    fake_conn = AsyncMock()
    fake_conn.fetchval.return_value = 1
    fake_pool = MagicMock()
    fake_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=fake_conn)
    fake_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)
    # Stub the pool so the health check's DB ping succeeds without Postgres.
    with patch("mals.main.get_pool", AsyncMock(return_value=fake_pool)):
        resp = await async_client.get("/health")
    assert resp.status_code == 200

144
tests/test_ingest.py Normal file
View File

@@ -0,0 +1,144 @@
"""Tests for ingest routes: POST /logs and POST /logs/batch."""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from httpx import AsyncClient
from tests.conftest import AUTH_HEADERS
# ---------------------------------------------------------------------------
# POST /logs — happy path
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ingest_single_log_success(async_client: AsyncClient):
    """POST /logs with a valid payload returns 201 carrying id and created_at."""
    inserted_row = {"id": uuid.uuid4(), "created_at": datetime.now(tz=timezone.utc)}
    conn = AsyncMock()
    conn.fetchrow.return_value = inserted_row
    pool = MagicMock()
    pool.acquire.return_value.__aenter__ = AsyncMock(return_value=conn)
    pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)
    payload = {
        "agent_id": "crypto",
        "level": "error",
        "message": "Bot container unhealthy after restart",
        "category": "deploy",
        "session_key": "agent:crypto:discord:channel:123",
        "source": "openclaw",
        "metadata": {"container": "jarvis-crypto-bot-1", "exit_code": 1},
    }
    with patch("mals.routes.ingest.get_pool", AsyncMock(return_value=pool)):
        resp = await async_client.post("/logs", json=payload, headers=AUTH_HEADERS)
    assert resp.status_code == 201
    body = resp.json()
    assert "id" in body
    assert "created_at" in body
# ---------------------------------------------------------------------------
# POST /logs — invalid level
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ingest_invalid_level_rejected(async_client: AsyncClient):
    """POST /logs with an invalid level returns 422 Unprocessable Entity."""
    payload = {
        "agent_id": "crypto",
        "level": "BADLEVEL",
        "message": "This should fail validation",
    }
    resp = await async_client.post("/logs", json=payload, headers=AUTH_HEADERS)
    assert resp.status_code == 422
    # Pydantic v2 reports validation failures under the 'detail' key.
    assert "detail" in resp.json()
# ---------------------------------------------------------------------------
# POST /logs — auth rejection
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ingest_requires_auth(async_client: AsyncClient):
    """POST /logs without an auth header returns 401."""
    payload = {"agent_id": "crypto", "message": "should fail"}
    resp = await async_client.post("/logs", json=payload)
    assert resp.status_code == 401
@pytest.mark.asyncio
async def test_ingest_wrong_token_rejected(async_client: AsyncClient):
    """POST /logs with a wrong token returns 401."""
    bad_headers = {"Authorization": "Bearer wrong-key"}
    payload = {"agent_id": "crypto", "message": "should fail"}
    resp = await async_client.post("/logs", json=payload, headers=bad_headers)
    assert resp.status_code == 401
# ---------------------------------------------------------------------------
# POST /logs/batch
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ingest_batch_success(async_client: AsyncClient):
    """POST /logs/batch inserts multiple entries and reports the count."""
    conn = AsyncMock()
    conn.executemany.return_value = None
    pool = MagicMock()
    pool.acquire.return_value.__aenter__ = AsyncMock(return_value=conn)
    pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)
    batch = [
        {"agent_id": "crypto", "level": "info", "message": f"Log entry {i}"}
        for i in range(5)
    ]
    with patch("mals.routes.ingest.get_pool", AsyncMock(return_value=pool)):
        resp = await async_client.post("/logs/batch", json=batch, headers=AUTH_HEADERS)
    assert resp.status_code == 201
    assert resp.json()["inserted"] == 5
@pytest.mark.asyncio
async def test_ingest_batch_empty(async_client: AsyncClient):
    """POST /logs/batch with an empty list reports 0 inserted, no DB call."""
    # A bare MagicMock suffices — the route must short-circuit before acquire().
    pool = MagicMock()
    with patch("mals.routes.ingest.get_pool", AsyncMock(return_value=pool)):
        resp = await async_client.post("/logs/batch", json=[], headers=AUTH_HEADERS)
    assert resp.status_code == 201
    assert resp.json()["inserted"] == 0

220
tests/test_query.py Normal file
View File

@@ -0,0 +1,220 @@
"""Tests for query routes: GET /logs, GET /logs/summary, GET /logs/agents."""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from httpx import AsyncClient
from tests.conftest import AUTH_HEADERS
# Shared test data
# A fully-populated agent_logs row shaped like an asyncpg fetch result;
# used as the canned DB response in the query-route tests below.
_FAKE_LOG = {
    "id": uuid.uuid4(),
    "created_at": datetime.now(tz=timezone.utc),
    "agent_id": "crypto",
    "session_key": None,
    "source": "api",
    "level": "error",
    "category": "deploy",
    "message": "Bot container unhealthy",
    "metadata": {},
    "trace_id": None,
    "parent_id": None,
    "resolved": False,
    "resolved_at": None,
    "resolved_by": None,
}
def _make_pool_returning(rows, count=None):
    """Build a mock asyncpg pool.

    The pool's acquired connection yields *rows* from fetch() and a
    ``{"n": count}`` row from fetchrow(); *count* defaults to ``len(rows)``.
    """
    total = len(rows) if count is None else count
    conn = AsyncMock()
    conn.fetch.return_value = rows
    conn.fetchrow.return_value = {"n": total}
    pool = MagicMock()
    ctx = pool.acquire.return_value
    ctx.__aenter__ = AsyncMock(return_value=conn)
    ctx.__aexit__ = AsyncMock(return_value=False)
    return pool
# ---------------------------------------------------------------------------
# GET /logs — filter by agent_id
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_list_logs_filter_by_agent(async_client: AsyncClient):
    """GET /logs?agent_id=crypto returns paginated results."""
    pool = _make_pool_returning([_FAKE_LOG], count=1)
    with patch("mals.routes.query.get_pool", AsyncMock(return_value=pool)):
        resp = await async_client.get(
            "/logs", params={"agent_id": "crypto"}, headers=AUTH_HEADERS
        )
    assert resp.status_code == 200
    page = resp.json()
    assert page["total"] == 1
    assert len(page["items"]) == 1
    first = page["items"][0]
    assert first["agent_id"] == "crypto"
    assert first["level"] == "error"
@pytest.mark.asyncio
async def test_list_logs_no_auth(async_client: AsyncClient):
    """GET /logs without auth returns 401."""
    resp = await async_client.get("/logs")
    assert resp.status_code == 401
@pytest.mark.asyncio
async def test_list_logs_limit_enforced(async_client: AsyncClient):
    """GET /logs with limit > 1000 is rejected with 422."""
    oversized = {"limit": 9999}
    resp = await async_client.get("/logs", params=oversized, headers=AUTH_HEADERS)
    assert resp.status_code == 422
# ---------------------------------------------------------------------------
# GET /logs/summary
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_summary_returns_structure(async_client: AsyncClient):
    """GET /logs/summary returns the expected keys."""
    conn = AsyncMock()
    # The route performs five fetches, in this order: per-agent level
    # breakdown, unresolved rows, recurring rows, previous-period messages
    # (new-error baseline), and new-error candidates.
    conn.fetch.side_effect = [
        [{"agent_id": "crypto", "level": "error", "cnt": 2}],
        [],
        [],
        [],
        [],
    ]
    pool = MagicMock()
    pool.acquire.return_value.__aenter__ = AsyncMock(return_value=conn)
    pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)
    with patch("mals.routes.query.get_pool", AsyncMock(return_value=pool)):
        resp = await async_client.get("/logs/summary", headers=AUTH_HEADERS)
    assert resp.status_code == 200
    body = resp.json()
    for key in ("period", "by_agent", "unresolved_errors", "new_errors", "recurring_errors"):
        assert key in body
    assert "crypto" in body["by_agent"]
    assert body["by_agent"]["crypto"]["error"] == 2
# ---------------------------------------------------------------------------
# GET /logs/agents
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_list_agents_returns_activity(async_client: AsyncClient):
    """GET /logs/agents returns the agent activity list."""
    activity_row = {
        "agent_id": "crypto",
        "last_seen": datetime.now(tz=timezone.utc),
        "total_24h": 42,
        "error_count_24h": 3,
    }
    conn = AsyncMock()
    conn.fetch.return_value = [activity_row]
    pool = MagicMock()
    pool.acquire.return_value.__aenter__ = AsyncMock(return_value=conn)
    pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)
    with patch("mals.routes.query.get_pool", AsyncMock(return_value=pool)):
        resp = await async_client.get("/logs/agents", headers=AUTH_HEADERS)
    assert resp.status_code == 200
    agents = resp.json()
    assert isinstance(agents, list)
    assert agents[0]["agent_id"] == "crypto"
    assert agents[0]["error_count_24h"] == 3
# ---------------------------------------------------------------------------
# PATCH /logs/{id}/resolve
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_resolve_log_success(async_client: AsyncClient):
    """PATCH /logs/{id}/resolve marks the entry resolved."""
    conn = AsyncMock()
    conn.execute.return_value = "UPDATE 1"
    pool = MagicMock()
    pool.acquire.return_value.__aenter__ = AsyncMock(return_value=conn)
    pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)
    target = uuid.uuid4()
    with patch("mals.routes.query.get_pool", AsyncMock(return_value=pool)):
        resp = await async_client.patch(
            f"/logs/{target}/resolve",
            json={"resolved_by": "jarvis"},
            headers=AUTH_HEADERS,
        )
    assert resp.status_code == 200
    assert resp.json()["status"] == "resolved"
@pytest.mark.asyncio
async def test_resolve_log_not_found(async_client: AsyncClient):
    """PATCH /logs/{id}/resolve returns 404 when missing or already resolved."""
    conn = AsyncMock()
    # "UPDATE 0" is asyncpg's command tag for a no-op update.
    conn.execute.return_value = "UPDATE 0"
    pool = MagicMock()
    pool.acquire.return_value.__aenter__ = AsyncMock(return_value=conn)
    pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)
    target = uuid.uuid4()
    with patch("mals.routes.query.get_pool", AsyncMock(return_value=pool)):
        resp = await async_client.patch(
            f"/logs/{target}/resolve",
            json={"resolved_by": "jarvis"},
            headers=AUTH_HEADERS,
        )
    assert resp.status_code == 404
# ---------------------------------------------------------------------------
# GET /health — no auth required
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_health_no_auth(async_client: AsyncClient):
    """GET /health is accessible without auth."""
    conn = AsyncMock()
    conn.fetchval.return_value = 1
    pool = MagicMock()
    pool.acquire.return_value.__aenter__ = AsyncMock(return_value=conn)
    pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)
    with patch("mals.main.get_pool", AsyncMock(return_value=pool)):
        resp = await async_client.get("/health")
    assert resp.status_code == 200
    body = resp.json()
    assert body["status"] == "ok"
    assert "version" in body

1036
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff