145 lines
4.8 KiB
Python
145 lines
4.8 KiB
Python
"""Tests for ingest routes: POST /logs and POST /logs/batch."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
from tests.conftest import AUTH_HEADERS
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /logs — happy path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_ingest_single_log_success(async_client: AsyncClient):
    """A well-formed POST /logs is answered with 201 plus ``id`` and ``created_at``."""
    # Row the mocked connection returns from fetchrow().
    inserted_row = {
        "id": uuid.uuid4(),
        "created_at": datetime.now(tz=timezone.utc),
    }
    db_conn = AsyncMock()
    db_conn.fetchrow.return_value = inserted_row

    # Make pool.acquire() return an async context manager yielding db_conn.
    db_pool = MagicMock()
    acquire_ctx = db_pool.acquire.return_value
    acquire_ctx.__aenter__ = AsyncMock(return_value=db_conn)
    acquire_ctx.__aexit__ = AsyncMock(return_value=False)

    payload = {
        "agent_id": "crypto",
        "level": "error",
        "message": "Bot container unhealthy after restart",
        "category": "deploy",
        "session_key": "agent:crypto:discord:channel:123",
        "source": "openclaw",
        "metadata": {"container": "jarvis-crypto-bot-1", "exit_code": 1},
    }

    # Swap the route module's get_pool for an awaitable that hands out the fake pool.
    get_pool_stub = AsyncMock(return_value=db_pool)
    with patch("mals.routes.ingest.get_pool", get_pool_stub):
        response = await async_client.post("/logs", json=payload, headers=AUTH_HEADERS)

    assert response.status_code == 201
    body = response.json()
    for field in ("id", "created_at"):
        assert field in body
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /logs — invalid level
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_ingest_invalid_level_rejected(async_client: AsyncClient):
    """An unrecognised ``level`` makes POST /logs respond 422 Unprocessable Entity."""
    bad_payload = {
        "agent_id": "crypto",
        "level": "BADLEVEL",
        "message": "This should fail validation",
    }

    response = await async_client.post("/logs", json=bad_payload, headers=AUTH_HEADERS)

    assert response.status_code == 422
    # The validation errors are expected under the 'detail' key of the body.
    assert "detail" in response.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /logs — auth rejection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_ingest_requires_auth(async_client: AsyncClient):
    """Omitting the auth header on POST /logs yields 401."""
    payload = {"agent_id": "crypto", "message": "should fail"}

    # Deliberately no ``headers=`` argument: the request carries no credentials.
    response = await async_client.post("/logs", json=payload)

    assert response.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ingest_wrong_token_rejected(async_client: AsyncClient):
    """A bearer token that is not the expected one gets 401 from POST /logs."""
    payload = {"agent_id": "crypto", "message": "should fail"}
    bad_auth = {"Authorization": "Bearer wrong-key"}

    response = await async_client.post("/logs", json=payload, headers=bad_auth)

    assert response.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /logs/batch
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_ingest_batch_success(async_client: AsyncClient):
    """Posting five entries to /logs/batch returns 201 and reports ``inserted == 5``."""
    batch_size = 5
    batch = [
        {"agent_id": "crypto", "level": "info", "message": f"Log entry {i}"}
        for i in range(batch_size)
    ]

    # Connection whose executemany() resolves to None.
    db_conn = AsyncMock()
    db_conn.executemany.return_value = None

    # Make pool.acquire() return an async context manager yielding db_conn.
    db_pool = MagicMock()
    acquire_ctx = db_pool.acquire.return_value
    acquire_ctx.__aenter__ = AsyncMock(return_value=db_conn)
    acquire_ctx.__aexit__ = AsyncMock(return_value=False)

    get_pool_stub = AsyncMock(return_value=db_pool)
    with patch("mals.routes.ingest.get_pool", get_pool_stub):
        response = await async_client.post(
            "/logs/batch", json=batch, headers=AUTH_HEADERS
        )

    assert response.status_code == 201
    assert response.json()["inserted"] == 5
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ingest_batch_empty(async_client: AsyncClient):
    """POST /logs/batch with empty list returns 0 inserted without DB call.

    Checks three things: the 201 status, ``inserted == 0`` in the JSON body,
    and that no connection was ever acquired from the (mocked) pool.
    """
    mock_pool = MagicMock()

    with patch("mals.routes.ingest.get_pool", AsyncMock(return_value=mock_pool)):
        response = await async_client.post(
            "/logs/batch",
            json=[],
            headers=AUTH_HEADERS,
        )

    assert response.status_code == 201
    assert response.json()["inserted"] == 0
    # The status/body checks alone do not prove the "without DB call" part of
    # the contract: a bare MagicMock pool does not fail on acquire() by
    # itself, so verify explicitly that no connection was requested.
    mock_pool.acquire.assert_not_called()
|