feat: auth middleware, brain data layer, Valkey queue (P1-002/003/004) (#71)
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #71.
This commit is contained in:
@@ -13,6 +13,8 @@
|
|||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@mariozechner/pi-coding-agent": "~0.57.1",
|
"@mariozechner/pi-coding-agent": "~0.57.1",
|
||||||
|
"@mosaic/auth": "workspace:^",
|
||||||
|
"@mosaic/db": "workspace:^",
|
||||||
"@nestjs/common": "^11.0.0",
|
"@nestjs/common": "^11.0.0",
|
||||||
"@nestjs/core": "^11.0.0",
|
"@nestjs/core": "^11.0.0",
|
||||||
"@nestjs/platform-fastify": "^11.0.0",
|
"@nestjs/platform-fastify": "^11.0.0",
|
||||||
@@ -25,6 +27,7 @@
|
|||||||
"@opentelemetry/sdk-metrics": "^2.6.0",
|
"@opentelemetry/sdk-metrics": "^2.6.0",
|
||||||
"@opentelemetry/sdk-node": "^0.213.0",
|
"@opentelemetry/sdk-node": "^0.213.0",
|
||||||
"@opentelemetry/semantic-conventions": "^1.40.0",
|
"@opentelemetry/semantic-conventions": "^1.40.0",
|
||||||
|
"better-auth": "^1.5.5",
|
||||||
"fastify": "^5.0.0",
|
"fastify": "^5.0.0",
|
||||||
"reflect-metadata": "^0.2.0",
|
"reflect-metadata": "^0.2.0",
|
||||||
"rxjs": "^7.8.0",
|
"rxjs": "^7.8.0",
|
||||||
|
|||||||
@@ -1,10 +1,12 @@
|
|||||||
import { Module } from '@nestjs/common';
|
import { Module } from '@nestjs/common';
|
||||||
import { HealthController } from './health/health.controller.js';
|
import { HealthController } from './health/health.controller.js';
|
||||||
|
import { DatabaseModule } from './database/database.module.js';
|
||||||
|
import { AuthModule } from './auth/auth.module.js';
|
||||||
import { AgentModule } from './agent/agent.module.js';
|
import { AgentModule } from './agent/agent.module.js';
|
||||||
import { ChatModule } from './chat/chat.module.js';
|
import { ChatModule } from './chat/chat.module.js';
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [AgentModule, ChatModule],
|
imports: [DatabaseModule, AuthModule, AgentModule, ChatModule],
|
||||||
controllers: [HealthController],
|
controllers: [HealthController],
|
||||||
})
|
})
|
||||||
export class AppModule {}
|
export class AppModule {}
|
||||||
|
|||||||
20
apps/gateway/src/auth/auth.controller.ts
Normal file
20
apps/gateway/src/auth/auth.controller.ts
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
import type { IncomingMessage, ServerResponse } from 'node:http';
|
||||||
|
import { All, Controller, Inject, Req, Res } from '@nestjs/common';
|
||||||
|
import type { FastifyReply, FastifyRequest } from 'fastify';
|
||||||
|
import { toNodeHandler } from 'better-auth/node';
|
||||||
|
import type { Auth } from '@mosaic/auth';
|
||||||
|
import { AUTH } from './auth.module.js';
|
||||||
|
|
||||||
|
@Controller('api/auth')
|
||||||
|
export class AuthController {
|
||||||
|
private readonly handler: (req: IncomingMessage, res: ServerResponse) => Promise<void>;
|
||||||
|
|
||||||
|
constructor(@Inject(AUTH) auth: Auth) {
|
||||||
|
this.handler = toNodeHandler(auth);
|
||||||
|
}
|
||||||
|
|
||||||
|
@All('*path')
|
||||||
|
async handleAuth(@Req() req: FastifyRequest, @Res() res: FastifyReply): Promise<void> {
|
||||||
|
await this.handler(req.raw, res.raw);
|
||||||
|
}
|
||||||
|
}
|
||||||
32
apps/gateway/src/auth/auth.guard.ts
Normal file
32
apps/gateway/src/auth/auth.guard.ts
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
import {
|
||||||
|
CanActivate,
|
||||||
|
ExecutionContext,
|
||||||
|
Inject,
|
||||||
|
Injectable,
|
||||||
|
UnauthorizedException,
|
||||||
|
} from '@nestjs/common';
|
||||||
|
import { fromNodeHeaders } from 'better-auth/node';
|
||||||
|
import type { Auth } from '@mosaic/auth';
|
||||||
|
import type { FastifyRequest } from 'fastify';
|
||||||
|
import { AUTH } from './auth.module.js';
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class AuthGuard implements CanActivate {
|
||||||
|
constructor(@Inject(AUTH) private readonly auth: Auth) {}
|
||||||
|
|
||||||
|
async canActivate(context: ExecutionContext): Promise<boolean> {
|
||||||
|
const request = context.switchToHttp().getRequest<FastifyRequest>();
|
||||||
|
const headers = fromNodeHeaders(request.raw.headers);
|
||||||
|
|
||||||
|
const result = await this.auth.api.getSession({ headers });
|
||||||
|
|
||||||
|
if (!result) {
|
||||||
|
throw new UnauthorizedException('Invalid or expired session');
|
||||||
|
}
|
||||||
|
|
||||||
|
(request as FastifyRequest & { user: unknown; session: unknown }).user = result.user;
|
||||||
|
(request as FastifyRequest & { user: unknown; session: unknown }).session = result.session;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
26
apps/gateway/src/auth/auth.module.ts
Normal file
26
apps/gateway/src/auth/auth.module.ts
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
import { Global, Module } from '@nestjs/common';
|
||||||
|
import { createAuth, type Auth } from '@mosaic/auth';
|
||||||
|
import type { Db } from '@mosaic/db';
|
||||||
|
import { DB } from '../database/database.module.js';
|
||||||
|
import { AuthController } from './auth.controller.js';
|
||||||
|
|
||||||
|
export const AUTH = 'AUTH';
|
||||||
|
|
||||||
|
@Global()
|
||||||
|
@Module({
|
||||||
|
providers: [
|
||||||
|
{
|
||||||
|
provide: AUTH,
|
||||||
|
useFactory: (db: Db): Auth =>
|
||||||
|
createAuth({
|
||||||
|
db,
|
||||||
|
baseURL: process.env['BETTER_AUTH_URL'] ?? 'http://localhost:4000',
|
||||||
|
secret: process.env['BETTER_AUTH_SECRET'],
|
||||||
|
}),
|
||||||
|
inject: [DB],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
controllers: [AuthController],
|
||||||
|
exports: [AUTH],
|
||||||
|
})
|
||||||
|
export class AuthModule {}
|
||||||
7
apps/gateway/src/auth/current-user.decorator.ts
Normal file
7
apps/gateway/src/auth/current-user.decorator.ts
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
import { createParamDecorator, type ExecutionContext } from '@nestjs/common';
|
||||||
|
import type { FastifyRequest } from 'fastify';
|
||||||
|
|
||||||
|
export const CurrentUser = createParamDecorator((_data: unknown, ctx: ExecutionContext) => {
|
||||||
|
const request = ctx.switchToHttp().getRequest<FastifyRequest & { user?: unknown }>();
|
||||||
|
return request.user;
|
||||||
|
});
|
||||||
28
apps/gateway/src/database/database.module.ts
Normal file
28
apps/gateway/src/database/database.module.ts
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
import { Global, Inject, Module, type OnApplicationShutdown } from '@nestjs/common';
|
||||||
|
import { createDb, type Db, type DbHandle } from '@mosaic/db';
|
||||||
|
|
||||||
|
export const DB_HANDLE = 'DB_HANDLE';
|
||||||
|
export const DB = 'DB';
|
||||||
|
|
||||||
|
@Global()
|
||||||
|
@Module({
|
||||||
|
providers: [
|
||||||
|
{
|
||||||
|
provide: DB_HANDLE,
|
||||||
|
useFactory: (): DbHandle => createDb(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: DB,
|
||||||
|
useFactory: (handle: DbHandle): Db => handle.db,
|
||||||
|
inject: [DB_HANDLE],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
exports: [DB],
|
||||||
|
})
|
||||||
|
export class DatabaseModule implements OnApplicationShutdown {
|
||||||
|
constructor(@Inject(DB_HANDLE) private readonly handle: DbHandle) {}
|
||||||
|
|
||||||
|
async onApplicationShutdown(): Promise<void> {
|
||||||
|
await this.handle.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -14,9 +14,9 @@
|
|||||||
| P0-008 | done | Phase 0 | Project docs — AGENTS.md, CLAUDE.md, README | #69 | #8 |
|
| P0-008 | done | Phase 0 | Project docs — AGENTS.md, CLAUDE.md, README | #69 | #8 |
|
||||||
| P0-009 | done | Phase 0 | Verify Phase 0 — CI green, all packages build | #70 | #9 |
|
| P0-009 | done | Phase 0 | Verify Phase 0 — CI green, all packages build | #70 | #9 |
|
||||||
| P1-001 | done | Phase 1 | apps/gateway scaffold — NestJS + Fastify adapter | #61 | #10 |
|
| P1-001 | done | Phase 1 | apps/gateway scaffold — NestJS + Fastify adapter | #61 | #10 |
|
||||||
| P1-002 | not-started | Phase 1 | Auth middleware — BetterAuth session validation | — | #11 |
|
| P1-002 | done | Phase 1 | Auth middleware — BetterAuth session validation | #71 | #11 |
|
||||||
| P1-003 | not-started | Phase 1 | @mosaic/brain — migrate from v0, PG backend | — | #12 |
|
| P1-003 | done | Phase 1 | @mosaic/brain — migrate from v0, PG backend | #71 | #12 |
|
||||||
| P1-004 | not-started | Phase 1 | @mosaic/queue — migrate from v0 | — | #13 |
|
| P1-004 | done | Phase 1 | @mosaic/queue — migrate from v0 | #71 | #13 |
|
||||||
| P1-005 | not-started | Phase 1 | Gateway routes — conversations CRUD + messages | — | #14 |
|
| P1-005 | not-started | Phase 1 | Gateway routes — conversations CRUD + messages | — | #14 |
|
||||||
| P1-006 | not-started | Phase 1 | Gateway routes — tasks, projects, missions CRUD | — | #15 |
|
| P1-006 | not-started | Phase 1 | Gateway routes — tasks, projects, missions CRUD | — | #15 |
|
||||||
| P1-007 | done | Phase 1 | WebSocket server — chat streaming | #61 | #16 |
|
| P1-007 | done | Phase 1 | WebSocket server — chat streaming | #61 | #16 |
|
||||||
|
|||||||
@@ -16,6 +16,7 @@
|
|||||||
"test": "vitest run --passWithNoTests"
|
"test": "vitest run --passWithNoTests"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"@mosaic/db": "workspace:^",
|
||||||
"@mosaic/types": "workspace:*"
|
"@mosaic/types": "workspace:*"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
|||||||
21
packages/brain/src/brain.ts
Normal file
21
packages/brain/src/brain.ts
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
import type { Db } from '@mosaic/db';
|
||||||
|
import { createProjectsRepo, type ProjectsRepo } from './projects.js';
|
||||||
|
import { createMissionsRepo, type MissionsRepo } from './missions.js';
|
||||||
|
import { createTasksRepo, type TasksRepo } from './tasks.js';
|
||||||
|
import { createConversationsRepo, type ConversationsRepo } from './conversations.js';
|
||||||
|
|
||||||
|
export interface Brain {
|
||||||
|
projects: ProjectsRepo;
|
||||||
|
missions: MissionsRepo;
|
||||||
|
tasks: TasksRepo;
|
||||||
|
conversations: ConversationsRepo;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function createBrain(db: Db): Brain {
|
||||||
|
return {
|
||||||
|
projects: createProjectsRepo(db),
|
||||||
|
missions: createMissionsRepo(db),
|
||||||
|
tasks: createTasksRepo(db),
|
||||||
|
conversations: createConversationsRepo(db),
|
||||||
|
};
|
||||||
|
}
|
||||||
49
packages/brain/src/conversations.ts
Normal file
49
packages/brain/src/conversations.ts
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
import { eq, type Db, conversations, messages } from '@mosaic/db';
|
||||||
|
|
||||||
|
export type Conversation = typeof conversations.$inferSelect;
|
||||||
|
export type NewConversation = typeof conversations.$inferInsert;
|
||||||
|
export type Message = typeof messages.$inferSelect;
|
||||||
|
export type NewMessage = typeof messages.$inferInsert;
|
||||||
|
|
||||||
|
export function createConversationsRepo(db: Db) {
|
||||||
|
return {
|
||||||
|
async findAll(userId: string): Promise<Conversation[]> {
|
||||||
|
return db.select().from(conversations).where(eq(conversations.userId, userId));
|
||||||
|
},
|
||||||
|
|
||||||
|
async findById(id: string): Promise<Conversation | undefined> {
|
||||||
|
const rows = await db.select().from(conversations).where(eq(conversations.id, id));
|
||||||
|
return rows[0];
|
||||||
|
},
|
||||||
|
|
||||||
|
async create(data: NewConversation): Promise<Conversation> {
|
||||||
|
const rows = await db.insert(conversations).values(data).returning();
|
||||||
|
return rows[0]!;
|
||||||
|
},
|
||||||
|
|
||||||
|
async update(id: string, data: Partial<NewConversation>): Promise<Conversation | undefined> {
|
||||||
|
const rows = await db
|
||||||
|
.update(conversations)
|
||||||
|
.set({ ...data, updatedAt: new Date() })
|
||||||
|
.where(eq(conversations.id, id))
|
||||||
|
.returning();
|
||||||
|
return rows[0];
|
||||||
|
},
|
||||||
|
|
||||||
|
async remove(id: string): Promise<boolean> {
|
||||||
|
const rows = await db.delete(conversations).where(eq(conversations.id, id)).returning();
|
||||||
|
return rows.length > 0;
|
||||||
|
},
|
||||||
|
|
||||||
|
async findMessages(conversationId: string): Promise<Message[]> {
|
||||||
|
return db.select().from(messages).where(eq(messages.conversationId, conversationId));
|
||||||
|
},
|
||||||
|
|
||||||
|
async addMessage(data: NewMessage): Promise<Message> {
|
||||||
|
const rows = await db.insert(messages).values(data).returning();
|
||||||
|
return rows[0]!;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export type ConversationsRepo = ReturnType<typeof createConversationsRepo>;
|
||||||
@@ -1 +1,22 @@
|
|||||||
export const VERSION = '0.0.0';
|
export { createBrain, type Brain } from './brain.js';
|
||||||
|
export {
|
||||||
|
createProjectsRepo,
|
||||||
|
type ProjectsRepo,
|
||||||
|
type Project,
|
||||||
|
type NewProject,
|
||||||
|
} from './projects.js';
|
||||||
|
export {
|
||||||
|
createMissionsRepo,
|
||||||
|
type MissionsRepo,
|
||||||
|
type Mission,
|
||||||
|
type NewMission,
|
||||||
|
} from './missions.js';
|
||||||
|
export { createTasksRepo, type TasksRepo, type Task, type NewTask } from './tasks.js';
|
||||||
|
export {
|
||||||
|
createConversationsRepo,
|
||||||
|
type ConversationsRepo,
|
||||||
|
type Conversation,
|
||||||
|
type NewConversation,
|
||||||
|
type Message,
|
||||||
|
type NewMessage,
|
||||||
|
} from './conversations.js';
|
||||||
|
|||||||
42
packages/brain/src/missions.ts
Normal file
42
packages/brain/src/missions.ts
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
import { eq, type Db, missions } from '@mosaic/db';
|
||||||
|
|
||||||
|
export type Mission = typeof missions.$inferSelect;
|
||||||
|
export type NewMission = typeof missions.$inferInsert;
|
||||||
|
|
||||||
|
export function createMissionsRepo(db: Db) {
|
||||||
|
return {
|
||||||
|
async findAll(): Promise<Mission[]> {
|
||||||
|
return db.select().from(missions);
|
||||||
|
},
|
||||||
|
|
||||||
|
async findById(id: string): Promise<Mission | undefined> {
|
||||||
|
const rows = await db.select().from(missions).where(eq(missions.id, id));
|
||||||
|
return rows[0];
|
||||||
|
},
|
||||||
|
|
||||||
|
async findByProject(projectId: string): Promise<Mission[]> {
|
||||||
|
return db.select().from(missions).where(eq(missions.projectId, projectId));
|
||||||
|
},
|
||||||
|
|
||||||
|
async create(data: NewMission): Promise<Mission> {
|
||||||
|
const rows = await db.insert(missions).values(data).returning();
|
||||||
|
return rows[0]!;
|
||||||
|
},
|
||||||
|
|
||||||
|
async update(id: string, data: Partial<NewMission>): Promise<Mission | undefined> {
|
||||||
|
const rows = await db
|
||||||
|
.update(missions)
|
||||||
|
.set({ ...data, updatedAt: new Date() })
|
||||||
|
.where(eq(missions.id, id))
|
||||||
|
.returning();
|
||||||
|
return rows[0];
|
||||||
|
},
|
||||||
|
|
||||||
|
async remove(id: string): Promise<boolean> {
|
||||||
|
const rows = await db.delete(missions).where(eq(missions.id, id)).returning();
|
||||||
|
return rows.length > 0;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export type MissionsRepo = ReturnType<typeof createMissionsRepo>;
|
||||||
38
packages/brain/src/projects.ts
Normal file
38
packages/brain/src/projects.ts
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
import { eq, type Db, projects } from '@mosaic/db';
|
||||||
|
|
||||||
|
export type Project = typeof projects.$inferSelect;
|
||||||
|
export type NewProject = typeof projects.$inferInsert;
|
||||||
|
|
||||||
|
export function createProjectsRepo(db: Db) {
|
||||||
|
return {
|
||||||
|
async findAll(): Promise<Project[]> {
|
||||||
|
return db.select().from(projects);
|
||||||
|
},
|
||||||
|
|
||||||
|
async findById(id: string): Promise<Project | undefined> {
|
||||||
|
const rows = await db.select().from(projects).where(eq(projects.id, id));
|
||||||
|
return rows[0];
|
||||||
|
},
|
||||||
|
|
||||||
|
async create(data: NewProject): Promise<Project> {
|
||||||
|
const rows = await db.insert(projects).values(data).returning();
|
||||||
|
return rows[0]!;
|
||||||
|
},
|
||||||
|
|
||||||
|
async update(id: string, data: Partial<NewProject>): Promise<Project | undefined> {
|
||||||
|
const rows = await db
|
||||||
|
.update(projects)
|
||||||
|
.set({ ...data, updatedAt: new Date() })
|
||||||
|
.where(eq(projects.id, id))
|
||||||
|
.returning();
|
||||||
|
return rows[0];
|
||||||
|
},
|
||||||
|
|
||||||
|
async remove(id: string): Promise<boolean> {
|
||||||
|
const rows = await db.delete(projects).where(eq(projects.id, id)).returning();
|
||||||
|
return rows.length > 0;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export type ProjectsRepo = ReturnType<typeof createProjectsRepo>;
|
||||||
50
packages/brain/src/tasks.ts
Normal file
50
packages/brain/src/tasks.ts
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
import { eq, type Db, tasks } from '@mosaic/db';
|
||||||
|
|
||||||
|
export type Task = typeof tasks.$inferSelect;
|
||||||
|
export type NewTask = typeof tasks.$inferInsert;
|
||||||
|
|
||||||
|
export function createTasksRepo(db: Db) {
|
||||||
|
return {
|
||||||
|
async findAll(): Promise<Task[]> {
|
||||||
|
return db.select().from(tasks);
|
||||||
|
},
|
||||||
|
|
||||||
|
async findById(id: string): Promise<Task | undefined> {
|
||||||
|
const rows = await db.select().from(tasks).where(eq(tasks.id, id));
|
||||||
|
return rows[0];
|
||||||
|
},
|
||||||
|
|
||||||
|
async findByProject(projectId: string): Promise<Task[]> {
|
||||||
|
return db.select().from(tasks).where(eq(tasks.projectId, projectId));
|
||||||
|
},
|
||||||
|
|
||||||
|
async findByMission(missionId: string): Promise<Task[]> {
|
||||||
|
return db.select().from(tasks).where(eq(tasks.missionId, missionId));
|
||||||
|
},
|
||||||
|
|
||||||
|
async findByStatus(status: Task['status']): Promise<Task[]> {
|
||||||
|
return db.select().from(tasks).where(eq(tasks.status, status));
|
||||||
|
},
|
||||||
|
|
||||||
|
async create(data: NewTask): Promise<Task> {
|
||||||
|
const rows = await db.insert(tasks).values(data).returning();
|
||||||
|
return rows[0]!;
|
||||||
|
},
|
||||||
|
|
||||||
|
async update(id: string, data: Partial<NewTask>): Promise<Task | undefined> {
|
||||||
|
const rows = await db
|
||||||
|
.update(tasks)
|
||||||
|
.set({ ...data, updatedAt: new Date() })
|
||||||
|
.where(eq(tasks.id, id))
|
||||||
|
.returning();
|
||||||
|
return rows[0];
|
||||||
|
},
|
||||||
|
|
||||||
|
async remove(id: string): Promise<boolean> {
|
||||||
|
const rows = await db.delete(tasks).where(eq(tasks.id, id)).returning();
|
||||||
|
return rows.length > 0;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export type TasksRepo = ReturnType<typeof createTasksRepo>;
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
export { createDb, type Db, type DbHandle } from './client.js';
|
export { createDb, type Db, type DbHandle } from './client.js';
|
||||||
export { runMigrations } from './migrate.js';
|
export { runMigrations } from './migrate.js';
|
||||||
export * from './schema.js';
|
export * from './schema.js';
|
||||||
|
export { eq, and, or, desc, asc, sql, inArray, isNull, isNotNull } from 'drizzle-orm';
|
||||||
|
|||||||
@@ -16,7 +16,8 @@
|
|||||||
"test": "vitest run --passWithNoTests"
|
"test": "vitest run --passWithNoTests"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@mosaic/types": "workspace:*"
|
"@mosaic/types": "workspace:*",
|
||||||
|
"ioredis": "^5.10.0"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"typescript": "^5.8.0",
|
"typescript": "^5.8.0",
|
||||||
|
|||||||
@@ -1 +1,8 @@
|
|||||||
export const VERSION = '0.0.0';
|
export {
|
||||||
|
createQueue,
|
||||||
|
createQueueClient,
|
||||||
|
type QueueConfig,
|
||||||
|
type QueueHandle,
|
||||||
|
type QueueClient,
|
||||||
|
type TaskPayload,
|
||||||
|
} from './queue.js';
|
||||||
|
|||||||
67
packages/queue/src/queue.ts
Normal file
67
packages/queue/src/queue.ts
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
import Redis from 'ioredis';
|
||||||
|
|
||||||
|
const DEFAULT_VALKEY_URL = 'redis://localhost:6380';
|
||||||
|
|
||||||
|
export interface QueueConfig {
|
||||||
|
url?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface QueueHandle {
|
||||||
|
redis: Redis;
|
||||||
|
close: () => Promise<void>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface TaskPayload {
|
||||||
|
id: string;
|
||||||
|
type: string;
|
||||||
|
data: Record<string, unknown>;
|
||||||
|
createdAt: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function createQueue(config?: QueueConfig): QueueHandle {
|
||||||
|
const url = config?.url ?? process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL;
|
||||||
|
const redis = new Redis(url, { maxRetriesPerRequest: 3 });
|
||||||
|
|
||||||
|
return {
|
||||||
|
redis,
|
||||||
|
close: async () => {
|
||||||
|
await redis.quit();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export function createQueueClient(handle: QueueHandle) {
|
||||||
|
const { redis } = handle;
|
||||||
|
|
||||||
|
return {
|
||||||
|
async enqueue(queueName: string, payload: TaskPayload): Promise<void> {
|
||||||
|
await redis.lpush(queueName, JSON.stringify(payload));
|
||||||
|
},
|
||||||
|
|
||||||
|
async dequeue(queueName: string): Promise<TaskPayload | null> {
|
||||||
|
const item = await redis.rpop(queueName);
|
||||||
|
if (!item) return null;
|
||||||
|
return JSON.parse(item) as TaskPayload;
|
||||||
|
},
|
||||||
|
|
||||||
|
async length(queueName: string): Promise<number> {
|
||||||
|
return redis.llen(queueName);
|
||||||
|
},
|
||||||
|
|
||||||
|
async publish(channel: string, message: string): Promise<void> {
|
||||||
|
await redis.publish(channel, message);
|
||||||
|
},
|
||||||
|
|
||||||
|
subscribe(channel: string, handler: (message: string) => void): () => void {
|
||||||
|
const sub = redis.duplicate();
|
||||||
|
sub.subscribe(channel).catch(() => {});
|
||||||
|
sub.on('message', (_ch: string, msg: string) => handler(msg));
|
||||||
|
return () => {
|
||||||
|
sub.unsubscribe(channel).catch(() => {});
|
||||||
|
sub.disconnect();
|
||||||
|
};
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export type QueueClient = ReturnType<typeof createQueueClient>;
|
||||||
5903
pnpm-lock.yaml
generated
5903
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user