This commit was merged in pull request #682.
This commit is contained in:
408
apps/gateway/src/federation/server/verbs/list-query.service.ts
Normal file
408
apps/gateway/src/federation/server/verbs/list-query.service.ts
Normal file
@@ -0,0 +1,408 @@
|
||||
/**
|
||||
* Federation list query layer (FED-M3-05).
|
||||
*
|
||||
* Read-only DB adapter used by ListController after FederationAuthGuard and
|
||||
* FederationScopeService have established the subject user, allowed resource,
|
||||
* native-RBAC intersection, and row cap. Audit writes are intentionally
|
||||
* deferred to M4.
|
||||
*/
|
||||
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import {
|
||||
and,
|
||||
desc,
|
||||
eq,
|
||||
inArray,
|
||||
insights,
|
||||
isNotNull,
|
||||
lt,
|
||||
missionTasks,
|
||||
missions,
|
||||
or,
|
||||
preferences,
|
||||
projects,
|
||||
tasks,
|
||||
teamMembers,
|
||||
type Db,
|
||||
} from '@mosaicstack/db';
|
||||
import type {
|
||||
FederationNativeRbacEvaluator,
|
||||
FederationNativeRbacRequest,
|
||||
FederationNativeRbacResult,
|
||||
FederationScopeQueryFilter,
|
||||
} from '../scope.service.js';
|
||||
import { DB } from '../../../database/database.module.js';
|
||||
|
||||
export interface FederationListQueryRequest {
|
||||
readonly filter: FederationScopeQueryFilter;
|
||||
readonly cursor?: string;
|
||||
}
|
||||
|
||||
export interface FederationListQueryResult<T extends object = Record<string, unknown>> {
|
||||
readonly items: T[];
|
||||
readonly nextCursor?: string;
|
||||
readonly truncated: boolean;
|
||||
}
|
||||
|
||||
type CursorSource = 'insights' | 'preferences';
|
||||
const CURSOR_SOURCE = Symbol('federationCursorSource');
|
||||
|
||||
type RowObject = Record<string, unknown> & { readonly [CURSOR_SOURCE]?: CursorSource };
|
||||
|
||||
interface KeysetCursor {
|
||||
readonly createdAt: Date;
|
||||
readonly id: string;
|
||||
readonly source?: CursorSource;
|
||||
}
|
||||
|
||||
function encodeCursor(row: RowObject): string {
|
||||
const createdAt = row['createdAt'];
|
||||
const id = row['id'];
|
||||
if (!(createdAt instanceof Date) || typeof id !== 'string') {
|
||||
throw new Error('Federation list cursor cannot be encoded');
|
||||
}
|
||||
|
||||
const source = row[CURSOR_SOURCE];
|
||||
return Buffer.from(
|
||||
JSON.stringify({ createdAt: createdAt.toISOString(), id, ...(source ? { source } : {}) }),
|
||||
'utf8',
|
||||
).toString('base64url');
|
||||
}
|
||||
|
||||
function decodeCursor(cursor: string | undefined): KeysetCursor | undefined {
|
||||
if (cursor === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
try {
|
||||
const parsed = JSON.parse(Buffer.from(cursor, 'base64url').toString('utf8')) as unknown;
|
||||
if (typeof parsed !== 'object' || parsed === null) {
|
||||
throw new Error('cursor must be an object');
|
||||
}
|
||||
|
||||
const { createdAt, id, source } = parsed as {
|
||||
createdAt?: unknown;
|
||||
id?: unknown;
|
||||
source?: unknown;
|
||||
};
|
||||
if (typeof createdAt !== 'string' || typeof id !== 'string' || id.length === 0) {
|
||||
throw new Error('cursor is missing createdAt or id');
|
||||
}
|
||||
if (source !== undefined && source !== 'insights' && source !== 'preferences') {
|
||||
throw new Error('cursor source is invalid');
|
||||
}
|
||||
|
||||
const date = new Date(createdAt);
|
||||
if (Number.isNaN(date.getTime())) {
|
||||
throw new Error('cursor createdAt is invalid');
|
||||
}
|
||||
|
||||
return { createdAt: date, id, ...(source ? { source } : {}) };
|
||||
} catch {
|
||||
throw new Error('Invalid federation list cursor');
|
||||
}
|
||||
}
|
||||
|
||||
function paginate<T extends RowObject>(rows: T[], limit: number): FederationListQueryResult<T> {
|
||||
const page = rows.slice(0, limit);
|
||||
const hasMore = rows.length > limit;
|
||||
const nextCursor = hasMore ? encodeCursor(page[page.length - 1] ?? {}) : undefined;
|
||||
|
||||
return {
|
||||
items: page,
|
||||
truncated: hasMore,
|
||||
...(nextCursor !== undefined ? { nextCursor } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function markCursorSource<T extends RowObject>(row: T, source: CursorSource): T {
|
||||
Object.defineProperty(row, CURSOR_SOURCE, {
|
||||
value: source,
|
||||
enumerable: false,
|
||||
configurable: false,
|
||||
});
|
||||
return row;
|
||||
}
|
||||
|
||||
function sortRows(rows: RowObject[]): RowObject[] {
|
||||
return [...rows].sort((a, b) => {
|
||||
const aTime = a['createdAt'] instanceof Date ? a['createdAt'].getTime() : 0;
|
||||
const bTime = b['createdAt'] instanceof Date ? b['createdAt'].getTime() : 0;
|
||||
if (aTime !== bTime) {
|
||||
return bTime - aTime;
|
||||
}
|
||||
return String(b['id'] ?? '').localeCompare(String(a['id'] ?? ''));
|
||||
});
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class FederationListQueryService implements FederationNativeRbacEvaluator {
|
||||
constructor(@Inject(DB) private readonly db: Db) {}
|
||||
|
||||
async evaluateReadAccess(
|
||||
request: FederationNativeRbacRequest,
|
||||
): Promise<FederationNativeRbacResult> {
|
||||
if (request.resource === 'credentials' || request.resource === 'api_keys') {
|
||||
return {
|
||||
allowed: false,
|
||||
reason: `${request.resource} federation list access is not implemented in M3`,
|
||||
details: { resource: request.resource },
|
||||
};
|
||||
}
|
||||
|
||||
if (request.resource === 'memory') {
|
||||
return { allowed: true, access: { includePersonal: true, teamIds: [] } };
|
||||
}
|
||||
|
||||
const teamIds = await this.listSubjectTeamIds(request.subjectUserId);
|
||||
return { allowed: true, access: { includePersonal: true, teamIds } };
|
||||
}
|
||||
|
||||
async list<T extends RowObject = RowObject>(
|
||||
request: FederationListQueryRequest,
|
||||
): Promise<FederationListQueryResult<T>> {
|
||||
const cursor = decodeCursor(request.cursor);
|
||||
const rows = await this.listAllRows(request.filter, request.filter.limit + 1, cursor);
|
||||
return paginate(rows as T[], request.filter.limit);
|
||||
}
|
||||
|
||||
private async listAllRows(
|
||||
filter: FederationScopeQueryFilter,
|
||||
rowLimit: number,
|
||||
cursor: KeysetCursor | undefined,
|
||||
): Promise<RowObject[]> {
|
||||
switch (filter.resource) {
|
||||
case 'tasks':
|
||||
return this.listTasks(filter, rowLimit, cursor);
|
||||
case 'notes':
|
||||
return this.listNotes(filter, rowLimit, cursor);
|
||||
case 'memory':
|
||||
return this.listMemory(filter, rowLimit, cursor);
|
||||
case 'credentials':
|
||||
case 'api_keys':
|
||||
return [];
|
||||
default:
|
||||
throw new Error(`Unsupported federation list resource: ${String(filter.resource)}`);
|
||||
}
|
||||
}
|
||||
|
||||
private async listSubjectTeamIds(subjectUserId: string): Promise<string[]> {
|
||||
const rows = await this.db
|
||||
.select({ teamId: teamMembers.teamId })
|
||||
.from(teamMembers)
|
||||
.where(eq(teamMembers.userId, subjectUserId));
|
||||
|
||||
return rows.map((row) => row.teamId);
|
||||
}
|
||||
|
||||
private async listAccessibleProjectIds(filter: FederationScopeQueryFilter): Promise<string[]> {
|
||||
const clauses = [];
|
||||
if (filter.includePersonal) {
|
||||
clauses.push(and(eq(projects.ownerType, 'user'), eq(projects.ownerId, filter.subjectUserId)));
|
||||
}
|
||||
if (filter.teamIds.length > 0) {
|
||||
clauses.push(
|
||||
and(eq(projects.ownerType, 'team'), inArray(projects.teamId, [...filter.teamIds])),
|
||||
);
|
||||
}
|
||||
|
||||
if (clauses.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const rows = await this.db
|
||||
.select({ id: projects.id })
|
||||
.from(projects)
|
||||
.where(clauses.length === 1 ? clauses[0] : or(...clauses));
|
||||
|
||||
return rows.map((row) => row.id);
|
||||
}
|
||||
|
||||
private async listMissionIds(projectIds: readonly string[]): Promise<string[]> {
|
||||
if (projectIds.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const rows = await this.db
|
||||
.select({ id: missions.id })
|
||||
.from(missions)
|
||||
.where(inArray(missions.projectId, [...projectIds]));
|
||||
|
||||
return rows.map((row) => row.id);
|
||||
}
|
||||
|
||||
private async listTasks(
|
||||
filter: FederationScopeQueryFilter,
|
||||
rowLimit: number,
|
||||
cursor: KeysetCursor | undefined,
|
||||
): Promise<RowObject[]> {
|
||||
const projectIds = await this.listAccessibleProjectIds(filter);
|
||||
const missionIds = await this.listMissionIds(projectIds);
|
||||
const clauses = [];
|
||||
|
||||
if (projectIds.length > 0) {
|
||||
clauses.push(inArray(tasks.projectId, projectIds));
|
||||
}
|
||||
if (missionIds.length > 0) {
|
||||
clauses.push(inArray(tasks.missionId, missionIds));
|
||||
}
|
||||
|
||||
if (clauses.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const scopeClause = clauses.length === 1 ? clauses[0] : or(...clauses);
|
||||
const cursorClause = cursor
|
||||
? or(
|
||||
lt(tasks.createdAt, cursor.createdAt),
|
||||
and(eq(tasks.createdAt, cursor.createdAt), lt(tasks.id, cursor.id)),
|
||||
)
|
||||
: undefined;
|
||||
|
||||
const rows = await this.db
|
||||
.select({
|
||||
id: tasks.id,
|
||||
title: tasks.title,
|
||||
description: tasks.description,
|
||||
status: tasks.status,
|
||||
priority: tasks.priority,
|
||||
projectId: tasks.projectId,
|
||||
missionId: tasks.missionId,
|
||||
assignee: tasks.assignee,
|
||||
tags: tasks.tags,
|
||||
dueDate: tasks.dueDate,
|
||||
metadata: tasks.metadata,
|
||||
createdAt: tasks.createdAt,
|
||||
updatedAt: tasks.updatedAt,
|
||||
})
|
||||
.from(tasks)
|
||||
.where(and(scopeClause, cursorClause))
|
||||
.orderBy(desc(tasks.createdAt), desc(tasks.id))
|
||||
.limit(rowLimit);
|
||||
|
||||
return sortRows(rows as RowObject[]);
|
||||
}
|
||||
|
||||
private async listNotes(
|
||||
filter: FederationScopeQueryFilter,
|
||||
rowLimit: number,
|
||||
cursor: KeysetCursor | undefined,
|
||||
): Promise<RowObject[]> {
|
||||
const projectIds = await this.listAccessibleProjectIds(filter);
|
||||
const missionIds = await this.listMissionIds(projectIds);
|
||||
|
||||
if (missionIds.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
// mission_tasks rows are user-scoped even when the mission belongs to a team.
|
||||
// Team visibility can narrow the mission set, but it must never widen the
|
||||
// query to other users' mission task notes.
|
||||
const scopeClause = and(
|
||||
eq(missionTasks.userId, filter.subjectUserId),
|
||||
inArray(missionTasks.missionId, missionIds),
|
||||
);
|
||||
const cursorClause = cursor
|
||||
? or(
|
||||
lt(missionTasks.createdAt, cursor.createdAt),
|
||||
and(eq(missionTasks.createdAt, cursor.createdAt), lt(missionTasks.id, cursor.id)),
|
||||
)
|
||||
: undefined;
|
||||
|
||||
const rows = await this.db
|
||||
.select({
|
||||
id: missionTasks.id,
|
||||
missionId: missionTasks.missionId,
|
||||
taskId: missionTasks.taskId,
|
||||
status: missionTasks.status,
|
||||
content: missionTasks.notes,
|
||||
createdAt: missionTasks.createdAt,
|
||||
updatedAt: missionTasks.updatedAt,
|
||||
})
|
||||
.from(missionTasks)
|
||||
.where(and(scopeClause, cursorClause, isNotNull(missionTasks.notes)))
|
||||
.orderBy(desc(missionTasks.createdAt), desc(missionTasks.id))
|
||||
.limit(rowLimit);
|
||||
|
||||
return sortRows(rows.filter((row) => row.content !== '') as RowObject[]);
|
||||
}
|
||||
|
||||
private async listMemory(
|
||||
filter: FederationScopeQueryFilter,
|
||||
rowLimit: number,
|
||||
cursor: KeysetCursor | undefined,
|
||||
): Promise<RowObject[]> {
|
||||
if (!filter.includePersonal) {
|
||||
return [];
|
||||
}
|
||||
if (cursor && cursor.source === undefined) {
|
||||
throw new Error('Invalid federation list cursor');
|
||||
}
|
||||
|
||||
const rows: RowObject[] = [];
|
||||
|
||||
// Memory spans two physical tables. To keep pagination deterministic and
|
||||
// resumable without a SQL UNION, M3 emits a fixed block order: all insights
|
||||
// first, then preferences. The opaque cursor records which table produced
|
||||
// the boundary row, so the next page never re-applies one table's keyset to
|
||||
// the other table (which could duplicate/skip rows at equal timestamps).
|
||||
if (cursor?.source !== 'preferences') {
|
||||
const insightCursorClause = cursor
|
||||
? or(
|
||||
lt(insights.createdAt, cursor.createdAt),
|
||||
and(eq(insights.createdAt, cursor.createdAt), lt(insights.id, cursor.id)),
|
||||
)
|
||||
: undefined;
|
||||
const insightRows = await this.db
|
||||
.select({
|
||||
id: insights.id,
|
||||
kind: insights.source,
|
||||
content: insights.content,
|
||||
category: insights.category,
|
||||
relevanceScore: insights.relevanceScore,
|
||||
metadata: insights.metadata,
|
||||
createdAt: insights.createdAt,
|
||||
updatedAt: insights.updatedAt,
|
||||
})
|
||||
.from(insights)
|
||||
.where(and(eq(insights.userId, filter.subjectUserId), insightCursorClause))
|
||||
.orderBy(desc(insights.createdAt), desc(insights.id))
|
||||
.limit(rowLimit);
|
||||
|
||||
rows.push(...(insightRows as RowObject[]).map((row) => markCursorSource(row, 'insights')));
|
||||
}
|
||||
|
||||
const remaining = rowLimit - rows.length;
|
||||
if (remaining <= 0) {
|
||||
return rows;
|
||||
}
|
||||
|
||||
const preferenceCursorClause =
|
||||
cursor?.source === 'preferences'
|
||||
? or(
|
||||
lt(preferences.createdAt, cursor.createdAt),
|
||||
and(eq(preferences.createdAt, cursor.createdAt), lt(preferences.id, cursor.id)),
|
||||
)
|
||||
: undefined;
|
||||
const preferenceRows = await this.db
|
||||
.select({
|
||||
id: preferences.id,
|
||||
kind: preferences.category,
|
||||
key: preferences.key,
|
||||
value: preferences.value,
|
||||
source: preferences.source,
|
||||
mutable: preferences.mutable,
|
||||
createdAt: preferences.createdAt,
|
||||
updatedAt: preferences.updatedAt,
|
||||
})
|
||||
.from(preferences)
|
||||
.where(and(eq(preferences.userId, filter.subjectUserId), preferenceCursorClause))
|
||||
.orderBy(desc(preferences.createdAt), desc(preferences.id))
|
||||
.limit(remaining);
|
||||
|
||||
rows.push(
|
||||
...(preferenceRows as RowObject[]).map((row) => markCursorSource(row, 'preferences')),
|
||||
);
|
||||
return rows;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user