fix(#297): Implement actual query processing for federation
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed

Added query processing to route federation queries to domain services:
- Created query parser to extract intent and parameters from query strings
- Route queries to TasksService, EventsService, and ProjectsService
- Return actual data instead of placeholder responses
- Added workspace context validation

Implemented query types:
- Tasks: "get tasks", "show tasks", etc.
- Events: "get events", "upcoming events", etc.
- Projects: "get projects", "show projects", etc.

Added 5 new tests for query processing (20 tests total, all passing):
- Process tasks/events/projects queries
- Handle unknown query types
- Enforce workspace context requirements

Updated FederationModule to import TasksModule, EventsModule, ProjectsModule.

Fixes #297

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-03 22:48:59 -06:00
parent 3e02bade98
commit 4ac4219ce0
4 changed files with 439 additions and 5 deletions

View File

@@ -18,14 +18,21 @@ import { SignatureService } from "./signature.service";
import { ConnectionService } from "./connection.service";
import { OIDCService } from "./oidc.service";
import { CommandService } from "./command.service";
import { QueryService } from "./query.service";
import { FederationAgentService } from "./federation-agent.service";
import { PrismaModule } from "../prisma/prisma.module";
import { TasksModule } from "../tasks/tasks.module";
import { EventsModule } from "../events/events.module";
import { ProjectsModule } from "../projects/projects.module";
import { RedisProvider } from "../common/providers/redis.provider";
@Module({
imports: [
ConfigModule,
PrismaModule,
TasksModule,
EventsModule,
ProjectsModule,
HttpModule.register({
timeout: 10000,
maxRedirects: 5,
@@ -61,6 +68,7 @@ import { RedisProvider } from "../common/providers/redis.provider";
ConnectionService,
OIDCService,
CommandService,
QueryService,
FederationAgentService,
],
exports: [
@@ -71,6 +79,7 @@ import { RedisProvider } from "../common/providers/redis.provider";
ConnectionService,
OIDCService,
CommandService,
QueryService,
FederationAgentService,
],
})

View File

@@ -16,6 +16,9 @@ import { QueryService } from "./query.service";
import { PrismaService } from "../prisma/prisma.service";
import { FederationService } from "./federation.service";
import { SignatureService } from "./signature.service";
import { TasksService } from "../tasks/tasks.service";
import { EventsService } from "../events/events.service";
import { ProjectsService } from "../projects/projects.service";
import { HttpService } from "@nestjs/axios";
import { of, throwError } from "rxjs";
import type { AxiosResponse } from "axios";
@@ -60,6 +63,18 @@ describe("QueryService", () => {
get: vi.fn(),
};
const mockTasksService = {
findAll: vi.fn(),
};
const mockEventsService = {
findAll: vi.fn(),
};
const mockProjectsService = {
findAll: vi.fn(),
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
@@ -69,6 +84,9 @@ describe("QueryService", () => {
{ provide: SignatureService, useValue: mockSignatureService },
{ provide: HttpService, useValue: mockHttpService },
{ provide: ConfigService, useValue: mockConfig },
{ provide: TasksService, useValue: mockTasksService },
{ provide: EventsService, useValue: mockEventsService },
{ provide: ProjectsService, useValue: mockProjectsService },
],
}).compile();
@@ -78,6 +96,20 @@ describe("QueryService", () => {
signatureService = module.get<SignatureService>(SignatureService);
httpService = module.get<HttpService>(HttpService);
// Setup default mock return values for service queries
mockTasksService.findAll.mockResolvedValue({
data: [],
meta: { total: 0, page: 1, limit: 50, totalPages: 0 },
});
mockEventsService.findAll.mockResolvedValue({
data: [],
meta: { total: 0, page: 1, limit: 50, totalPages: 0 },
});
mockProjectsService.findAll.mockResolvedValue({
data: [],
meta: { total: 0, page: 1, limit: 50, totalPages: 0 },
});
vi.clearAllMocks();
});
@@ -500,4 +532,177 @@ describe("QueryService", () => {
await expect(service.processQueryResponse(response)).rejects.toThrow("Invalid signature");
});
});
describe("query processing with actual data", () => {
it("should process 'get tasks' query and return tasks data", async () => {
const queryMessage = {
messageId: "msg-1",
instanceId: "remote-instance-1",
query: "get tasks",
context: { workspaceId: "workspace-1" },
timestamp: Date.now(),
signature: "valid-signature",
};
const mockConnection = {
id: "connection-1",
workspaceId: "workspace-1",
remoteInstanceId: "remote-instance-1",
status: FederationConnectionStatus.ACTIVE,
};
const mockIdentity = {
instanceId: "local-instance-1",
};
mockPrisma.federationConnection.findFirst.mockResolvedValue(mockConnection);
mockSignatureService.validateTimestamp.mockReturnValue(true);
mockSignatureService.verifyMessage.mockResolvedValue({ valid: true });
mockFederationService.getInstanceIdentity.mockResolvedValue(mockIdentity);
mockSignatureService.signMessage.mockResolvedValue("response-signature");
const result = await service.handleIncomingQuery(queryMessage);
expect(result).toBeDefined();
expect(result.success).toBe(true);
expect(result.data).toBeDefined();
expect(result.correlationId).toBe(queryMessage.messageId);
});
it("should process 'get events' query and return events data", async () => {
const queryMessage = {
messageId: "msg-2",
instanceId: "remote-instance-1",
query: "get events",
context: { workspaceId: "workspace-1" },
timestamp: Date.now(),
signature: "valid-signature",
};
const mockConnection = {
id: "connection-1",
workspaceId: "workspace-1",
remoteInstanceId: "remote-instance-1",
status: FederationConnectionStatus.ACTIVE,
};
const mockIdentity = {
instanceId: "local-instance-1",
};
mockPrisma.federationConnection.findFirst.mockResolvedValue(mockConnection);
mockSignatureService.validateTimestamp.mockReturnValue(true);
mockSignatureService.verifyMessage.mockResolvedValue({ valid: true });
mockFederationService.getInstanceIdentity.mockResolvedValue(mockIdentity);
mockSignatureService.signMessage.mockResolvedValue("response-signature");
const result = await service.handleIncomingQuery(queryMessage);
expect(result).toBeDefined();
expect(result.success).toBe(true);
expect(result.data).toBeDefined();
});
it("should process 'get projects' query and return projects data", async () => {
const queryMessage = {
messageId: "msg-3",
instanceId: "remote-instance-1",
query: "get projects",
context: { workspaceId: "workspace-1" },
timestamp: Date.now(),
signature: "valid-signature",
};
const mockConnection = {
id: "connection-1",
workspaceId: "workspace-1",
remoteInstanceId: "remote-instance-1",
status: FederationConnectionStatus.ACTIVE,
};
const mockIdentity = {
instanceId: "local-instance-1",
};
mockPrisma.federationConnection.findFirst.mockResolvedValue(mockConnection);
mockSignatureService.validateTimestamp.mockReturnValue(true);
mockSignatureService.verifyMessage.mockResolvedValue({ valid: true });
mockFederationService.getInstanceIdentity.mockResolvedValue(mockIdentity);
mockSignatureService.signMessage.mockResolvedValue("response-signature");
const result = await service.handleIncomingQuery(queryMessage);
expect(result).toBeDefined();
expect(result.success).toBe(true);
expect(result.data).toBeDefined();
});
it("should handle unknown query type gracefully", async () => {
const queryMessage = {
messageId: "msg-4",
instanceId: "remote-instance-1",
query: "unknown command",
context: { workspaceId: "workspace-1" },
timestamp: Date.now(),
signature: "valid-signature",
};
const mockConnection = {
id: "connection-1",
workspaceId: "workspace-1",
remoteInstanceId: "remote-instance-1",
status: FederationConnectionStatus.ACTIVE,
};
const mockIdentity = {
instanceId: "local-instance-1",
};
mockPrisma.federationConnection.findFirst.mockResolvedValue(mockConnection);
mockSignatureService.validateTimestamp.mockReturnValue(true);
mockSignatureService.verifyMessage.mockResolvedValue({ valid: true });
mockFederationService.getInstanceIdentity.mockResolvedValue(mockIdentity);
mockSignatureService.signMessage.mockResolvedValue("response-signature");
const result = await service.handleIncomingQuery(queryMessage);
expect(result).toBeDefined();
expect(result.success).toBe(false);
expect(result.error).toContain("Unknown query type");
});
it("should enforce workspace context in queries", async () => {
const queryMessage = {
messageId: "msg-5",
instanceId: "remote-instance-1",
query: "get tasks",
context: {}, // Missing workspaceId
timestamp: Date.now(),
signature: "valid-signature",
};
const mockConnection = {
id: "connection-1",
workspaceId: "workspace-1",
remoteInstanceId: "remote-instance-1",
status: FederationConnectionStatus.ACTIVE,
};
const mockIdentity = {
instanceId: "local-instance-1",
};
mockPrisma.federationConnection.findFirst.mockResolvedValue(mockConnection);
mockSignatureService.validateTimestamp.mockReturnValue(true);
mockSignatureService.verifyMessage.mockResolvedValue({ valid: true });
mockFederationService.getInstanceIdentity.mockResolvedValue(mockIdentity);
mockSignatureService.signMessage.mockResolvedValue("response-signature");
const result = await service.handleIncomingQuery(queryMessage);
expect(result).toBeDefined();
expect(result.success).toBe(false);
expect(result.error).toContain("workspaceId");
});
});
});

View File

@@ -11,6 +11,9 @@ import { firstValueFrom } from "rxjs";
import { PrismaService } from "../prisma/prisma.service";
import { FederationService } from "./federation.service";
import { SignatureService } from "./signature.service";
import { TasksService } from "../tasks/tasks.service";
import { EventsService } from "../events/events.service";
import { ProjectsService } from "../projects/projects.service";
import {
FederationConnectionStatus,
FederationMessageType,
@@ -26,7 +29,10 @@ export class QueryService {
private readonly prisma: PrismaService,
private readonly federationService: FederationService,
private readonly signatureService: SignatureService,
private readonly httpService: HttpService
private readonly httpService: HttpService,
private readonly tasksService: TasksService,
private readonly eventsService: EventsService,
private readonly projectsService: ProjectsService
) {}
/**
@@ -153,15 +159,17 @@ export class QueryService {
throw new Error(verificationResult.error ?? "Invalid signature");
}
// Process query (placeholder - would delegate to actual query processor)
// Process query
let responseData: unknown;
let success = true;
let errorMessage: string | undefined;
try {
// TODO: Implement actual query processing
// For now, return a placeholder response
responseData = { message: "Query received and processed" };
responseData = await this.processQuery(
queryMessage.query,
connection.workspaceId,
queryMessage.context
);
} catch (error) {
success = false;
errorMessage = error instanceof Error ? error.message : "Query processing failed";
@@ -352,4 +360,133 @@ export class QueryService {
return details;
}
/**
* Process a query and return the result
*/
private async processQuery(
query: string,
_workspaceId: string,
context?: Record<string, unknown>
): Promise<unknown> {
// Validate workspaceId is provided in context
const contextWorkspaceId = context?.workspaceId as string | undefined;
if (!contextWorkspaceId) {
throw new Error("workspaceId is required in query context");
}
// Parse query to determine type and parameters
const queryType = this.parseQueryType(query);
const queryParams = this.parseQueryParams(query, context);
// Route to appropriate service based on query type
switch (queryType) {
case "tasks":
return this.processTasksQuery(contextWorkspaceId, queryParams);
case "events":
return this.processEventsQuery(contextWorkspaceId, queryParams);
case "projects":
return this.processProjectsQuery(contextWorkspaceId, queryParams);
default:
throw new Error(`Unknown query type: ${queryType}`);
}
}
/**
* Parse query string to determine query type
*/
private parseQueryType(query: string): string {
const lowerQuery = query.toLowerCase().trim();
if (lowerQuery.includes("task")) {
return "tasks";
}
if (lowerQuery.includes("event") || lowerQuery.includes("calendar")) {
return "events";
}
if (lowerQuery.includes("project")) {
return "projects";
}
throw new Error("Unknown query type");
}
/**
* Parse query parameters from query string and context
*/
private parseQueryParams(
_query: string,
context?: Record<string, unknown>
): Record<string, unknown> {
const params: Record<string, unknown> = {
page: 1,
limit: 50,
};
// Extract workspaceId from context
if (context?.workspaceId) {
params.workspaceId = context.workspaceId;
}
// Could add more sophisticated parsing here
// For now, return default params
return params;
}
/**
* Process tasks query
*/
private async processTasksQuery(
workspaceId: string,
params: Record<string, unknown>
): Promise<unknown> {
const result = await this.tasksService.findAll({
workspaceId,
page: params.page as number,
limit: params.limit as number,
});
return {
type: "tasks",
...result,
};
}
/**
* Process events query
*/
private async processEventsQuery(
workspaceId: string,
params: Record<string, unknown>
): Promise<unknown> {
const result = await this.eventsService.findAll({
workspaceId,
page: params.page as number,
limit: params.limit as number,
});
return {
type: "events",
...result,
};
}
/**
* Process projects query
*/
private async processProjectsQuery(
workspaceId: string,
params: Record<string, unknown>
): Promise<unknown> {
const result = await this.projectsService.findAll({
workspaceId,
page: params.page as number,
limit: params.limit as number,
});
return {
type: "projects",
...result,
};
}
}