Files
stack/apps/api/src/federation/query.service.ts
Jason Woltje 73074932f6 feat(#360): Add federation credential isolation
Implement explicit deny-lists in QueryService and CommandService to prevent
user credentials from leaking across federation boundaries.

## Changes

### Core Implementation
- QueryService: Block all credential-related queries with keyword detection
- CommandService: Block all credential operations (create/update/delete/read)
- Case-insensitive keyword matching for both queries and commands

### Security Features
- Deny-list includes: credential, api_key, secret, token, password, oauth
- Errors returned for blocked operations
- No impact on existing allowed operations (tasks, events, projects, agent commands)

### Testing
- Added 2 unit tests to query.service.spec.ts
- Added 3 unit tests to command.service.spec.ts
- Added 8 integration tests in credential-isolation.integration.spec.ts
- All 377 federation tests passing

### Documentation
- Created comprehensive security doc at docs/security/federation-credential-isolation.md
- Documents 4 security guarantees (G1-G4)
- Includes testing strategy and incident response procedures

## Security Guarantees

1. G1: Credential Confidentiality - Credentials never leave instance in plaintext
2. G2: Cross-Instance Isolation - Compromised key on one instance doesn't affect others
3. G3: Query/Command Isolation - Federated instances cannot query/modify credentials
4. G4: Accidental Exposure Prevention - Credentials cannot leak via messages

## Defense-in-Depth

This implementation adds application-layer protection on top of existing:
- Transit key separation (mosaic-credentials vs mosaic-federation)
- Per-instance OpenBao servers
- Workspace-scoped credential access

Fixes #360

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 16:55:49 -06:00

522 lines
14 KiB
TypeScript

/**
* Query Service
*
* Handles federated query messages.
*/
import { Injectable, Logger } from "@nestjs/common";
import { HttpService } from "@nestjs/axios";
import { randomUUID } from "crypto";
import { firstValueFrom } from "rxjs";
import { PrismaService } from "../prisma/prisma.service";
import { FederationService } from "./federation.service";
import { SignatureService } from "./signature.service";
import { TasksService } from "../tasks/tasks.service";
import { EventsService } from "../events/events.service";
import { ProjectsService } from "../projects/projects.service";
import {
FederationConnectionStatus,
FederationMessageType,
FederationMessageStatus,
} from "@prisma/client";
import type { QueryMessage, QueryResponse, QueryMessageDetails } from "./types/message.types";
@Injectable()
export class QueryService {
private readonly logger = new Logger(QueryService.name);
constructor(
private readonly prisma: PrismaService,
private readonly federationService: FederationService,
private readonly signatureService: SignatureService,
private readonly httpService: HttpService,
private readonly tasksService: TasksService,
private readonly eventsService: EventsService,
private readonly projectsService: ProjectsService
) {}
/**
* Send a query to a remote instance
*/
async sendQuery(
workspaceId: string,
connectionId: string,
query: string,
context?: Record<string, unknown>
): Promise<QueryMessageDetails> {
// Validate connection exists and is active (enforced in query)
const connection = await this.prisma.federationConnection.findUnique({
where: {
id: connectionId,
workspaceId,
status: FederationConnectionStatus.ACTIVE,
},
});
if (!connection) {
throw new Error("Connection not found");
}
// Get local instance identity
const identity = await this.federationService.getInstanceIdentity();
// Create query message
const messageId = randomUUID();
const timestamp = Date.now();
const queryPayload: Record<string, unknown> = {
messageId,
instanceId: identity.instanceId,
query,
timestamp,
};
if (context) {
queryPayload.context = context;
}
// Sign the query
const signature = await this.signatureService.signMessage(queryPayload);
const signedQuery = {
messageId,
instanceId: identity.instanceId,
query,
...(context ? { context } : {}),
timestamp,
signature,
} as QueryMessage;
// Store message in database
const message = await this.prisma.federationMessage.create({
data: {
workspaceId,
connectionId,
messageType: FederationMessageType.QUERY,
messageId,
query,
status: FederationMessageStatus.PENDING,
signature,
},
});
// Send query to remote instance
try {
const remoteUrl = `${connection.remoteUrl}/api/v1/federation/incoming/query`;
await firstValueFrom(this.httpService.post(remoteUrl, signedQuery));
this.logger.log(`Query sent to ${connection.remoteUrl}: ${messageId}`);
} catch (error) {
this.logger.error(`Failed to send query to ${connection.remoteUrl}`, error);
// Update message status to failed
await this.prisma.federationMessage.update({
where: { id: message.id },
data: {
status: FederationMessageStatus.FAILED,
error: error instanceof Error ? error.message : "Unknown error",
},
});
throw new Error("Failed to send query");
}
return this.mapToQueryMessageDetails(message);
}
/**
* Handle incoming query from remote instance
*/
async handleIncomingQuery(queryMessage: QueryMessage): Promise<QueryResponse> {
this.logger.log(`Received query from ${queryMessage.instanceId}: ${queryMessage.messageId}`);
// Validate timestamp
if (!this.signatureService.validateTimestamp(queryMessage.timestamp)) {
throw new Error("Query timestamp is outside acceptable range");
}
// Find connection for remote instance (status enforced in query)
const connection = await this.prisma.federationConnection.findFirst({
where: {
remoteInstanceId: queryMessage.instanceId,
status: FederationConnectionStatus.ACTIVE,
},
});
if (!connection) {
throw new Error("No connection found for remote instance");
}
// Verify signature
const { signature, ...messageToVerify } = queryMessage;
const verificationResult = await this.signatureService.verifyMessage(
messageToVerify,
signature,
queryMessage.instanceId
);
if (!verificationResult.valid) {
throw new Error(verificationResult.error ?? "Invalid signature");
}
// Process query
let responseData: unknown;
let success = true;
let errorMessage: string | undefined;
try {
responseData = await this.processQuery(
queryMessage.query,
connection.workspaceId,
queryMessage.context
);
} catch (error) {
success = false;
errorMessage = error instanceof Error ? error.message : "Query processing failed";
this.logger.error(`Query processing failed: ${errorMessage}`);
}
// Get local instance identity
const identity = await this.federationService.getInstanceIdentity();
// Create response
const responseMessageId = randomUUID();
const responseTimestamp = Date.now();
const responsePayload: Record<string, unknown> = {
messageId: responseMessageId,
correlationId: queryMessage.messageId,
instanceId: identity.instanceId,
success,
timestamp: responseTimestamp,
};
if (responseData !== undefined) {
responsePayload.data = responseData;
}
if (errorMessage !== undefined) {
responsePayload.error = errorMessage;
}
// Sign the response
const responseSignature = await this.signatureService.signMessage(responsePayload);
const response = {
messageId: responseMessageId,
correlationId: queryMessage.messageId,
instanceId: identity.instanceId,
success,
...(responseData !== undefined ? { data: responseData } : {}),
...(errorMessage !== undefined ? { error: errorMessage } : {}),
timestamp: responseTimestamp,
signature: responseSignature,
} as QueryResponse;
return response;
}
/**
* Get all query messages for a workspace
*/
async getQueryMessages(
workspaceId: string,
status?: FederationMessageStatus
): Promise<QueryMessageDetails[]> {
const where: Record<string, unknown> = {
workspaceId,
messageType: FederationMessageType.QUERY,
};
if (status) {
where.status = status;
}
const messages = await this.prisma.federationMessage.findMany({
where,
orderBy: { createdAt: "desc" },
});
return messages.map((msg) => this.mapToQueryMessageDetails(msg));
}
/**
* Get a single query message
*/
async getQueryMessage(workspaceId: string, messageId: string): Promise<QueryMessageDetails> {
const message = await this.prisma.federationMessage.findUnique({
where: { id: messageId, workspaceId },
});
if (!message) {
throw new Error("Query message not found");
}
return this.mapToQueryMessageDetails(message);
}
/**
* Process a query response from remote instance
*/
async processQueryResponse(response: QueryResponse): Promise<void> {
this.logger.log(`Received response for query: ${response.correlationId}`);
// Validate timestamp
if (!this.signatureService.validateTimestamp(response.timestamp)) {
throw new Error("Response timestamp is outside acceptable range");
}
// Find original query message
const message = await this.prisma.federationMessage.findFirst({
where: {
messageId: response.correlationId,
messageType: FederationMessageType.QUERY,
},
});
if (!message) {
throw new Error("Original query message not found");
}
// Verify signature
const { signature, ...responseToVerify } = response;
const verificationResult = await this.signatureService.verifyMessage(
responseToVerify,
signature,
response.instanceId
);
if (!verificationResult.valid) {
throw new Error(verificationResult.error ?? "Invalid signature");
}
// Update message with response
const updateData: Record<string, unknown> = {
status: response.success ? FederationMessageStatus.DELIVERED : FederationMessageStatus.FAILED,
deliveredAt: new Date(),
};
if (response.data !== undefined) {
updateData.response = response.data;
}
if (response.error !== undefined) {
updateData.error = response.error;
}
await this.prisma.federationMessage.update({
where: { id: message.id },
data: updateData,
});
this.logger.log(`Query response processed: ${response.correlationId}`);
}
/**
* Map Prisma FederationMessage to QueryMessageDetails
*/
private mapToQueryMessageDetails(message: {
id: string;
workspaceId: string;
connectionId: string;
messageType: FederationMessageType;
messageId: string;
correlationId: string | null;
query: string | null;
response: unknown;
status: FederationMessageStatus;
error: string | null;
createdAt: Date;
updatedAt: Date;
deliveredAt: Date | null;
}): QueryMessageDetails {
const details: QueryMessageDetails = {
id: message.id,
workspaceId: message.workspaceId,
connectionId: message.connectionId,
messageType: message.messageType,
messageId: message.messageId,
response: message.response,
status: message.status,
createdAt: message.createdAt,
updatedAt: message.updatedAt,
};
if (message.correlationId !== null) {
details.correlationId = message.correlationId;
}
if (message.query !== null) {
details.query = message.query;
}
if (message.error !== null) {
details.error = message.error;
}
if (message.deliveredAt !== null) {
details.deliveredAt = message.deliveredAt;
}
return details;
}
/**
* Process a query and return the result
*/
private async processQuery(
query: string,
_workspaceId: string,
context?: Record<string, unknown>
): Promise<unknown> {
// Validate workspaceId is provided in context
const contextWorkspaceId = context?.workspaceId as string | undefined;
if (!contextWorkspaceId) {
throw new Error("workspaceId is required in query context");
}
// SECURITY: Block all credential-related queries
// Credentials must never be exposed via federation
if (this.isCredentialQuery(query)) {
throw new Error("Credential queries are not allowed via federation");
}
// Parse query to determine type and parameters
const queryType = this.parseQueryType(query);
const queryParams = this.parseQueryParams(query, context);
// Route to appropriate service based on query type
switch (queryType) {
case "tasks":
return this.processTasksQuery(contextWorkspaceId, queryParams);
case "events":
return this.processEventsQuery(contextWorkspaceId, queryParams);
case "projects":
return this.processProjectsQuery(contextWorkspaceId, queryParams);
default:
throw new Error(`Unknown query type: ${queryType}`);
}
}
/**
* Check if query is attempting to access credential data
* Returns true if query contains credential-related keywords
*/
private isCredentialQuery(query: string): boolean {
const lowerQuery = query.toLowerCase();
// Deny-list of credential-related keywords
const credentialKeywords = [
"credential",
"user_credential",
"api_key",
"api key",
"secret",
"token",
"password",
"oauth",
"access_token",
];
return credentialKeywords.some((keyword) => lowerQuery.includes(keyword));
}
/**
* Parse query string to determine query type
*/
private parseQueryType(query: string): string {
const lowerQuery = query.toLowerCase().trim();
if (lowerQuery.includes("task")) {
return "tasks";
}
if (lowerQuery.includes("event") || lowerQuery.includes("calendar")) {
return "events";
}
if (lowerQuery.includes("project")) {
return "projects";
}
throw new Error("Unknown query type");
}
/**
* Parse query parameters from query string and context
*/
private parseQueryParams(
_query: string,
context?: Record<string, unknown>
): Record<string, unknown> {
const params: Record<string, unknown> = {
page: 1,
limit: 50,
};
// Extract workspaceId from context
if (context?.workspaceId) {
params.workspaceId = context.workspaceId;
}
// Could add more sophisticated parsing here
// For now, return default params
return params;
}
/**
* Process tasks query
*/
private async processTasksQuery(
workspaceId: string,
params: Record<string, unknown>
): Promise<unknown> {
const result = await this.tasksService.findAll({
workspaceId,
page: params.page as number,
limit: params.limit as number,
});
return {
type: "tasks",
...result,
};
}
/**
* Process events query
*/
private async processEventsQuery(
workspaceId: string,
params: Record<string, unknown>
): Promise<unknown> {
const result = await this.eventsService.findAll({
workspaceId,
page: params.page as number,
limit: params.limit as number,
});
return {
type: "events",
...result,
};
}
/**
* Process projects query
*/
private async processProjectsQuery(
workspaceId: string,
params: Record<string, unknown>
): Promise<unknown> {
const result = await this.projectsService.findAll({
workspaceId,
page: params.page as number,
limit: params.limit as number,
});
return {
type: "projects",
...result,
};
}
}