409 lines
12 KiB
TypeScript
409 lines
12 KiB
TypeScript
/**
 * Federation list query layer (FED-M3-05).
 *
 * Read-only DB adapter used by ListController after FederationAuthGuard and
 * FederationScopeService have established the subject user, allowed resource,
 * native-RBAC intersection, and row cap. Audit writes are intentionally
 * deferred to M4.
 */
|
|
|
|
import { Inject, Injectable } from '@nestjs/common';
|
|
import {
|
|
and,
|
|
desc,
|
|
eq,
|
|
inArray,
|
|
insights,
|
|
isNotNull,
|
|
lt,
|
|
missionTasks,
|
|
missions,
|
|
or,
|
|
preferences,
|
|
projects,
|
|
tasks,
|
|
teamMembers,
|
|
type Db,
|
|
} from '@mosaicstack/db';
|
|
import type {
|
|
FederationNativeRbacEvaluator,
|
|
FederationNativeRbacRequest,
|
|
FederationNativeRbacResult,
|
|
FederationScopeQueryFilter,
|
|
} from '../scope.service.js';
|
|
import { DB } from '../../../database/database.module.js';
|
|
|
|
/**
 * Input accepted by `FederationListQueryService.list`.
 */
export interface FederationListQueryRequest {
  /**
   * Scope filter for the query. The service reads `resource`,
   * `subjectUserId`, `includePersonal`, `teamIds` and `limit` from it.
   */
  readonly filter: FederationScopeQueryFilter;

  /**
   * Opaque cursor taken from a previous page's `nextCursor`. Omit it to
   * request the first page.
   */
  readonly cursor?: string;
}
|
|
|
|
/**
 * One page of federation list results.
 *
 * @typeParam T - Row shape of the listed resource.
 */
export interface FederationListQueryResult<T extends object = Record<string, unknown>> {
  /** Rows of the current page, at most `limit` entries. */
  readonly items: T[];

  /** Cursor for the following page; only present when `truncated` is true. */
  readonly nextCursor?: string;

  /** True when more rows were fetched than fit into this page. */
  readonly truncated: boolean;
}
|
|
|
|
/** Physical table a memory row came from; recorded inside memory cursors. */
type CursorSource = 'insights' | 'preferences';

/**
 * Symbol key under which a row's originating table is stored. Being a symbol,
 * it never collides with a selected column name (for example the `source`
 * column selected from `preferences`).
 */
const CURSOR_SOURCE = Symbol('federationCursorSource');

/** A selected DB row, optionally tagged with the table it was read from. */
type RowObject = Record<string, unknown> & { readonly [CURSOR_SOURCE]?: CursorSource };

/** Decoded keyset position: the `(createdAt, id)` pair of a page's last row. */
interface KeysetCursor {
  /** Creation timestamp of the boundary row. */
  readonly createdAt: Date;

  /** Identifier of the boundary row; breaks ties between equal timestamps. */
  readonly id: string;

  /** Table that produced the boundary row; only set for memory cursors. */
  readonly source?: CursorSource;
}
|
|
|
|
/**
 * Serialises the keyset position of `row` into an opaque cursor string.
 *
 * The cursor is base64url-encoded JSON holding the row's `createdAt` (as an
 * ISO-8601 string), its `id` and, when the row carries a `CURSOR_SOURCE` tag,
 * the originating table as `source`.
 *
 * @param row - Boundary row of the current page.
 * @returns The encoded cursor.
 * @throws Error when `createdAt` is not a `Date` or `id` is not a string.
 */
function encodeCursor(row: RowObject): string {
  const timestamp = row['createdAt'];
  if (!(timestamp instanceof Date)) {
    throw new Error('Federation list cursor cannot be encoded');
  }

  const rowId = row['id'];
  if (typeof rowId !== 'string') {
    throw new Error('Federation list cursor cannot be encoded');
  }

  const origin = row[CURSOR_SOURCE];
  const payload: { createdAt: string; id: string; source?: CursorSource } = {
    createdAt: timestamp.toISOString(),
    id: rowId,
  };
  if (origin) {
    // Only tagged rows contribute a `source` key, so untagged cursors stay
    // free of it.
    payload.source = origin;
  }

  return Buffer.from(JSON.stringify(payload), 'utf8').toString('base64url');
}
|
|
|
|
/**
 * Parses an opaque cursor produced by `encodeCursor` back into a keyset
 * position.
 *
 * @param cursor - Cursor string supplied by the caller, or `undefined` for
 *   the first page.
 * @returns `undefined` when no cursor was supplied, otherwise the decoded
 *   position.
 * @throws Error with the message `Invalid federation list cursor` for every
 *   kind of malformed input, so the caller never learns which check failed.
 */
function decodeCursor(cursor: string | undefined): KeysetCursor | undefined {
  if (cursor === undefined) {
    return undefined;
  }

  /** Validates the parsed JSON payload; yields `undefined` when it is unusable. */
  function toKeysetCursor(payload: unknown): KeysetCursor | undefined {
    if (payload === null || typeof payload !== 'object') {
      return undefined;
    }

    const { createdAt: rawCreatedAt, id, source } = payload as {
      createdAt?: unknown;
      id?: unknown;
      source?: unknown;
    };
    if (typeof rawCreatedAt !== 'string' || typeof id !== 'string' || id === '') {
      return undefined;
    }
    if (source !== undefined && source !== 'insights' && source !== 'preferences') {
      return undefined;
    }

    const createdAt = new Date(rawCreatedAt);
    if (Number.isNaN(createdAt.getTime())) {
      return undefined;
    }

    return source === undefined ? { createdAt, id } : { createdAt, id, source };
  }

  let position: KeysetCursor | undefined;
  try {
    const json = Buffer.from(cursor, 'base64url').toString('utf8');
    position = toKeysetCursor(JSON.parse(json));
  } catch {
    // Decoding or JSON failures are reported exactly like validation failures.
    position = undefined;
  }

  if (position === undefined) {
    throw new Error('Invalid federation list cursor');
  }
  return position;
}
|
|
|
|
/**
 * Cuts a result set that was fetched with one look-ahead row down to a page.
 *
 * @param rows - Rows in final order; callers fetch `limit + 1` of them so the
 *   presence of a further page can be detected.
 * @param limit - Maximum number of rows in the returned page.
 * @returns The first `limit` rows. When `rows` held more than `limit`
 *   entries, `truncated` is true and `nextCursor` encodes the last row of the
 *   page; otherwise `truncated` is false and `nextCursor` is absent.
 * @throws Error propagated from `encodeCursor` when the boundary row cannot
 *   be encoded. This includes an empty page (for example `limit` of 0 with
 *   rows present), where an empty object is handed to the encoder.
 */
function paginate<T extends RowObject>(rows: T[], limit: number): FederationListQueryResult<T> {
  const items = rows.slice(0, limit);

  if (rows.length <= limit) {
    // Everything fitted: no look-ahead row, therefore no further page.
    return { items, truncated: false };
  }

  const boundary = items[items.length - 1] ?? {};
  return { items, truncated: true, nextCursor: encodeCursor(boundary) };
}
|
|
|
|
/**
 * Tags `row` in place with the table it was read from and returns it.
 *
 * The tag is stored under the `CURSOR_SOURCE` symbol as a non-enumerable,
 * non-configurable property, so it does not show up in `Object.keys` or in
 * `JSON.stringify` output of the row.
 *
 * @param row - Row object to tag (mutated).
 * @param source - Originating table of the row.
 * @returns The same `row` instance.
 */
function markCursorSource<T extends RowObject>(row: T, source: CursorSource): T {
  const hiddenTag: PropertyDescriptor = {
    value: source,
    enumerable: false,
    configurable: false,
  };
  return Object.defineProperty(row, CURSOR_SOURCE, hiddenTag);
}
|
|
|
|
/**
 * Returns a copy of `rows` ordered newest first, with ties on `createdAt`
 * broken by descending `id`.
 *
 * Rows whose `createdAt` is not a `Date` are treated as timestamp 0; a missing
 * `id` compares as the empty string. The input array is left untouched.
 *
 * NOTE(review): the tie-break uses `localeCompare` without an explicit locale,
 * so it follows the host's default collation. Confirm this agrees with the
 * database ordering of `id` that the keyset queries rely on.
 *
 * @param rows - Rows to order.
 * @returns A new, sorted array.
 */
function sortRows(rows: RowObject[]): RowObject[] {
  const millisOf = (row: RowObject): number => {
    const stamp = row['createdAt'];
    return stamp instanceof Date ? stamp.getTime() : 0;
  };
  const idOf = (row: RowObject): string => String(row['id'] ?? '');

  const ordered = rows.slice();
  ordered.sort((left, right) => {
    const byTime = millisOf(right) - millisOf(left);
    return byTime !== 0 ? byTime : idOf(right).localeCompare(idOf(left));
  });
  return ordered;
}
|
|
|
|
/**
 * Read-only query service behind the federation list endpoints.
 *
 * It also acts as the native-RBAC evaluator: `evaluateReadAccess` reports
 * which personal/team scopes the subject user may read, and `list` executes
 * the keyset-paginated query for an already established scope filter.
 */
@Injectable()
export class FederationListQueryService implements FederationNativeRbacEvaluator {
  constructor(@Inject(DB) private readonly db: Db) {}

  /**
   * Decides whether the subject user may list the requested resource.
   *
   * - `credentials` and `api_keys` are always denied with an explanatory reason.
   * - `memory` is allowed for personal data only (no team ids).
   * - Every other resource is allowed for personal data plus the teams the
   *   subject user is a member of.
   *
   * @param request - Resource and subject user to evaluate.
   * @returns The access decision and, when allowed, the readable scopes.
   */
  async evaluateReadAccess(
    request: FederationNativeRbacRequest,
  ): Promise<FederationNativeRbacResult> {
    if (request.resource === 'credentials' || request.resource === 'api_keys') {
      return {
        allowed: false,
        reason: `${request.resource} federation list access is not implemented in M3`,
        details: { resource: request.resource },
      };
    }

    if (request.resource === 'memory') {
      return { allowed: true, access: { includePersonal: true, teamIds: [] } };
    }

    const teamIds = await this.listSubjectTeamIds(request.subjectUserId);
    return { allowed: true, access: { includePersonal: true, teamIds } };
  }

  /**
   * Returns one page of rows for the given scope filter.
   *
   * One row more than `filter.limit` is requested so that `paginate` can tell
   * whether a further page exists.
   *
   * @param request - Scope filter plus optional cursor from a previous page.
   * @returns The page, its truncation flag and, if truncated, the next cursor.
   * @throws Error when the cursor is malformed or the resource is unsupported.
   */
  async list<T extends RowObject = RowObject>(
    request: FederationListQueryRequest,
  ): Promise<FederationListQueryResult<T>> {
    const cursor = decodeCursor(request.cursor);
    const rows = await this.listAllRows(request.filter, request.filter.limit + 1, cursor);
    return paginate(rows as T[], request.filter.limit);
  }

  /**
   * Dispatches to the per-resource query.
   *
   * `credentials` and `api_keys` yield no rows; any other unknown resource
   * is rejected with an error.
   */
  private async listAllRows(
    filter: FederationScopeQueryFilter,
    rowLimit: number,
    cursor: KeysetCursor | undefined,
  ): Promise<RowObject[]> {
    switch (filter.resource) {
      case 'tasks':
        return this.listTasks(filter, rowLimit, cursor);
      case 'notes':
        return this.listNotes(filter, rowLimit, cursor);
      case 'memory':
        return this.listMemory(filter, rowLimit, cursor);
      case 'credentials':
      case 'api_keys':
        return [];
      default:
        throw new Error(`Unsupported federation list resource: ${String(filter.resource)}`);
    }
  }

  /** Returns the ids of all teams the given user is a member of. */
  private async listSubjectTeamIds(subjectUserId: string): Promise<string[]> {
    const rows = await this.db
      .select({ teamId: teamMembers.teamId })
      .from(teamMembers)
      .where(eq(teamMembers.userId, subjectUserId));

    return rows.map((row) => row.teamId);
  }

  /**
   * Returns the ids of the projects visible under `filter`: projects owned by
   * the subject user (when `includePersonal` is set) and projects owned by any
   * of `filter.teamIds`. Yields an empty list when neither scope applies.
   */
  private async listAccessibleProjectIds(filter: FederationScopeQueryFilter): Promise<string[]> {
    const clauses = [];
    if (filter.includePersonal) {
      clauses.push(and(eq(projects.ownerType, 'user'), eq(projects.ownerId, filter.subjectUserId)));
    }
    if (filter.teamIds.length > 0) {
      clauses.push(
        and(eq(projects.ownerType, 'team'), inArray(projects.teamId, [...filter.teamIds])),
      );
    }

    if (clauses.length === 0) {
      return [];
    }

    const rows = await this.db
      .select({ id: projects.id })
      .from(projects)
      .where(clauses.length === 1 ? clauses[0] : or(...clauses));

    return rows.map((row) => row.id);
  }

  /** Returns the ids of all missions that belong to one of `projectIds`. */
  private async listMissionIds(projectIds: readonly string[]): Promise<string[]> {
    if (projectIds.length === 0) {
      return [];
    }

    const rows = await this.db
      .select({ id: missions.id })
      .from(missions)
      .where(inArray(missions.projectId, [...projectIds]));

    return rows.map((row) => row.id);
  }

  /**
   * Lists tasks attached to an accessible project or to a mission of an
   * accessible project, newest first, starting after `cursor` when given.
   *
   * @param filter - Scope filter identifying the subject user and teams.
   * @param rowLimit - Maximum number of rows to fetch.
   * @param cursor - Keyset position of the previous page's last row, if any.
   */
  private async listTasks(
    filter: FederationScopeQueryFilter,
    rowLimit: number,
    cursor: KeysetCursor | undefined,
  ): Promise<RowObject[]> {
    const projectIds = await this.listAccessibleProjectIds(filter);
    const missionIds = await this.listMissionIds(projectIds);
    const clauses = [];

    if (projectIds.length > 0) {
      clauses.push(inArray(tasks.projectId, projectIds));
    }
    if (missionIds.length > 0) {
      clauses.push(inArray(tasks.missionId, missionIds));
    }

    if (clauses.length === 0) {
      return [];
    }

    const scopeClause = clauses.length === 1 ? clauses[0] : or(...clauses);
    // Keyset predicate: strictly "after" the cursor row in (createdAt, id)
    // descending order.
    const cursorClause = cursor
      ? or(
          lt(tasks.createdAt, cursor.createdAt),
          and(eq(tasks.createdAt, cursor.createdAt), lt(tasks.id, cursor.id)),
        )
      : undefined;

    const rows = await this.db
      .select({
        id: tasks.id,
        title: tasks.title,
        description: tasks.description,
        status: tasks.status,
        priority: tasks.priority,
        projectId: tasks.projectId,
        missionId: tasks.missionId,
        assignee: tasks.assignee,
        tags: tasks.tags,
        dueDate: tasks.dueDate,
        metadata: tasks.metadata,
        createdAt: tasks.createdAt,
        updatedAt: tasks.updatedAt,
      })
      .from(tasks)
      .where(and(scopeClause, cursorClause))
      .orderBy(desc(tasks.createdAt), desc(tasks.id))
      .limit(rowLimit);

    return sortRows(rows as RowObject[]);
  }

  /**
   * Lists the subject user's non-empty mission-task notes for missions of
   * accessible projects, newest first, starting after `cursor` when given.
   *
   * Rows whose note is the empty string are dropped in application code. To
   * keep that from shortening a page (and making `paginate` report "no more
   * rows" while further notes exist), the query is repeated from the last raw
   * row until `rowLimit` usable rows are collected or a batch comes back
   * short, which means the table is exhausted.
   *
   * @param filter - Scope filter identifying the subject user and teams.
   * @param rowLimit - Maximum number of rows to return.
   * @param cursor - Keyset position of the previous page's last row, if any.
   */
  private async listNotes(
    filter: FederationScopeQueryFilter,
    rowLimit: number,
    cursor: KeysetCursor | undefined,
  ): Promise<RowObject[]> {
    const projectIds = await this.listAccessibleProjectIds(filter);
    const missionIds = await this.listMissionIds(projectIds);

    if (missionIds.length === 0) {
      return [];
    }

    // mission_tasks rows are user-scoped even when the mission belongs to a team.
    // Team visibility can narrow the mission set, but it must never widen the
    // query to other users' mission task notes.
    const scopeClause = and(
      eq(missionTasks.userId, filter.subjectUserId),
      inArray(missionTasks.missionId, missionIds),
    );

    const collected: RowObject[] = [];
    let boundary: KeysetCursor | undefined = cursor;

    // Batches depend on the previous batch's last row, so they are fetched
    // sequentially. When no empty notes are present this runs exactly once.
    for (;;) {
      const cursorClause = boundary
        ? or(
            lt(missionTasks.createdAt, boundary.createdAt),
            and(eq(missionTasks.createdAt, boundary.createdAt), lt(missionTasks.id, boundary.id)),
          )
        : undefined;

      const batch = await this.db
        .select({
          id: missionTasks.id,
          missionId: missionTasks.missionId,
          taskId: missionTasks.taskId,
          status: missionTasks.status,
          content: missionTasks.notes,
          createdAt: missionTasks.createdAt,
          updatedAt: missionTasks.updatedAt,
        })
        .from(missionTasks)
        .where(and(scopeClause, cursorClause, isNotNull(missionTasks.notes)))
        .orderBy(desc(missionTasks.createdAt), desc(missionTasks.id))
        .limit(rowLimit);

      for (const row of batch) {
        if (collected.length >= rowLimit) {
          break;
        }
        if (row.content !== '') {
          collected.push(row as RowObject);
        }
      }

      const lastFetched = batch[batch.length - 1];
      if (
        collected.length >= rowLimit ||
        batch.length < rowLimit ||
        lastFetched === undefined
      ) {
        break;
      }

      // Resume strictly after the last row the database returned, whether or
      // not that row survived the empty-note filter.
      boundary = { createdAt: lastFetched.createdAt, id: lastFetched.id };
    }

    return sortRows(collected);
  }

  /**
   * Lists the subject user's memory rows: insights followed by preferences.
   *
   * Returns nothing unless `filter.includePersonal` is set. Every returned row
   * is tagged with its originating table so the next cursor can record it.
   *
   * @param filter - Scope filter identifying the subject user.
   * @param rowLimit - Maximum number of rows to return across both tables.
   * @param cursor - Keyset position of the previous page's last row, if any.
   * @throws Error when a cursor is supplied that carries no `source`.
   */
  private async listMemory(
    filter: FederationScopeQueryFilter,
    rowLimit: number,
    cursor: KeysetCursor | undefined,
  ): Promise<RowObject[]> {
    if (!filter.includePersonal) {
      return [];
    }
    if (cursor && cursor.source === undefined) {
      throw new Error('Invalid federation list cursor');
    }

    const rows: RowObject[] = [];

    // Memory spans two physical tables. To keep pagination deterministic and
    // resumable without a SQL UNION, M3 emits a fixed block order: all insights
    // first, then preferences. The opaque cursor records which table produced
    // the boundary row, so the next page never re-applies one table's keyset to
    // the other table (which could duplicate/skip rows at equal timestamps).
    if (cursor?.source !== 'preferences') {
      const insightCursorClause = cursor
        ? or(
            lt(insights.createdAt, cursor.createdAt),
            and(eq(insights.createdAt, cursor.createdAt), lt(insights.id, cursor.id)),
          )
        : undefined;
      const insightRows = await this.db
        .select({
          id: insights.id,
          kind: insights.source,
          content: insights.content,
          category: insights.category,
          relevanceScore: insights.relevanceScore,
          metadata: insights.metadata,
          createdAt: insights.createdAt,
          updatedAt: insights.updatedAt,
        })
        .from(insights)
        .where(and(eq(insights.userId, filter.subjectUserId), insightCursorClause))
        .orderBy(desc(insights.createdAt), desc(insights.id))
        .limit(rowLimit);

      rows.push(...(insightRows as RowObject[]).map((row) => markCursorSource(row, 'insights')));
    }

    // Preferences only fill whatever room the insights block left over.
    const remaining = rowLimit - rows.length;
    if (remaining <= 0) {
      return rows;
    }

    // An insights cursor must not constrain the preferences table: once the
    // insights block is exhausted, preferences start from their first row.
    const preferenceCursorClause =
      cursor?.source === 'preferences'
        ? or(
            lt(preferences.createdAt, cursor.createdAt),
            and(eq(preferences.createdAt, cursor.createdAt), lt(preferences.id, cursor.id)),
          )
        : undefined;
    const preferenceRows = await this.db
      .select({
        id: preferences.id,
        kind: preferences.category,
        key: preferences.key,
        value: preferences.value,
        source: preferences.source,
        mutable: preferences.mutable,
        createdAt: preferences.createdAt,
        updatedAt: preferences.updatedAt,
      })
      .from(preferences)
      .where(and(eq(preferences.userId, filter.subjectUserId), preferenceCursorClause))
      .orderBy(desc(preferences.createdAt), desc(preferences.id))
      .limit(remaining);

    rows.push(
      ...(preferenceRows as RowObject[]).map((row) => markCursorSource(row, 'preferences')),
    );
    return rows;
  }
}
|