feat(orchestrator): MS23 agent lifecycle ingestion service (#701)
Some checks failed
ci/woodpecker/push/ci Pipeline failed

Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #701.
This commit is contained in:
2026-03-07 04:21:26 +00:00
committed by jason.woltje
parent 49fa958444
commit de6faf659e
9 changed files with 262 additions and 20 deletions

View File

@@ -27,6 +27,7 @@
"@nestjs/core": "^11.1.12", "@nestjs/core": "^11.1.12",
"@nestjs/platform-express": "^11.1.12", "@nestjs/platform-express": "^11.1.12",
"@nestjs/throttler": "^6.5.0", "@nestjs/throttler": "^6.5.0",
"@prisma/client": "^6.19.2",
"bullmq": "^5.67.2", "bullmq": "^5.67.2",
"class-transformer": "^0.5.1", "class-transformer": "^0.5.1",
"class-validator": "^0.14.1", "class-validator": "^0.14.1",

View File

@@ -0,0 +1,10 @@
import { Module } from "@nestjs/common";
import { PrismaModule } from "../prisma/prisma.module";
import { AgentIngestionService } from "./agent-ingestion.service";
@Module({
imports: [PrismaModule],
providers: [AgentIngestionService],
exports: [AgentIngestionService],
})
export class AgentIngestionModule {}

View File

@@ -0,0 +1,141 @@
import { Injectable, Logger } from "@nestjs/common";
import type { Prisma } from "@prisma/client";
import { PrismaService } from "../prisma/prisma.service";
export type AgentConversationRole = "agent" | "user" | "system" | "operator";
@Injectable()
export class AgentIngestionService {
private readonly logger = new Logger(AgentIngestionService.name);
constructor(private readonly prisma: PrismaService) {}
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
return value as Prisma.InputJsonValue;
}
async recordAgentSpawned(
agentId: string,
parentAgentId?: string,
missionId?: string,
taskId?: string,
agentType?: string
): Promise<void> {
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
parentSessionId: parentAgentId,
missionId,
taskId,
agentType,
status: "spawning",
},
update: {
parentSessionId: parentAgentId,
missionId,
taskId,
agentType,
status: "spawning",
completedAt: null,
},
});
this.logger.debug(`Recorded spawned state for agent ${agentId}`);
}
async recordAgentStarted(agentId: string): Promise<void> {
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
status: "running",
},
update: {
status: "running",
},
});
this.logger.debug(`Recorded running state for agent ${agentId}`);
}
async recordAgentCompleted(agentId: string): Promise<void> {
const completedAt = new Date();
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
status: "completed",
completedAt,
},
update: {
status: "completed",
completedAt,
},
});
this.logger.debug(`Recorded completed state for agent ${agentId}`);
}
async recordAgentFailed(agentId: string, error?: string): Promise<void> {
const completedAt = new Date();
const metadata = error ? this.toJsonValue({ error }) : undefined;
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
status: "failed",
completedAt,
...(metadata && { metadata }),
},
update: {
status: "failed",
completedAt,
...(metadata && { metadata }),
},
});
this.logger.debug(`Recorded failed state for agent ${agentId}`);
}
async recordAgentKilled(agentId: string): Promise<void> {
const completedAt = new Date();
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
status: "killed",
completedAt,
},
update: {
status: "killed",
completedAt,
},
});
this.logger.debug(`Recorded killed state for agent ${agentId}`);
}
async recordMessage(
sessionId: string,
role: AgentConversationRole,
content: string,
provider = "internal",
metadata?: Record<string, unknown>
): Promise<void> {
await this.prisma.agentConversationMessage.create({
data: {
sessionId,
role,
content,
provider,
...(metadata && { metadata: this.toJsonValue(metadata) }),
},
});
this.logger.debug(`Recorded message for session ${sessionId}`);
}
}

View File

@@ -0,0 +1,9 @@
import { Global, Module } from "@nestjs/common";
import { PrismaService } from "./prisma.service";
@Global()
@Module({
providers: [PrismaService],
exports: [PrismaService],
})
export class PrismaModule {}

View File

@@ -0,0 +1,26 @@
import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from "@nestjs/common";
import { PrismaClient } from "@prisma/client";
/**
* Lightweight Prisma service for orchestrator ingestion persistence.
*/
@Injectable()
export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(PrismaService.name);
constructor() {
super({
log: process.env.NODE_ENV === "development" ? ["warn", "error"] : ["error"],
});
}
async onModuleInit(): Promise<void> {
await this.$connect();
this.logger.log("Database connection established");
}
async onModuleDestroy(): Promise<void> {
await this.$disconnect();
this.logger.log("Database connection closed");
}
}

View File

@@ -1,6 +1,7 @@
import { Injectable, Logger, Inject, forwardRef } from "@nestjs/common"; import { Injectable, Logger, Inject, Optional, forwardRef } from "@nestjs/common";
import { ValkeyService } from "../valkey/valkey.service"; import { ValkeyService } from "../valkey/valkey.service";
import { AgentSpawnerService } from "./agent-spawner.service"; import { AgentSpawnerService } from "./agent-spawner.service";
import { AgentIngestionService } from "../agent-ingestion/agent-ingestion.service";
import type { AgentState, AgentStatus, AgentEvent } from "../valkey/types"; import type { AgentState, AgentStatus, AgentEvent } from "../valkey/types";
import { isValidAgentTransition } from "../valkey/types/state.types"; import { isValidAgentTransition } from "../valkey/types/state.types";
@@ -32,7 +33,8 @@ export class AgentLifecycleService {
constructor( constructor(
private readonly valkeyService: ValkeyService, private readonly valkeyService: ValkeyService,
@Inject(forwardRef(() => AgentSpawnerService)) @Inject(forwardRef(() => AgentSpawnerService))
private readonly spawnerService: AgentSpawnerService private readonly spawnerService: AgentSpawnerService,
@Optional() private readonly agentIngestionService?: AgentIngestionService
) { ) {
this.logger.log("AgentLifecycleService initialized"); this.logger.log("AgentLifecycleService initialized");
} }
@@ -55,6 +57,25 @@ export class AgentLifecycleService {
return createdState; return createdState;
} }
private async recordLifecycleIngestion(
agentId: string,
event: "started" | "completed" | "failed" | "killed",
record: (ingestionService: AgentIngestionService) => Promise<void>
): Promise<void> {
if (!this.agentIngestionService) {
return;
}
try {
await record(this.agentIngestionService);
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.error(
`Failed to record agent ${event} ingestion for ${agentId}: ${errorMessage}`
);
}
}
/** /**
* Acquire a per-agent mutex to serialize state transitions. * Acquire a per-agent mutex to serialize state transitions.
* Uses promise chaining: each caller chains onto the previous lock, * Uses promise chaining: each caller chains onto the previous lock,
@@ -118,6 +139,10 @@ export class AgentLifecycleService {
// Emit event // Emit event
await this.publishStateChangeEvent("agent.running", updatedState); await this.publishStateChangeEvent("agent.running", updatedState);
await this.recordLifecycleIngestion(agentId, "started", (ingestionService) =>
ingestionService.recordAgentStarted(agentId)
);
this.logger.log(`Agent ${agentId} transitioned to running`); this.logger.log(`Agent ${agentId} transitioned to running`);
return updatedState; return updatedState;
}); });
@@ -155,6 +180,10 @@ export class AgentLifecycleService {
// Emit event // Emit event
await this.publishStateChangeEvent("agent.completed", updatedState); await this.publishStateChangeEvent("agent.completed", updatedState);
await this.recordLifecycleIngestion(agentId, "completed", (ingestionService) =>
ingestionService.recordAgentCompleted(agentId)
);
// Schedule session cleanup // Schedule session cleanup
this.spawnerService.scheduleSessionCleanup(agentId); this.spawnerService.scheduleSessionCleanup(agentId);
@@ -192,6 +221,10 @@ export class AgentLifecycleService {
// Emit event // Emit event
await this.publishStateChangeEvent("agent.failed", updatedState, error); await this.publishStateChangeEvent("agent.failed", updatedState, error);
await this.recordLifecycleIngestion(agentId, "failed", (ingestionService) =>
ingestionService.recordAgentFailed(agentId, error)
);
// Schedule session cleanup // Schedule session cleanup
this.spawnerService.scheduleSessionCleanup(agentId); this.spawnerService.scheduleSessionCleanup(agentId);
@@ -228,6 +261,10 @@ export class AgentLifecycleService {
// Emit event // Emit event
await this.publishStateChangeEvent("agent.killed", updatedState); await this.publishStateChangeEvent("agent.killed", updatedState);
await this.recordLifecycleIngestion(agentId, "killed", (ingestionService) =>
ingestionService.recordAgentKilled(agentId)
);
// Schedule session cleanup // Schedule session cleanup
this.spawnerService.scheduleSessionCleanup(agentId); this.spawnerService.scheduleSessionCleanup(agentId);

View File

@@ -1,4 +1,11 @@
import { Injectable, Logger, HttpException, HttpStatus, OnModuleDestroy } from "@nestjs/common"; import {
Injectable,
Logger,
HttpException,
HttpStatus,
OnModuleDestroy,
Optional,
} from "@nestjs/common";
import { ConfigService } from "@nestjs/config"; import { ConfigService } from "@nestjs/config";
import Anthropic from "@anthropic-ai/sdk"; import Anthropic from "@anthropic-ai/sdk";
import { randomUUID } from "crypto"; import { randomUUID } from "crypto";
@@ -8,6 +15,7 @@ import {
AgentSession, AgentSession,
AgentType, AgentType,
} from "./types/agent-spawner.types"; } from "./types/agent-spawner.types";
import { AgentIngestionService } from "../agent-ingestion/agent-ingestion.service";
/** /**
* Default delay in milliseconds before cleaning up sessions after terminal states * Default delay in milliseconds before cleaning up sessions after terminal states
@@ -30,7 +38,10 @@ export class AgentSpawnerService implements OnModuleDestroy {
private readonly sessionCleanupDelayMs: number; private readonly sessionCleanupDelayMs: number;
private readonly cleanupTimers = new Map<string, NodeJS.Timeout>(); private readonly cleanupTimers = new Map<string, NodeJS.Timeout>();
constructor(private readonly configService: ConfigService) { constructor(
private readonly configService: ConfigService,
@Optional() private readonly agentIngestionService?: AgentIngestionService
) {
const configuredProvider = this.configService.get<string>("orchestrator.aiProvider"); const configuredProvider = this.configService.get<string>("orchestrator.aiProvider");
this.aiProvider = this.normalizeAiProvider(configuredProvider); this.aiProvider = this.normalizeAiProvider(configuredProvider);
@@ -98,6 +109,19 @@ export class AgentSpawnerService implements OnModuleDestroy {
this.cleanupTimers.clear(); this.cleanupTimers.clear();
} }
private recordSpawnedAgentIngestion(agentId: string, request: SpawnAgentRequest): void {
if (!this.agentIngestionService) {
return;
}
void this.agentIngestionService
.recordAgentSpawned(agentId, undefined, undefined, request.taskId, request.agentType)
.catch((error: unknown) => {
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.error(`Failed to record spawned ingestion for ${agentId}: ${errorMessage}`);
});
}
/** /**
* Spawn a new agent with the given configuration * Spawn a new agent with the given configuration
* @param request Agent spawn request * @param request Agent spawn request
@@ -130,6 +154,8 @@ export class AgentSpawnerService implements OnModuleDestroy {
// Store session // Store session
this.sessions.set(agentId, session); this.sessions.set(agentId, session);
this.recordSpawnedAgentIngestion(agentId, request);
this.logger.log(`Agent spawned successfully: ${agentId} (type: ${request.agentType})`); this.logger.log(`Agent spawned successfully: ${agentId} (type: ${request.agentType})`);
// NOTE: Actual Claude SDK integration will be implemented in next iteration (see issue #TBD) // NOTE: Actual Claude SDK integration will be implemented in next iteration (see issue #TBD)

View File

@@ -3,9 +3,10 @@ import { AgentSpawnerService } from "./agent-spawner.service";
import { AgentLifecycleService } from "./agent-lifecycle.service"; import { AgentLifecycleService } from "./agent-lifecycle.service";
import { DockerSandboxService } from "./docker-sandbox.service"; import { DockerSandboxService } from "./docker-sandbox.service";
import { ValkeyModule } from "../valkey/valkey.module"; import { ValkeyModule } from "../valkey/valkey.module";
import { AgentIngestionModule } from "../agent-ingestion/agent-ingestion.module";
@Module({ @Module({
imports: [ValkeyModule], imports: [ValkeyModule, AgentIngestionModule],
providers: [AgentSpawnerService, AgentLifecycleService, DockerSandboxService], providers: [AgentSpawnerService, AgentLifecycleService, DockerSandboxService],
exports: [AgentSpawnerService, AgentLifecycleService, DockerSandboxService], exports: [AgentSpawnerService, AgentLifecycleService, DockerSandboxService],
}) })

21
pnpm-lock.yaml generated
View File

@@ -337,6 +337,9 @@ importers:
'@nestjs/throttler': '@nestjs/throttler':
specifier: ^6.5.0 specifier: ^6.5.0
version: 6.5.0(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(reflect-metadata@0.2.2) version: 6.5.0(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(reflect-metadata@0.2.2)
'@prisma/client':
specifier: ^6.19.2
version: 6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3)
bullmq: bullmq:
specifier: ^5.67.2 specifier: ^5.67.2
version: 5.67.2 version: 5.67.2
@@ -8005,7 +8008,7 @@ snapshots:
chalk: 5.6.2 chalk: 5.6.2
commander: 12.1.0 commander: 12.1.0
dotenv: 17.2.4 dotenv: 17.2.4
drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))
open: 10.2.0 open: 10.2.0
pg: 8.17.2 pg: 8.17.2
prettier: 3.8.1 prettier: 3.8.1
@@ -11345,7 +11348,7 @@ snapshots:
optionalDependencies: optionalDependencies:
'@prisma/client': 5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) '@prisma/client': 5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))
better-sqlite3: 12.6.2 better-sqlite3: 12.6.2
drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))
next: 16.1.6(@babel/core@7.28.6)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) next: 16.1.6(@babel/core@7.28.6)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4)
pg: 8.17.2 pg: 8.17.2
prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3)
@@ -11370,7 +11373,7 @@ snapshots:
optionalDependencies: optionalDependencies:
'@prisma/client': 6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3) '@prisma/client': 6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3)
better-sqlite3: 12.6.2 better-sqlite3: 12.6.2
drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))
next: 16.1.6(@babel/core@7.28.6)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) next: 16.1.6(@babel/core@7.28.6)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4)
pg: 8.17.2 pg: 8.17.2
prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3)
@@ -12194,17 +12197,6 @@ snapshots:
dotenv@17.2.4: {} dotenv@17.2.4: {}
drizzle-orm@0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)):
optionalDependencies:
'@opentelemetry/api': 1.9.0
'@prisma/client': 5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))
'@types/pg': 8.16.0
better-sqlite3: 12.6.2
kysely: 0.28.10
pg: 8.17.2
postgres: 3.4.8
prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3)
drizzle-orm@0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)): drizzle-orm@0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)):
optionalDependencies: optionalDependencies:
'@opentelemetry/api': 1.9.0 '@opentelemetry/api': 1.9.0
@@ -12215,7 +12207,6 @@ snapshots:
pg: 8.17.2 pg: 8.17.2
postgres: 3.4.8 postgres: 3.4.8
prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3)
optional: true
dunder-proto@1.0.1: dunder-proto@1.0.1:
dependencies: dependencies: