From de6faf659ee12971349335a94d6b4e22b5240d5c Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sat, 7 Mar 2026 04:21:26 +0000 Subject: [PATCH] feat(orchestrator): MS23 agent lifecycle ingestion service (#701) Co-authored-by: Jason Woltje Co-committed-by: Jason Woltje --- apps/orchestrator/package.json | 1 + .../agent-ingestion/agent-ingestion.module.ts | 10 ++ .../agent-ingestion.service.ts | 141 ++++++++++++++++++ apps/orchestrator/src/prisma/prisma.module.ts | 9 ++ .../orchestrator/src/prisma/prisma.service.ts | 26 ++++ .../src/spawner/agent-lifecycle.service.ts | 41 ++++- .../src/spawner/agent-spawner.service.ts | 30 +++- .../src/spawner/spawner.module.ts | 3 +- pnpm-lock.yaml | 21 +-- 9 files changed, 262 insertions(+), 20 deletions(-) create mode 100644 apps/orchestrator/src/agent-ingestion/agent-ingestion.module.ts create mode 100644 apps/orchestrator/src/agent-ingestion/agent-ingestion.service.ts create mode 100644 apps/orchestrator/src/prisma/prisma.module.ts create mode 100644 apps/orchestrator/src/prisma/prisma.service.ts diff --git a/apps/orchestrator/package.json b/apps/orchestrator/package.json index 95bf1a8..80ea663 100644 --- a/apps/orchestrator/package.json +++ b/apps/orchestrator/package.json @@ -27,6 +27,7 @@ "@nestjs/core": "^11.1.12", "@nestjs/platform-express": "^11.1.12", "@nestjs/throttler": "^6.5.0", + "@prisma/client": "^6.19.2", "bullmq": "^5.67.2", "class-transformer": "^0.5.1", "class-validator": "^0.14.1", diff --git a/apps/orchestrator/src/agent-ingestion/agent-ingestion.module.ts b/apps/orchestrator/src/agent-ingestion/agent-ingestion.module.ts new file mode 100644 index 0000000..9350ee0 --- /dev/null +++ b/apps/orchestrator/src/agent-ingestion/agent-ingestion.module.ts @@ -0,0 +1,10 @@ +import { Module } from "@nestjs/common"; +import { PrismaModule } from "../prisma/prisma.module"; +import { AgentIngestionService } from "./agent-ingestion.service"; + +@Module({ + imports: [PrismaModule], + providers: [AgentIngestionService], + exports: [AgentIngestionService], +}) +export class AgentIngestionModule {} diff --git a/apps/orchestrator/src/agent-ingestion/agent-ingestion.service.ts b/apps/orchestrator/src/agent-ingestion/agent-ingestion.service.ts new file mode 100644 index 0000000..64412ab --- /dev/null +++ b/apps/orchestrator/src/agent-ingestion/agent-ingestion.service.ts @@ -0,0 +1,141 @@ +import { Injectable, Logger } from "@nestjs/common"; +import type { Prisma } from "@prisma/client"; +import { PrismaService } from "../prisma/prisma.service"; + +export type AgentConversationRole = "agent" | "user" | "system" | "operator"; + +@Injectable() +export class AgentIngestionService { + private readonly logger = new Logger(AgentIngestionService.name); + + constructor(private readonly prisma: PrismaService) {} + + private toJsonValue(value: Record): Prisma.InputJsonValue { + return value as Prisma.InputJsonValue; + } + + async recordAgentSpawned( + agentId: string, + parentAgentId?: string, + missionId?: string, + taskId?: string, + agentType?: string + ): Promise { + await this.prisma.agentSessionTree.upsert({ + where: { sessionId: agentId }, + create: { + sessionId: agentId, + parentSessionId: parentAgentId, + missionId, + taskId, + agentType, + status: "spawning", + }, + update: { + parentSessionId: parentAgentId, + missionId, + taskId, + agentType, + status: "spawning", + completedAt: null, + }, + }); + + this.logger.debug(`Recorded spawned state for agent ${agentId}`); + } + + async recordAgentStarted(agentId: string): Promise { + await this.prisma.agentSessionTree.upsert({ + where: { sessionId: agentId }, + create: { + sessionId: agentId, + status: "running", + }, + update: { + status: "running", + }, + }); + + this.logger.debug(`Recorded running state for agent ${agentId}`); + } + + async recordAgentCompleted(agentId: string): Promise { + const completedAt = new Date(); + + await this.prisma.agentSessionTree.upsert({ + where: { sessionId: agentId }, + create: { + sessionId: agentId, + status: "completed", + completedAt, + }, + update: { + status: "completed", + completedAt, + }, + }); + + this.logger.debug(`Recorded completed state for agent ${agentId}`); + } + + async recordAgentFailed(agentId: string, error?: string): Promise { + const completedAt = new Date(); + const metadata = error ? this.toJsonValue({ error }) : undefined; + + await this.prisma.agentSessionTree.upsert({ + where: { sessionId: agentId }, + create: { + sessionId: agentId, + status: "failed", + completedAt, + ...(metadata && { metadata }), + }, + update: { + status: "failed", + completedAt, + ...(metadata && { metadata }), + }, + }); + + this.logger.debug(`Recorded failed state for agent ${agentId}`); + } + + async recordAgentKilled(agentId: string): Promise { + const completedAt = new Date(); + + await this.prisma.agentSessionTree.upsert({ + where: { sessionId: agentId }, + create: { + sessionId: agentId, + status: "killed", + completedAt, + }, + update: { + status: "killed", + completedAt, + }, + }); + + this.logger.debug(`Recorded killed state for agent ${agentId}`); + } + + async recordMessage( + sessionId: string, + role: AgentConversationRole, + content: string, + provider = "internal", + metadata?: Record + ): Promise { + await this.prisma.agentConversationMessage.create({ + data: { + sessionId, + role, + content, + provider, + ...(metadata && { metadata: this.toJsonValue(metadata) }), + }, + }); + + this.logger.debug(`Recorded message for session ${sessionId}`); + } +} diff --git a/apps/orchestrator/src/prisma/prisma.module.ts b/apps/orchestrator/src/prisma/prisma.module.ts new file mode 100644 index 0000000..1edbf95 --- /dev/null +++ b/apps/orchestrator/src/prisma/prisma.module.ts @@ -0,0 +1,9 @@ +import { Global, Module } from "@nestjs/common"; +import { PrismaService } from "./prisma.service"; + +@Global() +@Module({ + providers: [PrismaService], + exports: [PrismaService], +}) +export class PrismaModule {} diff --git a/apps/orchestrator/src/prisma/prisma.service.ts b/apps/orchestrator/src/prisma/prisma.service.ts new file mode 100644 index 0000000..869c916 --- /dev/null +++ b/apps/orchestrator/src/prisma/prisma.service.ts @@ -0,0 +1,26 @@ +import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from "@nestjs/common"; +import { PrismaClient } from "@prisma/client"; + +/** + * Lightweight Prisma service for orchestrator ingestion persistence. + */ +@Injectable() +export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(PrismaService.name); + + constructor() { + super({ + log: process.env.NODE_ENV === "development" ? ["warn", "error"] : ["error"], + }); + } + + async onModuleInit(): Promise { + await this.$connect(); + this.logger.log("Database connection established"); + } + + async onModuleDestroy(): Promise { + await this.$disconnect(); + this.logger.log("Database connection closed"); + } +} diff --git a/apps/orchestrator/src/spawner/agent-lifecycle.service.ts b/apps/orchestrator/src/spawner/agent-lifecycle.service.ts index cc9b57c..1505544 100644 --- a/apps/orchestrator/src/spawner/agent-lifecycle.service.ts +++ b/apps/orchestrator/src/spawner/agent-lifecycle.service.ts @@ -1,6 +1,7 @@ -import { Injectable, Logger, Inject, forwardRef } from "@nestjs/common"; +import { Injectable, Logger, Inject, Optional, forwardRef } from "@nestjs/common"; import { ValkeyService } from "../valkey/valkey.service"; import { AgentSpawnerService } from "./agent-spawner.service"; +import { AgentIngestionService } from "../agent-ingestion/agent-ingestion.service"; import type { AgentState, AgentStatus, AgentEvent } from "../valkey/types"; import { isValidAgentTransition } from "../valkey/types/state.types"; @@ -32,7 +33,8 @@ export class AgentLifecycleService { constructor( private readonly valkeyService: ValkeyService, @Inject(forwardRef(() => AgentSpawnerService)) - private readonly spawnerService: AgentSpawnerService + private readonly spawnerService: AgentSpawnerService, + @Optional() private readonly agentIngestionService?: AgentIngestionService ) { this.logger.log("AgentLifecycleService initialized"); } @@ -55,6 +57,25 @@ export class AgentLifecycleService { return createdState; } + private async recordLifecycleIngestion( + agentId: string, + event: "started" | "completed" | "failed" | "killed", + record: (ingestionService: AgentIngestionService) => Promise + ): Promise { + if (!this.agentIngestionService) { + return; + } + + try { + await record(this.agentIngestionService); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error( + `Failed to record agent ${event} ingestion for ${agentId}: ${errorMessage}` + ); + } + } + /** * Acquire a per-agent mutex to serialize state transitions. * Uses promise chaining: each caller chains onto the previous lock, @@ -118,6 +139,10 @@ export class AgentLifecycleService { // Emit event await this.publishStateChangeEvent("agent.running", updatedState); + await this.recordLifecycleIngestion(agentId, "started", (ingestionService) => + ingestionService.recordAgentStarted(agentId) + ); + this.logger.log(`Agent ${agentId} transitioned to running`); return updatedState; }); @@ -155,6 +180,10 @@ export class AgentLifecycleService { // Emit event await this.publishStateChangeEvent("agent.completed", updatedState); + await this.recordLifecycleIngestion(agentId, "completed", (ingestionService) => + ingestionService.recordAgentCompleted(agentId) + ); + // Schedule session cleanup this.spawnerService.scheduleSessionCleanup(agentId); @@ -192,6 +221,10 @@ export class AgentLifecycleService { // Emit event await this.publishStateChangeEvent("agent.failed", updatedState, error); + await this.recordLifecycleIngestion(agentId, "failed", (ingestionService) => + ingestionService.recordAgentFailed(agentId, error) + ); + // Schedule session cleanup this.spawnerService.scheduleSessionCleanup(agentId); @@ -228,6 +261,10 @@ export class AgentLifecycleService { // Emit event await this.publishStateChangeEvent("agent.killed", updatedState); + await this.recordLifecycleIngestion(agentId, "killed", (ingestionService) => + ingestionService.recordAgentKilled(agentId) + ); + // Schedule session cleanup this.spawnerService.scheduleSessionCleanup(agentId); diff --git a/apps/orchestrator/src/spawner/agent-spawner.service.ts b/apps/orchestrator/src/spawner/agent-spawner.service.ts index c91971a..9c9ebba 100644 --- a/apps/orchestrator/src/spawner/agent-spawner.service.ts +++ b/apps/orchestrator/src/spawner/agent-spawner.service.ts @@ -1,4 +1,11 @@ -import { Injectable, Logger, HttpException, HttpStatus, OnModuleDestroy } from "@nestjs/common"; +import { + Injectable, + Logger, + HttpException, + HttpStatus, + OnModuleDestroy, + Optional, +} from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; import Anthropic from "@anthropic-ai/sdk"; import { randomUUID } from "crypto"; @@ -8,6 +15,7 @@ import { AgentSession, AgentType, } from "./types/agent-spawner.types"; +import { AgentIngestionService } from "../agent-ingestion/agent-ingestion.service"; /** * Default delay in milliseconds before cleaning up sessions after terminal states @@ -30,7 +38,10 @@ export class AgentSpawnerService implements OnModuleDestroy { private readonly sessionCleanupDelayMs: number; private readonly cleanupTimers = new Map(); - constructor(private readonly configService: ConfigService) { + constructor( + private readonly configService: ConfigService, + @Optional() private readonly agentIngestionService?: AgentIngestionService + ) { const configuredProvider = this.configService.get("orchestrator.aiProvider"); this.aiProvider = this.normalizeAiProvider(configuredProvider); @@ -98,6 +109,19 @@ export class AgentSpawnerService implements OnModuleDestroy { this.cleanupTimers.clear(); } + private recordSpawnedAgentIngestion(agentId: string, request: SpawnAgentRequest): void { + if (!this.agentIngestionService) { + return; + } + + void this.agentIngestionService + .recordAgentSpawned(agentId, undefined, undefined, request.taskId, request.agentType) + .catch((error: unknown) => { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`Failed to record spawned ingestion for ${agentId}: ${errorMessage}`); + }); + } + /** * Spawn a new agent with the given configuration * @param request Agent spawn request @@ -130,6 +154,8 @@ export class AgentSpawnerService implements OnModuleDestroy { // Store session this.sessions.set(agentId, session); + this.recordSpawnedAgentIngestion(agentId, request); + this.logger.log(`Agent spawned successfully: ${agentId} (type: ${request.agentType})`); // NOTE: Actual Claude SDK integration will be implemented in next iteration (see issue #TBD) diff --git a/apps/orchestrator/src/spawner/spawner.module.ts b/apps/orchestrator/src/spawner/spawner.module.ts index b44c5f5..8d4a601 100644 --- a/apps/orchestrator/src/spawner/spawner.module.ts +++ b/apps/orchestrator/src/spawner/spawner.module.ts @@ -3,9 +3,10 @@ import { AgentSpawnerService } from "./agent-spawner.service"; import { AgentLifecycleService } from "./agent-lifecycle.service"; import { DockerSandboxService } from "./docker-sandbox.service"; import { ValkeyModule } from "../valkey/valkey.module"; +import { AgentIngestionModule } from "../agent-ingestion/agent-ingestion.module"; @Module({ - imports: [ValkeyModule], + imports: [ValkeyModule, AgentIngestionModule], providers: [AgentSpawnerService, AgentLifecycleService, DockerSandboxService], exports: [AgentSpawnerService, AgentLifecycleService, DockerSandboxService], }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 97e9d24..3e6acb6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -337,6 +337,9 @@ importers: '@nestjs/throttler': specifier: ^6.5.0 version: 6.5.0(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(reflect-metadata@0.2.2) + '@prisma/client': + specifier: ^6.19.2 + version: 6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3) bullmq: specifier: ^5.67.2 version: 5.67.2 @@ -8005,7 +8008,7 @@ snapshots: chalk: 5.6.2 commander: 12.1.0 dotenv: 17.2.4 - drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) + drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) open: 10.2.0 pg: 8.17.2 prettier: 3.8.1 @@ -11345,7 +11348,7 @@ snapshots: optionalDependencies: '@prisma/client': 5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) better-sqlite3: 12.6.2 - drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) + drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) next: 16.1.6(@babel/core@7.28.6)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) pg: 8.17.2 prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) @@ -11370,7 +11373,7 @@ snapshots: optionalDependencies: '@prisma/client': 6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3) better-sqlite3: 12.6.2 - drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) + drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) next: 16.1.6(@babel/core@7.28.6)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) pg: 8.17.2 prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) @@ -12194,17 +12197,6 @@ snapshots: dotenv@17.2.4: {} - drizzle-orm@0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)): - optionalDependencies: - '@opentelemetry/api': 1.9.0 - '@prisma/client': 5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) - '@types/pg': 8.16.0 - better-sqlite3: 12.6.2 - kysely: 0.28.10 - pg: 8.17.2 - postgres: 3.4.8 - prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) - drizzle-orm@0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)): optionalDependencies: '@opentelemetry/api': 1.9.0 @@ -12215,7 +12207,6 @@ snapshots: pg: 8.17.2 postgres: 3.4.8 prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) - optional: true dunder-proto@1.0.1: dependencies: