fix(federation/client): serialize cache fills, destroy evicted Agent, cover env-var guard
- HIGH-A: resolveEntry now uses promise-cache pattern so concurrent callers serialize on a single in-flight build, eliminating duplicate key material in heap and duplicate DB round-trips - HIGH-B: flushPeer destroys the evicted undici Agent so stale TLS connections close on cert rotation - MED-C: add regression test for PEER_MISCONFIGURED when STEP_CA_ROOT_CERT_PATH is unset Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -442,6 +442,37 @@ describe('FederationClientService', () => {
|
|||||||
expect(selectSpy).toHaveBeenCalledTimes(1);
|
expect(selectSpy).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('serializes concurrent resolveEntry calls — only one DB lookup', async () => {
|
||||||
|
const db = makeDb();
|
||||||
|
const selectSpy = vi.spyOn(db, 'select');
|
||||||
|
const svc = new FederationClientService(db);
|
||||||
|
const resolveEntry = (
|
||||||
|
svc as unknown as {
|
||||||
|
resolveEntry: (peerId: string) => Promise<unknown>;
|
||||||
|
}
|
||||||
|
).resolveEntry.bind(svc);
|
||||||
|
|
||||||
|
const [a, b] = await Promise.all([resolveEntry(PEER_ID), resolveEntry(PEER_ID)]);
|
||||||
|
expect(a).toBe(b);
|
||||||
|
expect(selectSpy).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('flushPeer destroys the evicted Agent so old TLS connections close', async () => {
|
||||||
|
const db = makeDb();
|
||||||
|
const svc = new FederationClientService(db);
|
||||||
|
const resolveEntry = (
|
||||||
|
svc as unknown as {
|
||||||
|
resolveEntry: (peerId: string) => Promise<{ agent: { destroy: () => Promise<void> } }>;
|
||||||
|
}
|
||||||
|
).resolveEntry.bind(svc);
|
||||||
|
|
||||||
|
const entry = await resolveEntry(PEER_ID);
|
||||||
|
const destroySpy = vi.spyOn(entry.agent, 'destroy').mockResolvedValue();
|
||||||
|
|
||||||
|
svc.flushPeer(PEER_ID);
|
||||||
|
expect(destroySpy).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
it('flushPeer() invalidates cache — next call re-reads DB', async () => {
|
it('flushPeer() invalidates cache — next call re-reads DB', async () => {
|
||||||
const db = makeDb();
|
const db = makeDb();
|
||||||
const { mockAgent, pool } = makeMockAgent();
|
const { mockAgent, pool } = makeMockAgent();
|
||||||
@@ -474,6 +505,25 @@ describe('FederationClientService', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// ─── loadStepCaRoot env-var guard ─────────────────────────────────────────
|
||||||
|
|
||||||
|
describe('loadStepCaRoot() env-var guard', () => {
|
||||||
|
it('throws PEER_MISCONFIGURED when STEP_CA_ROOT_CERT_PATH is not set', async () => {
|
||||||
|
delete process.env['STEP_CA_ROOT_CERT_PATH'];
|
||||||
|
const db = makeDb();
|
||||||
|
const svc = new FederationClientService(db);
|
||||||
|
const resolveEntry = (
|
||||||
|
svc as unknown as {
|
||||||
|
resolveEntry: (peerId: string) => Promise<unknown>;
|
||||||
|
}
|
||||||
|
).resolveEntry.bind(svc);
|
||||||
|
|
||||||
|
await expect(resolveEntry(PEER_ID)).rejects.toMatchObject({
|
||||||
|
code: 'PEER_MISCONFIGURED',
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
// ─── FederationClientError class ──────────────────────────────────────────
|
// ─── FederationClientError class ──────────────────────────────────────────
|
||||||
|
|
||||||
describe('FederationClientError', () => {
|
describe('FederationClientError', () => {
|
||||||
|
|||||||
@@ -120,9 +120,15 @@ export class FederationClientService {
|
|||||||
/**
|
/**
|
||||||
* Per-peer undici Agent cache.
|
* Per-peer undici Agent cache.
|
||||||
* Key = peerId (UUID string).
|
* Key = peerId (UUID string).
|
||||||
|
*
|
||||||
|
* Values are either a resolved `AgentCacheEntry` or an in-flight
|
||||||
|
* `Promise<AgentCacheEntry>` (promise-cache pattern). Storing the promise
|
||||||
|
* prevents duplicate DB lookups and duplicate key-unseal operations when two
|
||||||
|
* requests for the same peer arrive before the first build completes.
|
||||||
|
*
|
||||||
* Flush via `flushPeer(peerId)` on cert rotation / peer revocation (M5/M6).
|
* Flush via `flushPeer(peerId)` on cert rotation / peer revocation (M5/M6).
|
||||||
*/
|
*/
|
||||||
private readonly cache = new Map<string, AgentCacheEntry>();
|
private readonly cache = new Map<string, AgentCacheEntry | Promise<AgentCacheEntry>>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Step-CA root cert PEM, loaded once from `STEP_CA_ROOT_CERT_PATH`.
|
* Step-CA root cert PEM, loaded once from `STEP_CA_ROOT_CERT_PATH`.
|
||||||
@@ -218,10 +224,20 @@ export class FederationClientService {
|
|||||||
* this peer will re-read the DB and rebuild the Agent.
|
* this peer will re-read the DB and rebuild the Agent.
|
||||||
*/
|
*/
|
||||||
flushPeer(peerId: string): void {
|
flushPeer(peerId: string): void {
|
||||||
if (this.cache.has(peerId)) {
|
const entry = this.cache.get(peerId);
|
||||||
this.cache.delete(peerId);
|
if (entry === undefined) {
|
||||||
this.logger.log(`Cache flushed for peer ${peerId}`);
|
return;
|
||||||
}
|
}
|
||||||
|
this.cache.delete(peerId);
|
||||||
|
if (!(entry instanceof Promise)) {
|
||||||
|
// best-effort destroy; promise-cached entries skip destroy because
|
||||||
|
// the in-flight build owns its own Agent which will be GC'd when the
|
||||||
|
// owning request handles the rejection from the cache miss
|
||||||
|
entry.agent.destroy().catch(() => {
|
||||||
|
// intentionally ignored — destroy errors are not actionable
|
||||||
|
});
|
||||||
|
}
|
||||||
|
this.logger.log(`Cache flushed for peer ${peerId}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
@@ -263,15 +279,44 @@ export class FederationClientService {
|
|||||||
/**
|
/**
|
||||||
* Resolve the cache entry for a peer, reading DB on miss.
|
* Resolve the cache entry for a peer, reading DB on miss.
|
||||||
*
|
*
|
||||||
|
* Uses a promise-cache pattern: concurrent callers for the same uncached
|
||||||
|
* `peerId` all `await` the same in-flight `Promise<AgentCacheEntry>` so
|
||||||
|
* only one DB lookup and one key-unseal ever runs per peer per cache miss.
|
||||||
|
* The promise is replaced with the concrete entry on success, or deleted on
|
||||||
|
* rejection so a transient error does not poison the cache permanently.
|
||||||
|
*
|
||||||
* Throws `FederationClientError` with appropriate code if the peer is not
|
* Throws `FederationClientError` with appropriate code if the peer is not
|
||||||
* found, is inactive, or is missing required fields.
|
* found, is inactive, or is missing required fields.
|
||||||
*/
|
*/
|
||||||
private async resolveEntry(peerId: string): Promise<AgentCacheEntry> {
|
private async resolveEntry(peerId: string): Promise<AgentCacheEntry> {
|
||||||
const cached = this.cache.get(peerId);
|
const cached = this.cache.get(peerId);
|
||||||
if (cached) {
|
if (cached) {
|
||||||
return cached;
|
return cached; // Promise or concrete entry — both are awaitable
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const inflight = this.buildEntry(peerId).then(
|
||||||
|
(entry) => {
|
||||||
|
this.cache.set(peerId, entry); // replace promise with concrete value
|
||||||
|
return entry;
|
||||||
|
},
|
||||||
|
(err: unknown) => {
|
||||||
|
this.cache.delete(peerId); // don't poison the cache with a rejected promise
|
||||||
|
throw err;
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
this.cache.set(peerId, inflight);
|
||||||
|
return inflight;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build the `AgentCacheEntry` for a peer by reading the DB, validating the
|
||||||
|
* peer's state, unsealing the private key, and constructing the mTLS Agent.
|
||||||
|
*
|
||||||
|
* Throws `FederationClientError` with appropriate code if the peer is not
|
||||||
|
* found, is inactive, or is missing required fields.
|
||||||
|
*/
|
||||||
|
private async buildEntry(peerId: string): Promise<AgentCacheEntry> {
|
||||||
// DB lookup
|
// DB lookup
|
||||||
const [peer] = await this.db
|
const [peer] = await this.db
|
||||||
.select()
|
.select()
|
||||||
@@ -335,7 +380,6 @@ export class FederationClientService {
|
|||||||
certSerial: peer.certSerial,
|
certSerial: peer.certSerial,
|
||||||
};
|
};
|
||||||
|
|
||||||
this.cache.set(peerId, entry);
|
|
||||||
this.logger.log(`Agent cached for peer ${peerId} (serial: ${peer.certSerial})`);
|
this.logger.log(`Agent cached for peer ${peerId} (serial: ${peer.certSerial})`);
|
||||||
|
|
||||||
return entry;
|
return entry;
|
||||||
|
|||||||
Reference in New Issue
Block a user