From d6c7bf04d5e273226da3406e0d8fc5693189ed49 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Fri, 6 Mar 2026 20:24:13 -0600 Subject: [PATCH] feat(orchestrator): wire agent lifecycle events to MS23 ingestion service Add AgentIngestionService that writes to AgentConversationMessage and AgentSessionTree on every agent lifecycle event (spawned, started, completed, failed, killed). Wire into AgentSpawnerService and AgentLifecycleService. Partial #693 --- apps/orchestrator/package.json | 1 + .../agent-ingestion/agent-ingestion.module.ts | 10 ++ .../agent-ingestion.service.ts | 141 ++++++++++++++++++ apps/orchestrator/src/prisma/prisma.module.ts | 9 ++ .../orchestrator/src/prisma/prisma.service.ts | 26 ++++ .../src/spawner/agent-lifecycle.service.ts | 41 ++++- .../src/spawner/agent-spawner.service.ts | 30 +++- .../src/spawner/spawner.module.ts | 3 +- pnpm-lock.yaml | 21 +-- 9 files changed, 262 insertions(+), 20 deletions(-) create mode 100644 apps/orchestrator/src/agent-ingestion/agent-ingestion.module.ts create mode 100644 apps/orchestrator/src/agent-ingestion/agent-ingestion.service.ts create mode 100644 apps/orchestrator/src/prisma/prisma.module.ts create mode 100644 apps/orchestrator/src/prisma/prisma.service.ts diff --git a/apps/orchestrator/package.json b/apps/orchestrator/package.json index 95bf1a8..80ea663 100644 --- a/apps/orchestrator/package.json +++ b/apps/orchestrator/package.json @@ -27,6 +27,7 @@ "@nestjs/core": "^11.1.12", "@nestjs/platform-express": "^11.1.12", "@nestjs/throttler": "^6.5.0", + "@prisma/client": "^6.19.2", "bullmq": "^5.67.2", "class-transformer": "^0.5.1", "class-validator": "^0.14.1", diff --git a/apps/orchestrator/src/agent-ingestion/agent-ingestion.module.ts b/apps/orchestrator/src/agent-ingestion/agent-ingestion.module.ts new file mode 100644 index 0000000..9350ee0 --- /dev/null +++ b/apps/orchestrator/src/agent-ingestion/agent-ingestion.module.ts @@ -0,0 +1,10 @@ +import { Module } from "@nestjs/common"; +import { PrismaModule } from "../prisma/prisma.module"; +import { AgentIngestionService } from "./agent-ingestion.service"; + +@Module({ + imports: [PrismaModule], + providers: [AgentIngestionService], + exports: [AgentIngestionService], +}) +export class AgentIngestionModule {} diff --git a/apps/orchestrator/src/agent-ingestion/agent-ingestion.service.ts b/apps/orchestrator/src/agent-ingestion/agent-ingestion.service.ts new file mode 100644 index 0000000..64412ab --- /dev/null +++ b/apps/orchestrator/src/agent-ingestion/agent-ingestion.service.ts @@ -0,0 +1,141 @@ +import { Injectable, Logger } from "@nestjs/common"; +import type { Prisma } from "@prisma/client"; +import { PrismaService } from "../prisma/prisma.service"; + +export type AgentConversationRole = "agent" | "user" | "system" | "operator"; + +@Injectable() +export class AgentIngestionService { + private readonly logger = new Logger(AgentIngestionService.name); + + constructor(private readonly prisma: PrismaService) {} + + private toJsonValue(value: Record): Prisma.InputJsonValue { + return value as Prisma.InputJsonValue; + } + + async recordAgentSpawned( + agentId: string, + parentAgentId?: string, + missionId?: string, + taskId?: string, + agentType?: string + ): Promise { + await this.prisma.agentSessionTree.upsert({ + where: { sessionId: agentId }, + create: { + sessionId: agentId, + parentSessionId: parentAgentId, + missionId, + taskId, + agentType, + status: "spawning", + }, + update: { + parentSessionId: parentAgentId, + missionId, + taskId, + agentType, + status: "spawning", + completedAt: null, + }, + }); + + this.logger.debug(`Recorded spawned state for agent ${agentId}`); + } + + async recordAgentStarted(agentId: string): Promise { + await this.prisma.agentSessionTree.upsert({ + where: { sessionId: agentId }, + create: { + sessionId: agentId, + status: "running", + }, + update: { + status: "running", + }, + }); + + this.logger.debug(`Recorded running state for agent ${agentId}`); + } + + async recordAgentCompleted(agentId: string): Promise { + const completedAt = new Date(); + + await this.prisma.agentSessionTree.upsert({ + where: { sessionId: agentId }, + create: { + sessionId: agentId, + status: "completed", + completedAt, + }, + update: { + status: "completed", + completedAt, + }, + }); + + this.logger.debug(`Recorded completed state for agent ${agentId}`); + } + + async recordAgentFailed(agentId: string, error?: string): Promise { + const completedAt = new Date(); + const metadata = error ? this.toJsonValue({ error }) : undefined; + + await this.prisma.agentSessionTree.upsert({ + where: { sessionId: agentId }, + create: { + sessionId: agentId, + status: "failed", + completedAt, + ...(metadata && { metadata }), + }, + update: { + status: "failed", + completedAt, + ...(metadata && { metadata }), + }, + }); + + this.logger.debug(`Recorded failed state for agent ${agentId}`); + } + + async recordAgentKilled(agentId: string): Promise { + const completedAt = new Date(); + + await this.prisma.agentSessionTree.upsert({ + where: { sessionId: agentId }, + create: { + sessionId: agentId, + status: "killed", + completedAt, + }, + update: { + status: "killed", + completedAt, + }, + }); + + this.logger.debug(`Recorded killed state for agent ${agentId}`); + } + + async recordMessage( + sessionId: string, + role: AgentConversationRole, + content: string, + provider = "internal", + metadata?: Record + ): Promise { + await this.prisma.agentConversationMessage.create({ + data: { + sessionId, + role, + content, + provider, + ...(metadata && { metadata: this.toJsonValue(metadata) }), + }, + }); + + this.logger.debug(`Recorded message for session ${sessionId}`); + } +} diff --git a/apps/orchestrator/src/prisma/prisma.module.ts b/apps/orchestrator/src/prisma/prisma.module.ts new file mode 100644 index 0000000..1edbf95 --- /dev/null +++ b/apps/orchestrator/src/prisma/prisma.module.ts @@ -0,0 +1,9 @@ +import { Global, Module } from "@nestjs/common"; +import { PrismaService } from "./prisma.service"; + +@Global() +@Module({ + providers: [PrismaService], + exports: [PrismaService], +}) +export class PrismaModule {} diff --git a/apps/orchestrator/src/prisma/prisma.service.ts b/apps/orchestrator/src/prisma/prisma.service.ts new file mode 100644 index 0000000..869c916 --- /dev/null +++ b/apps/orchestrator/src/prisma/prisma.service.ts @@ -0,0 +1,26 @@ +import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from "@nestjs/common"; +import { PrismaClient } from "@prisma/client"; + +/** + * Lightweight Prisma service for orchestrator ingestion persistence. + */ +@Injectable() +export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(PrismaService.name); + + constructor() { + super({ + log: process.env.NODE_ENV === "development" ? ["warn", "error"] : ["error"], + }); + } + + async onModuleInit(): Promise { + await this.$connect(); + this.logger.log("Database connection established"); + } + + async onModuleDestroy(): Promise { + await this.$disconnect(); + this.logger.log("Database connection closed"); + } +} diff --git a/apps/orchestrator/src/spawner/agent-lifecycle.service.ts b/apps/orchestrator/src/spawner/agent-lifecycle.service.ts index cc9b57c..1505544 100644 --- a/apps/orchestrator/src/spawner/agent-lifecycle.service.ts +++ b/apps/orchestrator/src/spawner/agent-lifecycle.service.ts @@ -1,6 +1,7 @@ -import { Injectable, Logger, Inject, forwardRef } from "@nestjs/common"; +import { Injectable, Logger, Inject, Optional, forwardRef } from "@nestjs/common"; import { ValkeyService } from "../valkey/valkey.service"; import { AgentSpawnerService } from "./agent-spawner.service"; +import { AgentIngestionService } from "../agent-ingestion/agent-ingestion.service"; import type { AgentState, AgentStatus, AgentEvent } from "../valkey/types"; import { isValidAgentTransition } from "../valkey/types/state.types"; @@ -32,7 +33,8 @@ export class AgentLifecycleService { constructor( private readonly valkeyService: ValkeyService, @Inject(forwardRef(() => AgentSpawnerService)) - private readonly spawnerService: AgentSpawnerService + private readonly spawnerService: AgentSpawnerService, + @Optional() private readonly agentIngestionService?: AgentIngestionService ) { this.logger.log("AgentLifecycleService initialized"); } @@ -55,6 +57,25 @@ export class AgentLifecycleService { return createdState; } + private async recordLifecycleIngestion( + agentId: string, + event: "started" | "completed" | "failed" | "killed", + record: (ingestionService: AgentIngestionService) => Promise + ): Promise { + if (!this.agentIngestionService) { + return; + } + + try { + await record(this.agentIngestionService); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error( + `Failed to record agent ${event} ingestion for ${agentId}: ${errorMessage}` + ); + } + } + /** * Acquire a per-agent mutex to serialize state transitions. * Uses promise chaining: each caller chains onto the previous lock, @@ -118,6 +139,10 @@ export class AgentLifecycleService { // Emit event await this.publishStateChangeEvent("agent.running", updatedState); + await this.recordLifecycleIngestion(agentId, "started", (ingestionService) => + ingestionService.recordAgentStarted(agentId) + ); + this.logger.log(`Agent ${agentId} transitioned to running`); return updatedState; }); @@ -155,6 +180,10 @@ export class AgentLifecycleService { // Emit event await this.publishStateChangeEvent("agent.completed", updatedState); + await this.recordLifecycleIngestion(agentId, "completed", (ingestionService) => + ingestionService.recordAgentCompleted(agentId) + ); + // Schedule session cleanup this.spawnerService.scheduleSessionCleanup(agentId); @@ -192,6 +221,10 @@ export class AgentLifecycleService { // Emit event await this.publishStateChangeEvent("agent.failed", updatedState, error); + await this.recordLifecycleIngestion(agentId, "failed", (ingestionService) => + ingestionService.recordAgentFailed(agentId, error) + ); + // Schedule session cleanup this.spawnerService.scheduleSessionCleanup(agentId); @@ -228,6 +261,10 @@ export class AgentLifecycleService { // Emit event await this.publishStateChangeEvent("agent.killed", updatedState); + await this.recordLifecycleIngestion(agentId, "killed", (ingestionService) => + ingestionService.recordAgentKilled(agentId) + ); + // Schedule session cleanup this.spawnerService.scheduleSessionCleanup(agentId); diff --git a/apps/orchestrator/src/spawner/agent-spawner.service.ts b/apps/orchestrator/src/spawner/agent-spawner.service.ts index c91971a..9c9ebba 100644 --- a/apps/orchestrator/src/spawner/agent-spawner.service.ts +++ b/apps/orchestrator/src/spawner/agent-spawner.service.ts @@ -1,4 +1,11 @@ -import { Injectable, Logger, HttpException, HttpStatus, OnModuleDestroy } from "@nestjs/common"; +import { + Injectable, + Logger, + HttpException, + HttpStatus, + OnModuleDestroy, + Optional, +} from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; import Anthropic from "@anthropic-ai/sdk"; import { randomUUID } from "crypto"; @@ -8,6 +15,7 @@ import { AgentSession, AgentType, } from "./types/agent-spawner.types"; +import { AgentIngestionService } from "../agent-ingestion/agent-ingestion.service"; /** * Default delay in milliseconds before cleaning up sessions after terminal states @@ -30,7 +38,10 @@ export class AgentSpawnerService implements OnModuleDestroy { private readonly sessionCleanupDelayMs: number; private readonly cleanupTimers = new Map(); - constructor(private readonly configService: ConfigService) { + constructor( + private readonly configService: ConfigService, + @Optional() private readonly agentIngestionService?: AgentIngestionService + ) { const configuredProvider = this.configService.get("orchestrator.aiProvider"); this.aiProvider = this.normalizeAiProvider(configuredProvider); @@ -98,6 +109,19 @@ export class AgentSpawnerService implements OnModuleDestroy { this.cleanupTimers.clear(); } + private recordSpawnedAgentIngestion(agentId: string, request: SpawnAgentRequest): void { + if (!this.agentIngestionService) { + return; + } + + void this.agentIngestionService + .recordAgentSpawned(agentId, undefined, undefined, request.taskId, request.agentType) + .catch((error: unknown) => { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`Failed to record spawned ingestion for ${agentId}: ${errorMessage}`); + }); + } + /** * Spawn a new agent with the given configuration * @param request Agent spawn request @@ -130,6 +154,8 @@ export class AgentSpawnerService implements OnModuleDestroy { // Store session this.sessions.set(agentId, session); + this.recordSpawnedAgentIngestion(agentId, request); + this.logger.log(`Agent spawned successfully: ${agentId} (type: ${request.agentType})`); // NOTE: Actual Claude SDK integration will be implemented in next iteration (see issue #TBD) diff --git a/apps/orchestrator/src/spawner/spawner.module.ts b/apps/orchestrator/src/spawner/spawner.module.ts index b44c5f5..8d4a601 100644 --- a/apps/orchestrator/src/spawner/spawner.module.ts +++ b/apps/orchestrator/src/spawner/spawner.module.ts @@ -3,9 +3,10 @@ import { AgentSpawnerService } from "./agent-spawner.service"; import { AgentLifecycleService } from "./agent-lifecycle.service"; import { DockerSandboxService } from "./docker-sandbox.service"; import { ValkeyModule } from "../valkey/valkey.module"; +import { AgentIngestionModule } from "../agent-ingestion/agent-ingestion.module"; @Module({ - imports: [ValkeyModule], + imports: [ValkeyModule, AgentIngestionModule], providers: [AgentSpawnerService, AgentLifecycleService, DockerSandboxService], exports: [AgentSpawnerService, AgentLifecycleService, DockerSandboxService], }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 97e9d24..3e6acb6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -337,6 +337,9 @@ importers: '@nestjs/throttler': specifier: ^6.5.0 version: 6.5.0(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(reflect-metadata@0.2.2) + '@prisma/client': + specifier: ^6.19.2 + version: 6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3) bullmq: specifier: ^5.67.2 version: 5.67.2 @@ -8005,7 +8008,7 @@ snapshots: chalk: 5.6.2 commander: 12.1.0 dotenv: 17.2.4 - drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) + drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) open: 10.2.0 pg: 8.17.2 prettier: 3.8.1 @@ -11345,7 +11348,7 @@ snapshots: optionalDependencies: '@prisma/client': 5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) better-sqlite3: 12.6.2 - drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) + drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) next: 16.1.6(@babel/core@7.28.6)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) pg: 8.17.2 prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) @@ -11370,7 +11373,7 @@ snapshots: optionalDependencies: '@prisma/client': 6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3) better-sqlite3: 12.6.2 - drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) + drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) next: 16.1.6(@babel/core@7.28.6)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) pg: 8.17.2 prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) @@ -12194,17 +12197,6 @@ snapshots: dotenv@17.2.4: {} - drizzle-orm@0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)): - optionalDependencies: - '@opentelemetry/api': 1.9.0 - '@prisma/client': 5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) - '@types/pg': 8.16.0 - better-sqlite3: 12.6.2 - kysely: 0.28.10 - pg: 8.17.2 - postgres: 3.4.8 - prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) - drizzle-orm@0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)): optionalDependencies: '@opentelemetry/api': 1.9.0 @@ -12215,7 +12207,6 @@ snapshots: pg: 8.17.2 postgres: 3.4.8 prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) - optional: true dunder-proto@1.0.1: dependencies: