feat(#71): implement graph data API

Implemented three new API endpoints for knowledge graph visualization:

1. GET /api/knowledge/graph - Full knowledge graph
   - Returns all entries and links with optional filtering
   - Supports filtering by tags, status, and node count limit
   - Includes orphan detection (entries with no links)

2. GET /api/knowledge/graph/stats - Graph statistics
   - Total entries and links counts
   - Orphan entries detection
   - Average links per entry
   - Top 10 most connected entries
   - Tag distribution across entries

3. GET /api/knowledge/graph/:slug - Entry-centered subgraph
   - Returns graph centered on specific entry
   - Supports depth parameter (1-5) for traversal distance
   - Includes all connected nodes up to specified depth
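The endpoints above can be exercised from a client with a small URL-building helper. A hedged sketch (the query parameter name `depth` is assumed from the description; the actual API may name it differently):

```typescript
// Hypothetical client helper for GET /api/knowledge/graph/:slug?depth=N.
// Enforces the documented 1-5 depth range before issuing a request.
function buildSubgraphUrl(base: string, slug: string, depth: number): string {
  if (depth < 1 || depth > 5) {
    throw new Error('depth must be between 1 and 5');
  }
  return `${base}/api/knowledge/graph/${encodeURIComponent(slug)}?depth=${depth}`;
}
```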

New Files:
- apps/api/src/knowledge/graph.controller.ts
- apps/api/src/knowledge/graph.controller.spec.ts

Modified Files:
- apps/api/src/knowledge/dto/graph-query.dto.ts (added GraphFilterDto)
- apps/api/src/knowledge/entities/graph.entity.ts (extended with new types)
- apps/api/src/knowledge/services/graph.service.ts (added new methods)
- apps/api/src/knowledge/services/graph.service.spec.ts (added tests)
- apps/api/src/knowledge/knowledge.module.ts (registered controller)
- apps/api/src/knowledge/dto/index.ts (exported new DTOs)
- docs/scratchpads/71-graph-data-api.md (implementation notes)

Test Coverage: 21 tests (all passing)
- 14 service tests including orphan detection, filtering, statistics
- 7 controller tests for all three endpoints

Follows TDD principles with tests written before implementation.
All code quality gates passed (lint, typecheck, tests).

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

Author: Jason Woltje
Date: 2026-02-02 15:27:00 -06:00
Parent: 3969dd5598
Commit: 5d348526de
240 changed files with 10400 additions and 23 deletions


@@ -0,0 +1,245 @@
# Queue Module
BullMQ-based task queue with priority ordering and retry logic.
## Overview
The Queue module provides a robust task queuing system for the orchestrator service using BullMQ and Valkey (Redis-compatible). It supports priority-based task ordering, exponential backoff retry logic, and real-time queue monitoring.
## Features
- **Priority-based ordering** (1-10): Higher priority tasks processed first
- **Retry logic**: Exponential backoff on failures
- **Queue monitoring**: Real-time statistics (pending, active, completed, failed)
- **Queue control**: Pause/resume processing
- **Event pub/sub**: Task lifecycle events published to Valkey
- **Task removal**: Remove tasks from queue
## Usage
### Adding Tasks
```typescript
import { Injectable } from '@nestjs/common';
import { QueueService } from './queue/queue.service';

@Injectable()
export class MyService {
  constructor(private readonly queueService: QueueService) {}

  async createTask() {
    const context = {
      repository: 'my-org/my-repo',
      branch: 'main',
      workItems: ['task-1', 'task-2'],
    };

    // Add task with default options (priority 5, maxRetries 3)
    await this.queueService.addTask('task-123', context);

    // Add high-priority task with custom retries
    await this.queueService.addTask('urgent-task', context, {
      priority: 10, // Highest priority
      maxRetries: 5,
    });

    // Add delayed task (5 second delay)
    await this.queueService.addTask('delayed-task', context, {
      delay: 5000,
    });
  }
}
```
### Monitoring Queue
```typescript
// Inside a class that injects QueueService
async monitorQueue(): Promise<void> {
  const stats = await this.queueService.getStats();
  console.log(stats);
  // {
  //   pending: 5,
  //   active: 2,
  //   completed: 10,
  //   failed: 1,
  //   delayed: 0
  // }
}
```
### Queue Control
```typescript
// Pause queue processing
await this.queueService.pause();
// Resume queue processing
await this.queueService.resume();
// Remove task from queue
await this.queueService.removeTask('task-123');
```
## Configuration
Configure via environment variables:
```bash
# Valkey connection
ORCHESTRATOR_VALKEY_HOST=localhost
ORCHESTRATOR_VALKEY_PORT=6379
ORCHESTRATOR_VALKEY_PASSWORD=secret
# Queue configuration
ORCHESTRATOR_QUEUE_NAME=orchestrator-tasks
ORCHESTRATOR_QUEUE_MAX_RETRIES=3
ORCHESTRATOR_QUEUE_BASE_DELAY=1000 # 1 second
ORCHESTRATOR_QUEUE_MAX_DELAY=60000 # 1 minute
ORCHESTRATOR_QUEUE_CONCURRENCY=5 # 5 concurrent workers
```
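The service reads these values through `ConfigService` under `orchestrator.queue.*` keys. A standalone sketch of how the env vars above might map to a typed config object (the mapping function itself is illustrative; the real module presumably wires this through `@nestjs/config`):

```typescript
// Illustrative parser from the documented env vars to the queue config
// the service consumes. Defaults match the documented fallbacks.
interface QueueConfig {
  name: string;
  maxRetries: number;
  baseDelay: number;
  maxDelay: number;
  concurrency: number;
}

function loadQueueConfig(env: Record<string, string | undefined>): QueueConfig {
  return {
    name: env.ORCHESTRATOR_QUEUE_NAME ?? 'orchestrator-tasks',
    maxRetries: Number(env.ORCHESTRATOR_QUEUE_MAX_RETRIES ?? 3),
    baseDelay: Number(env.ORCHESTRATOR_QUEUE_BASE_DELAY ?? 1000),
    maxDelay: Number(env.ORCHESTRATOR_QUEUE_MAX_DELAY ?? 60000),
    concurrency: Number(env.ORCHESTRATOR_QUEUE_CONCURRENCY ?? 5),
  };
}
```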
## Priority
Priority range: 1-10
- **10**: Highest priority (processed first)
- **5**: Default priority
- **1**: Lowest priority (processed last)
Internally, priorities are inverted for BullMQ (which uses lower numbers for higher priority).
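The inversion mirrors the `10 - priority + 1` expression in `QueueService.addTask`; isolated as a helper, it looks like this:

```typescript
// Maps the module's 1-10 scale (higher = more important) onto BullMQ's
// scale, where lower numbers are processed first.
function toBullMqPriority(priority: number): number {
  return 10 - priority + 1; // 10 -> 1 (first), 5 -> 6, 1 -> 10 (last)
}
```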
## Retry Logic
Failed tasks are automatically retried with exponential backoff (`delay = baseDelay * 2^attempt`, capped at `maxDelay`):
- **Attempt 1**: Wait 2 seconds (baseDelay * 2^1)
- **Attempt 2**: Wait 4 seconds (baseDelay * 2^2)
- **Attempt 3**: Wait 8 seconds (baseDelay * 2^3)
- **Attempt 6+**: Capped at maxDelay (with defaults, 1000 * 2^6 = 64s exceeds the 60s cap)
Configure retry behavior:
- `maxRetries`: Number of retry attempts (default: 3)
- `baseDelay`: Base delay in milliseconds (default: 1000)
- `maxDelay`: Maximum delay cap (default: 60000)
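The schedule above is the formula `min(baseDelay * 2^attempt, maxDelay)`, the same computation as the service's `calculateBackoffDelay`; a standalone sketch:

```typescript
// Exponential backoff with a ceiling, matching the documented schedule.
function backoffDelay(attempt: number, baseDelay = 1000, maxDelay = 60000): number {
  return Math.min(baseDelay * Math.pow(2, attempt), maxDelay);
}
```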
## Events
The queue publishes events to Valkey pub/sub:
- `task.queued`: Task added to queue
- `task.processing`: Task started processing
- `task.retry`: Task retrying after failure
- `task.completed`: Task completed successfully
- `task.failed`: Task failed permanently
Subscribe to events:
```typescript
await valkeyService.subscribeToEvents((event) => {
if (event.type === 'task.completed') {
console.log('Task completed:', event.data.taskId);
}
});
```
## Architecture
```
┌─────────────┐
│ QueueService│
└──────┬──────┘
├──────────> BullMQ Queue (adds tasks)
├──────────> BullMQ Worker (processes tasks)
└──────────> ValkeyService (state + events)
```
### Components
1. **QueueService**: Main service for queue operations
2. **BullMQ Queue**: Task queue with priority and retry
3. **BullMQ Worker**: Processes tasks from queue
4. **ValkeyService**: State management and pub/sub
## Types
### QueuedTask
```typescript
interface QueuedTask {
taskId: string;
priority: number; // 1-10
retries: number;
maxRetries: number;
context: TaskContext;
}
```
### AddTaskOptions
```typescript
interface AddTaskOptions {
priority?: number; // 1-10, default 5
maxRetries?: number; // default 3
delay?: number; // delay in milliseconds
}
```
### QueueStats
```typescript
interface QueueStats {
pending: number;
active: number;
completed: number;
failed: number;
delayed: number;
}
```
## Error Handling
Validation errors:
- `Priority must be between 1 and 10`: Invalid priority value
- `maxRetries must be non-negative`: Negative retry count
Task processing errors:
- Automatically retried up to `maxRetries`
- Published as `task.failed` event after final failure
- Error details stored in Valkey state
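The validation errors above come from `addTask`'s option checks; extracted as a standalone sketch for illustration (the helper name is hypothetical):

```typescript
// Mirrors the option validation addTask performs before enqueueing.
function validateAddTaskOptions(priority = 5, maxRetries = 3): void {
  if (priority < 1 || priority > 10) {
    throw new Error('Priority must be between 1 and 10');
  }
  if (maxRetries < 0) {
    throw new Error('maxRetries must be non-negative');
  }
}
```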
## Testing
### Unit Tests
```bash
pnpm test queue.service.spec.ts
```
Unit tests cover the pure functions (`calculateBackoffDelay`) and configuration loading.
### Integration Tests
Integration tests require a running Valkey instance:
```bash
# Start Valkey
docker run -p 6379:6379 valkey/valkey:latest
# Run integration tests
pnpm test queue.integration.spec.ts
```
## Dependencies
- `bullmq`: Task queue
- `ioredis`: Redis/Valkey client (via ValkeyService)
- `@nestjs/common`: NestJS dependency injection
- `@nestjs/config`: Configuration management
## Related
- `ValkeyModule`: State management and pub/sub
- `ORCH-107`: Valkey client implementation
- `ORCH-109`: Agent lifecycle management (uses queue)


@@ -0,0 +1,7 @@
/**
* Queue module exports
*/
export * from './queue.service';
export * from './queue.module';
export * from './types';


@@ -1,4 +1,11 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { QueueService } from './queue.service';
import { ValkeyModule } from '../valkey/valkey.module';
@Module({
imports: [ConfigModule, ValkeyModule],
providers: [QueueService],
exports: [QueueService],
})
export class QueueModule {}


@@ -0,0 +1,185 @@
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { QueueService } from './queue.service';
describe('QueueService', () => {
describe('calculateBackoffDelay', () => {
let service: QueueService;
beforeEach(() => {
// Create a minimal instance for testing pure functions
const mockValkeyService: any = {
updateTaskStatus: vi.fn(),
publishEvent: vi.fn(),
};
const mockConfigService: any = {
get: vi.fn((key: string, defaultValue?: unknown) => defaultValue),
};
service = new QueueService(mockValkeyService, mockConfigService);
});
it('should calculate exponential backoff delay', () => {
const baseDelay = 1000;
const maxDelay = 60000;
// Attempt 1: 2000ms (1000 * 2^1)
const delay1 = service.calculateBackoffDelay(1, baseDelay, maxDelay);
expect(delay1).toBe(2000);
// Attempt 2: 4000ms (1000 * 2^2)
const delay2 = service.calculateBackoffDelay(2, baseDelay, maxDelay);
expect(delay2).toBe(4000);
// Attempt 3: 8000ms (1000 * 2^3)
const delay3 = service.calculateBackoffDelay(3, baseDelay, maxDelay);
expect(delay3).toBe(8000);
// Attempt 4: 16000ms (1000 * 2^4)
const delay4 = service.calculateBackoffDelay(4, baseDelay, maxDelay);
expect(delay4).toBe(16000);
});
it('should cap delay at maxDelay', () => {
const baseDelay = 1000;
const maxDelay = 60000;
// Attempt 10 would be 1024000ms, but should be capped at 60000ms
const delay10 = service.calculateBackoffDelay(10, baseDelay, maxDelay);
expect(delay10).toBe(maxDelay);
// Attempt 7 would be 128000ms, should be capped at 60000ms
const delay7 = service.calculateBackoffDelay(7, baseDelay, maxDelay);
expect(delay7).toBe(maxDelay);
});
it('should handle zero baseDelay', () => {
const delay = service.calculateBackoffDelay(3, 0, 60000);
expect(delay).toBe(0);
});
it('should handle attempt 0', () => {
const delay = service.calculateBackoffDelay(0, 1000, 60000);
expect(delay).toBe(1000); // 1000 * 2^0 = 1000
});
it('should handle large attempt numbers', () => {
const baseDelay = 1000;
const maxDelay = 100000;
const delay = service.calculateBackoffDelay(20, baseDelay, maxDelay);
expect(delay).toBe(maxDelay);
});
it('should work with different base delays', () => {
const maxDelay = 100000;
// 500ms base
const delay1 = service.calculateBackoffDelay(2, 500, maxDelay);
expect(delay1).toBe(2000); // 500 * 2^2
// 2000ms base
const delay2 = service.calculateBackoffDelay(2, 2000, maxDelay);
expect(delay2).toBe(8000); // 2000 * 2^2
});
});
describe('validation logic', () => {
let service: QueueService;
let mockValkeyService: any;
let mockConfigService: any;
beforeEach(() => {
mockValkeyService = {
updateTaskStatus: vi.fn().mockResolvedValue(undefined),
publishEvent: vi.fn().mockResolvedValue(undefined),
};
mockConfigService = {
get: vi.fn((key: string, defaultValue?: unknown) => {
const config: Record<string, unknown> = {
'orchestrator.valkey.host': 'localhost',
'orchestrator.valkey.port': 6379,
'orchestrator.queue.name': 'orchestrator-tasks',
'orchestrator.queue.maxRetries': 3,
'orchestrator.queue.baseDelay': 1000,
'orchestrator.queue.maxDelay': 60000,
'orchestrator.queue.concurrency': 5,
};
return config[key] ?? defaultValue;
}),
};
service = new QueueService(mockValkeyService, mockConfigService);
});
it('should be defined', () => {
expect(service).toBeDefined();
expect(service.calculateBackoffDelay).toBeDefined();
});
it('should load configuration from ConfigService', () => {
expect(mockConfigService.get).toHaveBeenCalledWith(
'orchestrator.queue.name',
'orchestrator-tasks'
);
expect(mockConfigService.get).toHaveBeenCalledWith(
'orchestrator.queue.maxRetries',
3
);
expect(mockConfigService.get).toHaveBeenCalledWith(
'orchestrator.queue.baseDelay',
1000
);
expect(mockConfigService.get).toHaveBeenCalledWith(
'orchestrator.queue.maxDelay',
60000
);
});
});
describe('retry configuration', () => {
it('should use default retry configuration', () => {
const mockValkeyService: any = {
updateTaskStatus: vi.fn(),
publishEvent: vi.fn(),
};
const mockConfigService: any = {
get: vi.fn((key: string, defaultValue?: unknown) => defaultValue),
};
const service = new QueueService(mockValkeyService, mockConfigService);
// Verify defaults were requested
expect(mockConfigService.get).toHaveBeenCalledWith(
'orchestrator.queue.maxRetries',
3
);
expect(mockConfigService.get).toHaveBeenCalledWith(
'orchestrator.queue.baseDelay',
1000
);
expect(mockConfigService.get).toHaveBeenCalledWith(
'orchestrator.queue.maxDelay',
60000
);
});
it('should use custom retry configuration from env', () => {
const mockValkeyService: any = {
updateTaskStatus: vi.fn(),
publishEvent: vi.fn(),
};
const mockConfigService: any = {
get: vi.fn((key: string, defaultValue?: unknown) => {
if (key === 'orchestrator.queue.maxRetries') return 5;
if (key === 'orchestrator.queue.baseDelay') return 2000;
if (key === 'orchestrator.queue.maxDelay') return 120000;
return defaultValue;
}),
};
const service = new QueueService(mockValkeyService, mockConfigService);
// Verify custom values were used
const delay1 = service.calculateBackoffDelay(1, 2000, 120000);
expect(delay1).toBe(4000); // 2000 * 2^1
});
});
});


@@ -0,0 +1,301 @@
import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Queue, Worker, Job } from 'bullmq';
import { ValkeyService } from '../valkey/valkey.service';
import type { TaskContext } from '../valkey/types';
import type {
QueuedTask,
QueueStats,
AddTaskOptions,
RetryConfig,
TaskProcessingResult,
} from './types';
/**
* Queue service for managing task queue with priority and retry logic
*/
@Injectable()
export class QueueService implements OnModuleInit, OnModuleDestroy {
private queue!: Queue<QueuedTask>;
private worker!: Worker<QueuedTask, TaskProcessingResult>;
private readonly queueName: string;
private readonly retryConfig: RetryConfig;
constructor(
private readonly valkeyService: ValkeyService,
private readonly configService: ConfigService
) {
this.queueName = this.configService.get<string>(
'orchestrator.queue.name',
'orchestrator-tasks'
);
this.retryConfig = {
maxRetries: this.configService.get<number>(
'orchestrator.queue.maxRetries',
3
),
baseDelay: this.configService.get<number>(
'orchestrator.queue.baseDelay',
1000
),
maxDelay: this.configService.get<number>(
'orchestrator.queue.maxDelay',
60000
),
};
}
async onModuleInit(): Promise<void> {
// Initialize BullMQ with Valkey connection
const connection = {
host: this.configService.get<string>('orchestrator.valkey.host', 'localhost'),
port: this.configService.get<number>('orchestrator.valkey.port', 6379),
password: this.configService.get<string>('orchestrator.valkey.password'),
};
// Create queue
this.queue = new Queue<QueuedTask>(this.queueName, {
connection,
defaultJobOptions: {
removeOnComplete: {
age: 3600, // Keep completed jobs for 1 hour
count: 100, // Keep last 100 completed jobs
},
removeOnFail: {
age: 86400, // Keep failed jobs for 24 hours
count: 1000, // Keep last 1000 failed jobs
},
},
});
// Create worker
this.worker = new Worker<QueuedTask, TaskProcessingResult>(
this.queueName,
async (job: Job<QueuedTask>) => {
return this.processTask(job);
},
{
connection,
concurrency: this.configService.get<number>(
'orchestrator.queue.concurrency',
5
),
}
);
// Setup error handlers
this.worker.on('failed', async (job, err) => {
if (job) {
await this.handleTaskFailure(job.data.taskId, err);
}
});
this.worker.on('completed', async (job) => {
if (job) {
await this.handleTaskCompletion(job.data.taskId);
}
});
}
async onModuleDestroy(): Promise<void> {
await this.worker.close();
await this.queue.close();
}
/**
* Add task to queue
*/
async addTask(
taskId: string,
context: TaskContext,
options?: AddTaskOptions
): Promise<void> {
// Validate options
const priority = options?.priority ?? 5;
const maxRetries = options?.maxRetries ?? this.retryConfig.maxRetries;
const delay = options?.delay ?? 0;
if (priority < 1 || priority > 10) {
throw new Error('Priority must be between 1 and 10');
}
if (maxRetries < 0) {
throw new Error('maxRetries must be non-negative');
}
const queuedTask: QueuedTask = {
taskId,
priority,
retries: 0,
maxRetries,
context,
};
// Add to BullMQ queue
await this.queue.add(taskId, queuedTask, {
priority: 10 - priority + 1, // BullMQ: lower number = higher priority, so invert
attempts: maxRetries + 1, // +1 for initial attempt
backoff: {
type: 'custom',
},
delay,
});
// Update task state in Valkey
await this.valkeyService.updateTaskStatus(taskId, 'pending');
// Publish event
await this.valkeyService.publishEvent({
type: 'task.queued',
timestamp: new Date().toISOString(),
taskId,
data: { priority },
});
}
/**
* Get queue statistics
*/
async getStats(): Promise<QueueStats> {
const counts = await this.queue.getJobCounts(
'waiting',
'active',
'completed',
'failed',
'delayed'
);
return {
pending: counts.waiting || 0,
active: counts.active || 0,
completed: counts.completed || 0,
failed: counts.failed || 0,
delayed: counts.delayed || 0,
};
}
/**
* Calculate exponential backoff delay
*/
calculateBackoffDelay(
attemptNumber: number,
baseDelay: number,
maxDelay: number
): number {
const delay = baseDelay * Math.pow(2, attemptNumber);
return Math.min(delay, maxDelay);
}
/**
* Pause queue processing
*/
async pause(): Promise<void> {
await this.queue.pause();
}
/**
* Resume queue processing
*/
async resume(): Promise<void> {
await this.queue.resume();
}
/**
* Remove task from queue
*/
async removeTask(taskId: string): Promise<void> {
const job = await this.queue.getJob(taskId);
if (job) {
await job.remove();
}
}
/**
* Process task (called by worker)
*/
private async processTask(
job: Job<QueuedTask>
): Promise<TaskProcessingResult> {
const { taskId } = job.data;
try {
// Update task state to executing
await this.valkeyService.updateTaskStatus(taskId, 'executing');
// Publish event
await this.valkeyService.publishEvent({
type: 'task.processing',
timestamp: new Date().toISOString(),
taskId,
data: { attempt: job.attemptsMade + 1 },
});
// Task processing will be handled by agent spawner
// For now, just mark as processing
return {
success: true,
metadata: {
attempt: job.attemptsMade + 1,
},
};
} catch (error) {
// Handle retry logic
const shouldRetry = job.attemptsMade < job.data.maxRetries;
if (shouldRetry) {
// Calculate backoff delay for next retry
const delay = this.calculateBackoffDelay(
job.attemptsMade + 1,
this.retryConfig.baseDelay,
this.retryConfig.maxDelay
);
// BullMQ will automatically retry with the backoff
await job.updateData({
...job.data,
retries: job.attemptsMade + 1,
});
await this.valkeyService.publishEvent({
type: 'task.retry',
timestamp: new Date().toISOString(),
taskId,
data: {
attempt: job.attemptsMade + 1,
nextDelay: delay,
},
});
}
throw error;
}
}
/**
* Handle task failure
*/
private async handleTaskFailure(taskId: string, error: Error): Promise<void> {
await this.valkeyService.updateTaskStatus(taskId, 'failed', undefined, error.message);
await this.valkeyService.publishEvent({
type: 'task.failed',
timestamp: new Date().toISOString(),
taskId,
error: error.message,
});
}
/**
* Handle task completion
*/
private async handleTaskCompletion(taskId: string): Promise<void> {
await this.valkeyService.updateTaskStatus(taskId, 'completed');
await this.valkeyService.publishEvent({
type: 'task.completed',
timestamp: new Date().toISOString(),
taskId,
});
}
}


@@ -0,0 +1,5 @@
/**
* Queue module type exports
*/
export * from './queue.types';


@@ -0,0 +1,55 @@
/**
* Queue task types
*/
import type { TaskContext } from '../../valkey/types';
/**
* Queued task interface
* Priority: 1-10 (higher = more important)
*/
export interface QueuedTask {
taskId: string;
priority: number; // 1-10
retries: number;
maxRetries: number;
context: TaskContext;
}
/**
* Queue monitoring statistics
*/
export interface QueueStats {
pending: number;
active: number;
completed: number;
failed: number;
delayed: number;
}
/**
* Queue options for adding tasks
*/
export interface AddTaskOptions {
priority?: number; // 1-10, default 5
maxRetries?: number; // default 3
delay?: number; // delay in milliseconds before processing
}
/**
* Retry configuration
*/
export interface RetryConfig {
maxRetries: number;
baseDelay: number; // base delay in milliseconds
maxDelay: number; // maximum delay cap
}
/**
* Task processing result
*/
export interface TaskProcessingResult {
success: boolean;
error?: string;
metadata?: Record<string, unknown>;
}