PD-103 — Agent Developer — Module M12: capture-reconciliation
1. Agent identity
- Agent: agent-developer (Agent B — Claude)
- Story: PD-103
- Module: M12 — capture-reconciliation
- Wave: 3 (depends on M9 for entity + service references)
- Date: 2026-04-03
2. Summary
Module M12 implements the periodic reconciliation service for the evidentiary capture flow. It is responsible for:
1. A cron job every 10 minutes scanning captures in non-terminal states
2. Automatically raising the SEAL_DELAYED flag when a PENDING_SEAL capture exceeds the sealSla (INV-103-35)
3. Clearing the SEAL_DELAYED flag after 3 consecutive conforming cycles (INV-103-36)
4. Re-enqueuing stuck captures to the sealing pipeline (INV-103-11)
5. Garbage-collecting orphaned S3 objects with no backend counterpart (INV-103-33)
6. Writing an audit entry to capture_audit_log for every reconciliation action
File to create:
- src/modules/capture/services/capture-reconciliation.service.ts — reconciliation cron service
Existing files reused:
- src/modules/urgent-seal/services/seal-reconciliation.service.ts — cron + Redis lock + orphan scan pattern
- src/modules/capture/entities/capture-event.entity.ts (M9/M13)
- src/modules/capture/entities/capture-audit-log.entity.ts (M9/M13)
- src/modules/documents/storage/s3-presign.service.ts — S3 list/delete pattern
- src/modules/backup/services/backup-s3.service.ts — S3Client + ListObjectsV2Command pattern
External dependencies:
- AWS S3 for scanning and deleting orphans (captures/ bucket prefix)
- Redis for the distributed lock that prevents duplicate cron execution
3. Delivered artifacts
| File | Role | Estimated lines |
|---|---|---|
| src/modules/capture/services/capture-reconciliation.service.ts | Reconciliation cron: non-terminal scan, SEAL_DELAYED, S3 GC, re-enqueue | ~380 |
| src/modules/capture/__tests__/capture-reconciliation.service.spec.ts | Contract tests TC-INV-07, TC-INV-08, TC-INV-11, TC-INV-14, TC-ERR-09 + quality | ~550 |
4. Architecture
4.1 Architectural decisions
architectural_decisions:
  - decision: "NestJS @Cron('*/10 * * * *') with a Redis SET NX EX lock (SealReconciliationService pattern, PD-80)"
    rationale: "Prevents duplicate execution across instances (K8s). The Redis lock with TTL = scan_interval * 2 (20 min) guarantees that a single worker runs the scan at a time. Pattern already proven in production for PD-80."
    alternatives_considered:
      - "External cron (CloudWatch Events / GitLab CI schedule)"
      - "BullMQ repeatable job"
    trade_offs: "Coupling to @nestjs/schedule. Acceptable because it is already used in the project (PD-80, bulk-export). The Redis lock adds a Redis dependency, but Redis is already used for rate limiting."
  - decision: "S3 orphan GC via ListObjectsV2 + DeleteObject in the same cron"
    rationale: "Defense in depth on top of the S3 lifecycle rules (M15). The cron actively detects orphans and writes one audit log entry per deletion, which lifecycle rules do not. Decision D9 of the decomposition."
    alternatives_considered:
      - "S3 lifecycle rules alone"
      - "A separate S3 GC cron, distinct from the reconciliation cron"
    trade_offs: "The S3 scan adds latency to the cron (~1-5 s per page of 1000 objects). Acceptable because the cron runs every 10 min and the scan is paginated."
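The SET NX EX semantics behind the lock decision can be sketched with an in-memory stand-in. This is an illustrative model only (the hypothetical `FakeRedis` class replaces the real ioredis client); it shows why TTL = 2 x the scan interval lets a crashed worker's lock expire before the second-next cycle.

```typescript
// Minimal model of the `SET key value EX ttl NX` semantics used for the
// reconciliation lock. FakeRedis is a stand-in for illustration only;
// the production service talks to a real Redis via ioredis.
type Entry = { value: string; expiresAt: number };

class FakeRedis {
  private store = new Map<string, Entry>();

  /** SET key value EX ttl NX: returns 'OK' if acquired, null otherwise. */
  set(key: string, value: string, ttlSeconds: number, now: number): 'OK' | null {
    const existing = this.store.get(key);
    if (existing && existing.expiresAt > now) return null; // NX: key still held
    this.store.set(key, { value, expiresAt: now + ttlSeconds * 1000 });
    return 'OK';
  }

  del(key: string): void {
    this.store.delete(key);
  }
}

// Lock TTL = 2 x the 10-minute scan interval (1200 s), matching lockTtlSeconds.
const LOCK_TTL_SECONDS = 2 * 10 * 60;

function tryAcquire(redis: FakeRedis, now: number): boolean {
  return redis.set('reconciliation:capture:lock', 'worker-1', LOCK_TTL_SECONDS, now) === 'OK';
}
```

A second worker calling `tryAcquire` while the lock is held gets `false` and skips its cycle, which is exactly the skip branch in the flow diagram below.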
4.2 Flow diagram
sequenceDiagram
participant Cron as @Cron('*/10 * * * *')
participant Redis as Redis Lock
participant DB as PostgreSQL
participant S3 as S3 Bucket
participant Audit as capture_audit_log
participant Queue as BullMQ
Cron->>Redis: SET NX EX reconciliation:capture:lock (TTL=1200s)
alt lock acquired
Redis-->>Cron: OK
Note over Cron,DB: Phase 1 — Scan stuck non-terminal captures
Cron->>DB: SELECT * FROM capture_events WHERE state IN (non-terminaux) AND updated_at < NOW() - orphan_threshold
DB-->>Cron: candidates[]
loop each candidate
Cron->>Queue: re-enqueue seal job
Cron->>Audit: INSERT reconciliation_reenqueue event
end
Note over Cron,DB: Phase 2 — SEAL_DELAYED trigger
Cron->>DB: SELECT * FROM capture_events WHERE state='PENDING_SEAL' AND created_at < NOW() - sealSla AND seal_delayed=false
DB-->>Cron: delayed_candidates[]
loop each delayed capture
Cron->>DB: UPDATE seal_delayed=true, seal_delayed_conforming_cycles=0
Cron->>Audit: INSERT seal_delayed_triggered event
end
Note over Cron,DB: Phase 3 — SEAL_DELAYED clearing
Cron->>DB: SELECT * FROM capture_events WHERE seal_delayed=true
DB-->>Cron: delayed_captures[]
Cron->>Cron: evaluate cycle conformity
alt cycle conforming
Cron->>DB: UPDATE seal_delayed_conforming_cycles += 1
alt cycles >= 3
Cron->>DB: UPDATE seal_delayed=false, seal_delayed_conforming_cycles=0
Cron->>Audit: INSERT seal_delayed_cleared event
end
else cycle not conforming
Cron->>DB: UPDATE seal_delayed_conforming_cycles=0
end
Note over Cron,S3: Phase 4 — S3 orphan GC
Cron->>S3: ListObjectsV2(prefix='captures/')
S3-->>Cron: objects[]
loop each object
Cron->>DB: SELECT 1 FROM capture_events WHERE upload_object_key = object.Key
alt no backend match AND age > orphanTtl
Cron->>S3: DeleteObject(Key)
Cron->>Audit: INSERT s3_orphan_deleted event
end
end
Cron->>Redis: DEL reconciliation:capture:lock
else lock not acquired
Redis-->>Cron: nil (skip cycle)
end
4.3 Configuration
interface ReconciliationConfig {
/** Cron interval (default: every 10 minutes, bounds: 8-12 min) */
cronExpression: string;
/** Orphan threshold for re-enqueue (default: 15 min, bounds: 5-60 min) */
orphanThresholdMinutes: number;
/** Sealing SLA for the SEAL_DELAYED trigger (default: 10 min, bounds: 1-10 min) */
sealSlaMinutes: number;
/** Number of conforming cycles required to clear SEAL_DELAYED (default: 3, bounds: 1-10) */
clearingCyclesRequired: number;
/** S3 orphan TTL (default: 900 s, bounds: 300-3600 s) */
s3OrphanTtlSeconds: number;
/** S3 prefix for scanning capture objects */
s3CapturePrefix: string;
/** Redis lock TTL (default: 1200 s = 2 x scan interval) */
lockTtlSeconds: number;
}
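A small validation helper can enforce the documented bounds at startup. This is a sketch under stated assumptions: the `clampConfigValue` function and `BOUNDS` table are illustrative names, not part of the module's contract.

```typescript
// Clamps a numeric config value into its documented bounds, mirroring the
// ranges stated in ReconciliationConfig. Illustrative sketch only; the
// module itself does not ship this helper.
interface Bounds { min: number; max: number }

const BOUNDS: Record<string, Bounds> = {
  orphanThresholdMinutes: { min: 5, max: 60 },
  sealSlaMinutes: { min: 1, max: 10 },
  clearingCyclesRequired: { min: 1, max: 10 },
  s3OrphanTtlSeconds: { min: 300, max: 3600 },
};

function clampConfigValue(key: keyof typeof BOUNDS, value: number): number {
  const { min, max } = BOUNDS[key];
  // Out-of-range values are pulled back to the nearest bound.
  return Math.min(max, Math.max(min, value));
}
```

Clamping (rather than rejecting) keeps a misconfigured instance running with safe values; rejecting at bootstrap would be an equally valid design choice.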
5. Implementation
5.1 Main service — capture-reconciliation.service.ts
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, LessThan, In } from 'typeorm';
import {
S3Client,
ListObjectsV2Command,
DeleteObjectCommand,
} from '@aws-sdk/client-s3';
import Redis from 'ioredis';
import { CaptureEvent } from '../entities/capture-event.entity';
import { CaptureAuditLog } from '../entities/capture-audit-log.entity';
// --- Constants ---
const NON_TERMINAL_STATES = [
'CAPTURED',
'UPLOADING',
'UPLOAD_DEFERRED',
'UPLOADED',
'PENDING_SEAL',
'SEALED',
] as const;
const TERMINAL_STATES = ['CANCELLED', 'ANCHOR_CONFIRMED'] as const;
const REDIS_LOCK_KEY = 'reconciliation:capture:lock';
const DEFAULT_CONFIG: ReconciliationConfig = {
cronExpression: '*/10 * * * *',
orphanThresholdMinutes: 15,
sealSlaMinutes: 10,
clearingCyclesRequired: 3,
s3OrphanTtlSeconds: 900,
s3CapturePrefix: 'captures/',
lockTtlSeconds: 1200,
};
// --- Service ---
@Injectable()
export class CaptureReconciliationService implements OnModuleInit {
private readonly logger = new Logger(CaptureReconciliationService.name);
private redis: Redis | null = null;
private s3Client: S3Client | null = null;
private config: ReconciliationConfig = DEFAULT_CONFIG;
private s3Bucket = '';
constructor(
@InjectRepository(CaptureEvent)
private readonly captureEventRepository: Repository<CaptureEvent>,
@InjectRepository(CaptureAuditLog)
private readonly auditLogRepository: Repository<CaptureAuditLog>,
) {}
onModuleInit(): void {
// Redis and S3 clients are injected via setters (testability).
// In production they are configured in CaptureModule.onModuleInit.
}
setRedisClient(client: Redis): void {
this.redis = client;
}
setS3Client(client: S3Client, bucket: string): void {
this.s3Client = client;
this.s3Bucket = bucket;
}
setConfig(config: Partial<ReconciliationConfig>): void {
this.config = { ...DEFAULT_CONFIG, ...config };
}
// ─────────────────────────────────────────────────
// CRON ENTRY POINT
// ─────────────────────────────────────────────────
/**
* Cron entry point (runs every 10 min).
* Acquires a Redis lock before executing, to prevent duplicate execution across instances.
*
* A full cycle = Phase 1 + Phase 2 + Phase 3 + Phase 4.
*
* @invariant INV-103-11 — contractual reconciliation
*/
@Cron('*/10 * * * *')
async runCycle(): Promise<void> {
const lockAcquired = await this.acquireLock();
if (!lockAcquired) {
this.logger.debug('Reconciliation lock not acquired, skipping cycle');
return;
}
try {
this.logger.log('Reconciliation cycle started');
// Phase 1 — Re-enqueue stuck captures
const reenqueueCount = await this.scanAndReenqueueOrphans();
// Phase 2 — Trigger SEAL_DELAYED
const delayedCount = await this.triggerSealDelayed();
// Phase 3 — Clear SEAL_DELAYED (evaluate cycle conformity)
const clearedCount = await this.evaluateAndClearSealDelayed();
// Phase 4 — S3 orphan GC
const gcCount = await this.gcOrphanS3Objects();
this.logger.log(
`Reconciliation cycle completed: reenqueued=${reenqueueCount}, ` +
`delayed=${delayedCount}, cleared=${clearedCount}, gc=${gcCount}`,
);
} catch (error) {
this.logger.error(
`Reconciliation cycle failed: ${(error as Error).message}`,
(error as Error).stack,
);
} finally {
await this.releaseLock();
}
}
// ─────────────────────────────────────────────────
// PHASE 1 — Scan and re-enqueue stuck captures
// ─────────────────────────────────────────────────
/**
* Detects captures in a non-terminal state whose `updated_at` is older
* than the orphan threshold, and re-enqueues them to the pipeline.
*
* @invariant INV-103-11 — reconciliation: scan non-terminal states, then re-enqueue
* @returns number of captures re-enqueued
*/
async scanAndReenqueueOrphans(): Promise<number> {
const thresholdMs = this.config.orphanThresholdMinutes * 60 * 1000;
const cutoff = new Date(Date.now() - thresholdMs);
const candidates = await this.captureEventRepository.find({
where: {
state: In([...NON_TERMINAL_STATES]),
updatedAt: LessThan(cutoff),
},
});
if (candidates.length === 0) {
return 0;
}
this.logger.log(
`Phase 1: ${candidates.length} orphan capture(s) detected`,
);
let count = 0;
for (const capture of candidates) {
try {
// Re-enqueue to the sealing pipeline (STUB: PD-55/PD-56)
// In production: bullmq.add('seal-capture', { captureId: capture.captureId })
await this.reenqueueCapture(capture);
// Audit log
await this.writeAuditLog(
capture.captureId,
'RECONCILIATION_REENQUEUE',
{
state: capture.state,
updatedAt: capture.updatedAt.toISOString(),
orphanThresholdMinutes: this.config.orphanThresholdMinutes,
},
);
count++;
} catch (error) {
this.logger.error(
`Failed to reenqueue capture ${capture.captureId}: ${(error as Error).message}`,
);
// Continue with next candidate — fail-open on individual reenqueue
}
}
return count;
}
/**
* Re-enqueues a capture to the sealing pipeline.
* STUB: PD-55 — in production, enqueues a BullMQ job for the sealing worker.
* For now, touches `updated_at` to avoid immediate re-detection.
*/
private async reenqueueCapture(capture: CaptureEvent): Promise<void> {
// STUB: PD-55 — HSM sealing pipeline
// await this.sealQueue.add('seal-capture', {
// captureId: capture.captureId,
// source: 'reconciliation',
// });
// Touch updated_at to avoid a re-detection loop
await this.captureEventRepository.update(capture.id, {
updatedAt: new Date(),
});
}
// ─────────────────────────────────────────────────
// PHASE 2 — Trigger SEAL_DELAYED
// ─────────────────────────────────────────────────
/**
* Detects PENDING_SEAL captures whose age exceeds the sealSla
* and sets the SEAL_DELAYED flag.
*
* @invariant INV-103-35 — capture PENDING_SEAL > sealSla => SEAL_DELAYED
* @returns number of captures flagged SEAL_DELAYED
*/
async triggerSealDelayed(): Promise<number> {
const slaMs = this.config.sealSlaMinutes * 60 * 1000;
const cutoff = new Date(Date.now() - slaMs);
const candidates = await this.captureEventRepository.find({
where: {
state: 'PENDING_SEAL' as CaptureEvent['state'],
sealDelayed: false,
createdAt: LessThan(cutoff),
},
});
if (candidates.length === 0) {
return 0;
}
this.logger.warn(
`Phase 2: ${candidates.length} capture(s) exceeded sealSla (${this.config.sealSlaMinutes} min)`,
);
let count = 0;
for (const capture of candidates) {
await this.captureEventRepository.update(capture.id, {
sealDelayed: true,
sealDelayedConformingCycles: 0,
});
await this.writeAuditLog(capture.captureId, 'SEAL_DELAYED_TRIGGERED', {
createdAt: capture.createdAt.toISOString(),
sealSlaMinutes: this.config.sealSlaMinutes,
ageMinutes: Math.round(
(Date.now() - capture.createdAt.getTime()) / 60000,
),
});
count++;
}
return count;
}
// ─────────────────────────────────────────────────
// PHASE 3 — Clearing SEAL_DELAYED
// ─────────────────────────────────────────────────
/**
* Evaluates the conformity of the current cycle and updates the
* consecutive-conforming-cycle counter for each SEAL_DELAYED capture.
*
* Definitions (spec §5.9):
* - A cycle = one full execution of the reconciliation job.
* - A cycle is conforming if every capture that was PENDING_SEAL at the
*   start of the cycle is successfully sealed before the end of the cycle.
* - Clearing happens after 3 consecutive conforming cycles.
*
* @invariant INV-103-36 — clearing after 3 consecutive conforming cycles
* @returns number of captures whose SEAL_DELAYED flag was cleared
*/
async evaluateAndClearSealDelayed(): Promise<number> {
// Fetch all captures with SEAL_DELAYED set
const delayedCaptures = await this.captureEventRepository.find({
where: { sealDelayed: true },
});
if (delayedCaptures.length === 0) {
return 0;
}
// Evaluate cycle conformity:
// the cycle is conforming if NO capture is still in PENDING_SEAL among
// those that were PENDING_SEAL at the start of the cycle.
// Operational simplification: we check whether each capture is still in
// PENDING_SEAL. If it has moved to SEALED/ANCHOR_CONFIRMED, it
// contributes positively to the counter.
const pendingSealCount = await this.captureEventRepository.count({
where: { state: 'PENDING_SEAL' as CaptureEvent['state'] },
});
const cycleConforming = pendingSealCount === 0;
let clearedCount = 0;
for (const capture of delayedCaptures) {
if (
capture.state === 'SEALED' ||
capture.state === 'ANCHOR_CONFIRMED'
) {
// The capture is already sealed — evaluate clearing
if (cycleConforming) {
const newCycleCount = capture.sealDelayedConformingCycles + 1;
if (newCycleCount >= this.config.clearingCyclesRequired) {
// Clearing: clear the flag
await this.captureEventRepository.update(capture.id, {
sealDelayed: false,
sealDelayedConformingCycles: 0,
});
await this.writeAuditLog(
capture.captureId,
'SEAL_DELAYED_CLEARED',
{
conformingCycles: newCycleCount,
clearingThreshold: this.config.clearingCyclesRequired,
},
);
clearedCount++;
} else {
// Increment the counter
await this.captureEventRepository.update(capture.id, {
sealDelayedConformingCycles: newCycleCount,
});
}
} else {
// Non-conforming cycle — reset the counter
await this.captureEventRepository.update(capture.id, {
sealDelayedConformingCycles: 0,
});
}
} else if (capture.state === 'PENDING_SEAL') {
// Still pending — reset the counter (non-conforming cycle for this capture)
await this.captureEventRepository.update(capture.id, {
sealDelayedConformingCycles: 0,
});
}
// Captures in any other state (CANCELLED, etc.) are ignored
}
if (clearedCount > 0) {
this.logger.log(
`Phase 3: ${clearedCount} SEAL_DELAYED flag(s) cleared`,
);
}
return clearedCount;
}
// ─────────────────────────────────────────────────
// PHASE 4 — S3 orphan GC
// ─────────────────────────────────────────────────
/**
* Scans the S3 bucket under the captures/ prefix and deletes objects
* with no matching capture_events row that are older than orphanTtl.
*
* @invariant INV-103-33 — S3 orphans older than orphanTtl are deleted
* @returns number of orphaned objects deleted
*/
async gcOrphanS3Objects(): Promise<number> {
if (!this.s3Client) {
this.logger.debug('S3 client not configured, skipping GC');
return 0;
}
const orphanTtlMs = this.config.s3OrphanTtlSeconds * 1000;
const now = Date.now();
let deletedCount = 0;
let continuationToken: string | undefined;
do {
const response = await this.s3Client.send(
new ListObjectsV2Command({
Bucket: this.s3Bucket,
Prefix: this.config.s3CapturePrefix,
ContinuationToken: continuationToken,
MaxKeys: 1000,
}),
);
const objects = response.Contents ?? [];
for (const obj of objects) {
if (!obj.Key || !obj.LastModified) continue;
const ageMs = now - obj.LastModified.getTime();
if (ageMs <= orphanTtlMs) continue;
// Check for a matching backend row
const exists = await this.captureEventRepository.count({
where: { uploadObjectKey: obj.Key },
});
if (exists === 0) {
// Confirmed orphan — delete it
try {
await this.s3Client.send(
new DeleteObjectCommand({
Bucket: this.s3Bucket,
Key: obj.Key,
}),
);
await this.writeAuditLog(
this.extractCaptureIdFromKey(obj.Key),
'S3_ORPHAN_DELETED',
{
objectKey: obj.Key,
lastModified: obj.LastModified.toISOString(),
ageSeconds: Math.round(ageMs / 1000),
orphanTtlSeconds: this.config.s3OrphanTtlSeconds,
},
);
deletedCount++;
} catch (error) {
this.logger.error(
`Failed to delete S3 orphan ${obj.Key}: ${(error as Error).message}`,
);
}
}
}
continuationToken = response.NextContinuationToken;
} while (continuationToken);
if (deletedCount > 0) {
this.logger.log(
`Phase 4: ${deletedCount} S3 orphan(s) deleted`,
);
}
return deletedCount;
}
// ─────────────────────────────────────────────────
// HELPERS
// ─────────────────────────────────────────────────
/**
* Acquires the Redis lock for multi-instance protection.
* SET NX EX pattern, identical to SealReconciliationService (PD-80).
*/
private async acquireLock(): Promise<boolean> {
if (!this.redis) {
// Without Redis, run without a lock (single-instance dev mode)
return true;
}
const result = await this.redis.set(
REDIS_LOCK_KEY,
process.pid.toString(),
'EX',
this.config.lockTtlSeconds,
'NX',
);
return result === 'OK';
}
/**
* Releases the Redis lock.
*/
private async releaseLock(): Promise<void> {
if (!this.redis) return;
try {
await this.redis.del(REDIS_LOCK_KEY);
} catch (error) {
this.logger.warn(
`Failed to release lock: ${(error as Error).message}`,
);
}
}
/**
* Writes an entry to the append-only evidentiary audit log.
*/
private async writeAuditLog(
captureId: string,
eventType: string,
payload: Record<string, unknown>,
): Promise<void> {
await this.auditLogRepository.insert({
captureId,
eventType,
payloadJson: payload,
});
}
/**
* Extracts an approximate capture_id from an S3 key.
* Convention: captures/{capture_id}/(unknown)
* Returns 'unknown' if the format does not match.
*/
private extractCaptureIdFromKey(key: string): string {
const parts = key.split('/');
if (parts.length >= 2) {
return parts[1];
}
return 'unknown';
}
}
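The Phase 3 counter logic (INV-103-36) reduces to a small pure state machine, shown here as a standalone sketch. The `nextDelayedState` helper is illustrative and not part of the service; it models one reconciliation cycle for one flagged capture.

```typescript
// Pure model of the SEAL_DELAYED conforming-cycle counter (INV-103-36).
// One call = one reconciliation cycle for one flagged capture.
interface DelayedState {
  sealDelayed: boolean;
  conformingCycles: number;
}

const CLEARING_CYCLES_REQUIRED = 3;

function nextDelayedState(
  current: DelayedState,
  captureSealed: boolean,   // capture reached SEALED / ANCHOR_CONFIRMED
  cycleConforming: boolean, // no capture was left in PENDING_SEAL this cycle
): DelayedState {
  if (!current.sealDelayed) return current;
  if (!captureSealed || !cycleConforming) {
    // Any non-conforming cycle resets the streak.
    return { sealDelayed: true, conformingCycles: 0 };
  }
  const cycles = current.conformingCycles + 1;
  if (cycles >= CLEARING_CYCLES_REQUIRED) {
    // Three consecutive conforming cycles: clear the flag.
    return { sealDelayed: false, conformingCycles: 0 };
  }
  return { sealDelayed: true, conformingCycles: cycles };
}
```

Making the transition pure keeps the invariant easy to test in isolation; the service's `evaluateAndClearSealDelayed` applies the same rules with repository side effects.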
5.2 Registration in CaptureModule
The service is registered as a provider in capture.module.ts (M9):
// In capture.module.ts — providers section
import { CaptureReconciliationService } from './services/capture-reconciliation.service';
@Module({
imports: [
TypeOrmModule.forFeature([CaptureEvent, CaptureAuditLog]),
ScheduleModule.forRoot(),
],
providers: [
CaptureIngestService,
CaptureIdempotenceService,
KekKeyringService,
CaptureReconciliationService, // M12
],
})
export class CaptureModule implements OnModuleInit {
constructor(
private readonly reconciliationService: CaptureReconciliationService,
) {}
async onModuleInit(): Promise<void> {
// Wire the Redis client (already available via RedisModule)
// Wire the S3 client (already available via S3Module)
// this.reconciliationService.setRedisClient(redisClient);
// this.reconciliationService.setS3Client(s3Client, bucketName);
}
}
6. Contract tests
6.1 Coverage matrix
| Test ID | Invariant/Criterion | Covered | Method |
|---|---|---|---|
| TC-INV-07 | INV-103-11 (reconciliation) | Yes | Re-enqueue of non-terminal orphans |
| TC-INV-08 | INV-103-36 (clearing) | Yes | 3 consecutive conforming cycles |
| TC-INV-11 | INV-103-33 (S3 GC) | Yes | S3 orphans detected and deleted |
| TC-INV-14 | INV-103-35 (trigger) | Yes | SEAL_DELAYED set when the SLA is exceeded |
| TC-ERR-09 | ER-103-09 (SLA exceeded) | Yes | SEAL_DELAYED flag + notification |
6.2 Test file — capture-reconciliation.service.spec.ts
import { Test, TestingModule } from '@nestjs/testing';
import { getRepositoryToken } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import type { S3Client } from '@aws-sdk/client-s3';
import type Redis from 'ioredis';
import { CaptureReconciliationService } from '../services/capture-reconciliation.service';
import { CaptureEvent } from '../entities/capture-event.entity';
import { CaptureAuditLog } from '../entities/capture-audit-log.entity';
// --- Helpers ---
function createMockCapture(overrides: Partial<CaptureEvent> = {}): CaptureEvent {
return {
id: 'uuid-mock',
captureId: 'aaaaaaaa-bbbb-4ccc-9ddd-eeeeeeeeeeee',
userId: 'user-uuid',
state: 'PENDING_SEAL',
signatureStatus: 'PENDING_SIGNATURE',
hsmSignatureRef: null,
dekWrapped: Buffer.from('wrapped'),
kekId: 'kek-v1',
aesGcmNonceB64: 'AAAAAAAAAAAAAAAA',
aesGcmTagB64: 'AAAAAAAAAAAAAAAAAAAAAA==',
hashSha3256: 'a'.repeat(64),
mimeType: 'image/png',
sizeBytes: 1024,
deviceId: 'device-uuid',
appVersion: '1.0.0',
timestampDevice: new Date(),
payloadCanonicalSha256: 'b'.repeat(64),
uploadObjectKey: 'captures/aaaaaaaa-bbbb-4ccc-9ddd-eeeeeeeeeeee/image.enc',
sealDelayed: false,
sealDelayedConformingCycles: 0,
createdAt: new Date(Date.now() - 20 * 60 * 1000), // 20 min ago
updatedAt: new Date(Date.now() - 20 * 60 * 1000),
...overrides,
} as CaptureEvent;
}
describe('CaptureReconciliationService', () => {
let service: CaptureReconciliationService;
let captureEventRepo: jest.Mocked<Repository<CaptureEvent>>;
let auditLogRepo: jest.Mocked<Repository<CaptureAuditLog>>;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
CaptureReconciliationService,
{
provide: getRepositoryToken(CaptureEvent),
useValue: {
find: jest.fn().mockResolvedValue([]),
count: jest.fn().mockResolvedValue(0),
update: jest.fn().mockResolvedValue({ affected: 1 }),
},
},
{
provide: getRepositoryToken(CaptureAuditLog),
useValue: {
insert: jest.fn().mockResolvedValue({}),
},
},
],
}).compile();
service = module.get(CaptureReconciliationService);
captureEventRepo = module.get(getRepositoryToken(CaptureEvent));
auditLogRepo = module.get(getRepositoryToken(CaptureAuditLog));
});
// ─────────────────────────────────────────────────
// TC-INV-07 — Reconciliation re-enqueues orphans
// ─────────────────────────────────────────────────
describe('TC-INV-07: scanAndReenqueueOrphans', () => {
it('GIVEN stuck non-terminal captures WHEN cycle THEN re-enqueued and converging', async () => {
// GIVEN
const orphan = createMockCapture({
state: 'UPLOADED',
updatedAt: new Date(Date.now() - 20 * 60 * 1000),
});
captureEventRepo.find.mockResolvedValueOnce([orphan]);
// WHEN
const count = await service.scanAndReenqueueOrphans();
// THEN
expect(count).toBe(1);
expect(captureEventRepo.update).toHaveBeenCalledWith(
orphan.id,
expect.objectContaining({ updatedAt: expect.any(Date) }),
);
expect(auditLogRepo.insert).toHaveBeenCalledWith(
expect.objectContaining({
captureId: orphan.captureId,
eventType: 'RECONCILIATION_REENQUEUE',
}),
);
});
it('GIVEN no orphan WHEN cycle THEN 0 re-enqueued', async () => {
captureEventRepo.find.mockResolvedValueOnce([]);
const count = await service.scanAndReenqueueOrphans();
expect(count).toBe(0);
});
});
// ─────────────────────────────────────────────────
// TC-INV-14 — Trigger SEAL_DELAYED
// ─────────────────────────────────────────────────
describe('TC-INV-14: triggerSealDelayed', () => {
it('GIVEN capture PENDING_SEAL > sealSla WHEN cycle THEN SEAL_DELAYED set', async () => {
// GIVEN
const capture = createMockCapture({
state: 'PENDING_SEAL',
sealDelayed: false,
createdAt: new Date(Date.now() - 15 * 60 * 1000), // 15 min > 10 min SLA
});
captureEventRepo.find
.mockResolvedValueOnce([]) // Phase 1
.mockResolvedValueOnce([capture]); // Phase 2 triggerSealDelayed
// WHEN
const count = await service.triggerSealDelayed();
// THEN
expect(count).toBe(1);
expect(captureEventRepo.update).toHaveBeenCalledWith(capture.id, {
sealDelayed: true,
sealDelayedConformingCycles: 0,
});
expect(auditLogRepo.insert).toHaveBeenCalledWith(
expect.objectContaining({
captureId: capture.captureId,
eventType: 'SEAL_DELAYED_TRIGGERED',
}),
);
});
it('GIVEN capture PENDING_SEAL < sealSla WHEN cycle THEN no trigger', async () => {
// Capture created 5 min ago, sealSla = 10 min => no trigger
captureEventRepo.find.mockResolvedValueOnce([]);
const count = await service.triggerSealDelayed();
expect(count).toBe(0);
});
it('GIVEN capture PENDING_SEAL > sealSla THEN counter reset to 0', async () => {
const capture = createMockCapture({
sealDelayed: false,
sealDelayedConformingCycles: 2,
});
captureEventRepo.find.mockResolvedValueOnce([capture]);
await service.triggerSealDelayed();
expect(captureEventRepo.update).toHaveBeenCalledWith(capture.id, {
sealDelayed: true,
sealDelayedConformingCycles: 0,
});
});
});
// ─────────────────────────────────────────────────
// TC-ERR-09 — Sealing SLA exceeded
// ─────────────────────────────────────────────────
describe('TC-ERR-09: sealing SLA exceeded', () => {
it('GIVEN capture PENDING_SEAL > 10 min WHEN reconciliation THEN SEAL_DELAYED and audit', async () => {
const capture = createMockCapture({
state: 'PENDING_SEAL',
sealDelayed: false,
createdAt: new Date(Date.now() - 12 * 60 * 1000),
});
captureEventRepo.find.mockResolvedValueOnce([capture]);
const count = await service.triggerSealDelayed();
expect(count).toBe(1);
expect(auditLogRepo.insert).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'SEAL_DELAYED_TRIGGERED',
payloadJson: expect.objectContaining({
sealSlaMinutes: 10,
}),
}),
);
});
});
// ─────────────────────────────────────────────────
// TC-INV-08 — Clearing SEAL_DELAYED after 3 cycles
// ─────────────────────────────────────────────────
describe('TC-INV-08: evaluateAndClearSealDelayed', () => {
it('GIVEN SEAL_DELAYED active + capture SEALED + 2 conforming cycles WHEN conforming cycle THEN cleared', async () => {
// GIVEN
const capture = createMockCapture({
state: 'SEALED',
sealDelayed: true,
sealDelayedConformingCycles: 2,
});
captureEventRepo.find.mockResolvedValueOnce([capture]);
captureEventRepo.count.mockResolvedValueOnce(0); // no PENDING_SEAL => conforming cycle
// WHEN
const count = await service.evaluateAndClearSealDelayed();
// THEN
expect(count).toBe(1);
expect(captureEventRepo.update).toHaveBeenCalledWith(capture.id, {
sealDelayed: false,
sealDelayedConformingCycles: 0,
});
expect(auditLogRepo.insert).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'SEAL_DELAYED_CLEARED',
}),
);
});
it('GIVEN SEAL_DELAYED active + 1 conforming cycle WHEN conforming cycle THEN counter incremented but not cleared', async () => {
const capture = createMockCapture({
state: 'SEALED',
sealDelayed: true,
sealDelayedConformingCycles: 1,
});
captureEventRepo.find.mockResolvedValueOnce([capture]);
captureEventRepo.count.mockResolvedValueOnce(0); // conforming cycle
const count = await service.evaluateAndClearSealDelayed();
expect(count).toBe(0); // Not yet 3 cycles
expect(captureEventRepo.update).toHaveBeenCalledWith(capture.id, {
sealDelayedConformingCycles: 2,
});
});
it('GIVEN SEAL_DELAYED active + 2 conforming cycles WHEN NON-conforming cycle THEN counter reset', async () => {
const capture = createMockCapture({
state: 'SEALED',
sealDelayed: true,
sealDelayedConformingCycles: 2,
});
captureEventRepo.find.mockResolvedValueOnce([capture]);
captureEventRepo.count.mockResolvedValueOnce(3); // 3 PENDING_SEAL captures => non-conforming cycle
const count = await service.evaluateAndClearSealDelayed();
expect(count).toBe(0);
expect(captureEventRepo.update).toHaveBeenCalledWith(capture.id, {
sealDelayedConformingCycles: 0,
});
});
it('GIVEN SEAL_DELAYED active + capture still PENDING_SEAL WHEN cycle THEN counter reset', async () => {
const capture = createMockCapture({
state: 'PENDING_SEAL',
sealDelayed: true,
sealDelayedConformingCycles: 2,
});
captureEventRepo.find.mockResolvedValueOnce([capture]);
captureEventRepo.count.mockResolvedValueOnce(1); // at least one PENDING_SEAL
const count = await service.evaluateAndClearSealDelayed();
expect(count).toBe(0);
expect(captureEventRepo.update).toHaveBeenCalledWith(capture.id, {
sealDelayedConformingCycles: 0,
});
});
});
// ─────────────────────────────────────────────────
// TC-INV-11 — S3 orphan GC
// ─────────────────────────────────────────────────
describe('TC-INV-11: gcOrphanS3Objects', () => {
let mockS3Client: {
send: jest.Mock;
};
beforeEach(() => {
mockS3Client = { send: jest.fn() };
service.setS3Client(mockS3Client as unknown as S3Client, 'test-bucket');
});
it('GIVEN S3 object with no backend match + age > orphanTtl WHEN cycle THEN deleted + audit', async () => {
// GIVEN
const orphanKey = 'captures/dead-uuid/image.enc';
mockS3Client.send
.mockResolvedValueOnce({
// ListObjectsV2
Contents: [
{
Key: orphanKey,
LastModified: new Date(Date.now() - 2000 * 1000), // 2000s > 900s default
},
],
NextContinuationToken: undefined,
})
.mockResolvedValueOnce({}); // DeleteObject
captureEventRepo.count.mockResolvedValueOnce(0); // no backend match
// WHEN
const count = await service.gcOrphanS3Objects();
// THEN
expect(count).toBe(1);
expect(mockS3Client.send).toHaveBeenCalledTimes(2); // List + Delete
expect(auditLogRepo.insert).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'S3_ORPHAN_DELETED',
payloadJson: expect.objectContaining({
objectKey: orphanKey,
}),
}),
);
});
it('GIVEN S3 object with a backend match WHEN cycle THEN not deleted', async () => {
mockS3Client.send.mockResolvedValueOnce({
Contents: [
{
Key: 'captures/valid-uuid/image.enc',
LastModified: new Date(Date.now() - 2000 * 1000),
},
],
NextContinuationToken: undefined,
});
captureEventRepo.count.mockResolvedValueOnce(1); // match found
const count = await service.gcOrphanS3Objects();
expect(count).toBe(0);
expect(mockS3Client.send).toHaveBeenCalledTimes(1); // List only
});
it('GIVEN orphaned S3 object with age < orphanTtl WHEN cycle THEN not deleted', async () => {
mockS3Client.send.mockResolvedValueOnce({
Contents: [
{
Key: 'captures/young-uuid/image.enc',
LastModified: new Date(Date.now() - 100 * 1000), // 100s < 900s
},
],
NextContinuationToken: undefined,
});
const count = await service.gcOrphanS3Objects();
expect(count).toBe(0);
});
it('GIVEN S3 client not configured WHEN cycle THEN GC skipped', async () => {
service.setS3Client(null as unknown as S3Client, '');
// Force-reset the private client
(service as any).s3Client = null;
const count = await service.gcOrphanS3Objects();
expect(count).toBe(0);
});
it('GIVEN S3 pagination (2 pages) WHEN cycle THEN all pages scanned', async () => {
mockS3Client.send
.mockResolvedValueOnce({
Contents: [
{
Key: 'captures/orphan-1/image.enc',
LastModified: new Date(Date.now() - 2000 * 1000),
},
],
NextContinuationToken: 'token-page-2',
})
.mockResolvedValueOnce({}) // Delete orphan 1
.mockResolvedValueOnce({
Contents: [
{
Key: 'captures/orphan-2/image.enc',
LastModified: new Date(Date.now() - 2000 * 1000),
},
],
NextContinuationToken: undefined,
})
.mockResolvedValueOnce({}); // Delete orphan 2
captureEventRepo.count
.mockResolvedValueOnce(0) // orphan 1
.mockResolvedValueOnce(0); // orphan 2
const count = await service.gcOrphanS3Objects();
expect(count).toBe(2);
expect(mockS3Client.send).toHaveBeenCalledTimes(4); // 2 List + 2 Delete
});
});
// ─────────────────────────────────────────────────
// Redis lock
// ─────────────────────────────────────────────────
describe('Redis lock', () => {
let mockRedis: { set: jest.Mock; del: jest.Mock };
beforeEach(() => {
mockRedis = {
set: jest.fn(),
del: jest.fn().mockResolvedValue(1),
};
service.setRedisClient(mockRedis as unknown as Redis);
});
it('GIVEN lock acquired WHEN runCycle THEN executes all 4 phases', async () => {
mockRedis.set.mockResolvedValueOnce('OK');
captureEventRepo.find.mockResolvedValue([]);
captureEventRepo.count.mockResolvedValue(0);
await service.runCycle();
// Verify that the phases ran (repo.find backs phases 1, 2 and 3)
expect(captureEventRepo.find).toHaveBeenCalled();
expect(mockRedis.del).toHaveBeenCalled();
});
it('GIVEN lock not acquired WHEN runCycle THEN skips the cycle', async () => {
mockRedis.set.mockResolvedValueOnce(null);
await service.runCycle();
expect(captureEventRepo.find).not.toHaveBeenCalled();
});
it('GIVEN no Redis (dev mode) WHEN runCycle THEN runs without lock', async () => {
service.setRedisClient(null as unknown as Redis);
(service as any).redis = null;
captureEventRepo.find.mockResolvedValue([]);
captureEventRepo.count.mockResolvedValue(0);
await service.runCycle();
expect(captureEventRepo.find).toHaveBeenCalled();
});
});
// ─────────────────────────────────────────────────
// Full runCycle pass
// ─────────────────────────────────────────────────
describe('runCycle integration', () => {
it('GIVEN error in one phase WHEN runCycle THEN catches it and releases the lock', async () => {
const mockRedis = {
set: jest.fn().mockResolvedValue('OK'),
del: jest.fn().mockResolvedValue(1),
};
service.setRedisClient(mockRedis as unknown as Redis);
// Phase 1 throws
captureEventRepo.find.mockRejectedValueOnce(new Error('DB down'));
await service.runCycle();
// The lock must be released despite the error
expect(mockRedis.del).toHaveBeenCalled();
});
});
});
7. Invariants covered¶
| Invariant | Mechanism | Observable |
|---|---|---|
| INV-103-11 (reconciliation) | Phase 1: scan for updated_at < cutoff on non-terminal states, re-enqueue via BullMQ (STUB: PD-55) | RECONCILIATION_REENQUEUE audit log with capture_id and state |
| INV-103-33 (S3 orphan GC) | Phase 4: ListObjectsV2 + comparison against upload_object_key in the database + DeleteObject when orphaned for longer than orphanTtl | S3_ORPHAN_DELETED audit log with objectKey and age |
| INV-103-35 (SEAL_DELAYED trigger) | Phase 2: scan PENDING_SEAL rows with created_at < NOW() - sealSla and seal_delayed=false | seal_delayed=true in the database + SEAL_DELAYED_TRIGGERED audit log |
| INV-103-36 (SEAL_DELAYED clearing) | Phase 3: per-cycle conformance evaluation, persistent counter seal_delayed_conforming_cycles, cleared at 3 | seal_delayed=false + SEAL_DELAYED_CLEARED audit log with counter value |
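The Phase 2 trigger condition (INV-103-35) can be sketched as a pure predicate. This is an illustrative reconstruction, not the service's actual code: the field names (`status`, `createdAt`, `sealDelayed`) are assumptions mapped from the `capture_events` columns cited in the table above.

```typescript
// Hypothetical sketch of the INV-103-35 trigger test, as a pure function.
interface CaptureLike {
  status: string;
  createdAt: Date;
  sealDelayed: boolean;
}

// True when a PENDING_SEAL capture has exceeded sealSla (ms) and the
// SEAL_DELAYED flag is not yet set — i.e. Phase 2 should flag it.
function shouldTriggerSealDelayed(
  capture: CaptureLike,
  sealSlaMs: number,
  now: Date = new Date(),
): boolean {
  return (
    capture.status === 'PENDING_SEAL' &&
    !capture.sealDelayed &&
    now.getTime() - capture.createdAt.getTime() > sealSlaMs
  );
}
```

In the real service this predicate would be expressed as a repository WHERE clause (created_at < NOW() - sealSla AND seal_delayed = false) so the scan stays in the database.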
8. Assumptions¶
| ID | Assumption | Impact if wrong |
|---|---|---|
| HM12-01 | The S3 bucket uses the captures/ prefix for capture objects (confirmed in M15) | Adjust s3CapturePrefix in the config |
| HM12-02 | The upload_object_key column in capture_events matches the S3 key exactly | The GC cannot match orphans |
| HM12-03 | Redis is available in production for the distributed lock | In single-instance dev, the lock is bypassed (see acquireLock()) |
| HM12-04 | BullMQ is configured for the sealing queue (STUB: PD-55) | Re-enqueue is currently a no-op (touches updated_at only) |
| HM12-05 | The cron runs on a single instance thanks to the Redis lock | If Redis is down, the cron runs on every instance (acceptable because the cycle is idempotent) |
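The lock acquisition and dev-mode bypass described in HM12-03/HM12-05 can be sketched as follows. This is a minimal sketch assuming an ioredis-style `set(key, value, 'EX', ttl, 'NX')` signature; the lock key name and TTL constant are illustrative, the TTL value follows the decision trace (2 * scan interval).

```typescript
// Hypothetical acquireLock() sketch: SET NX EX against Redis, bypassed
// entirely when no Redis client is configured (single-instance dev mode).
interface RedisLike {
  set(key: string, value: string, ex: 'EX', ttl: number, nx: 'NX'): Promise<'OK' | null>;
}

const LOCK_KEY = 'capture-reconciliation:lock'; // assumed key name
const LOCK_TTL_S = 20 * 60; // 2 * scan interval, per the decision trace

// Returns true when the cycle may run: either the distributed lock was
// acquired, or no Redis client is configured.
async function acquireLock(redis: RedisLike | null): Promise<boolean> {
  if (!redis) return true; // dev mode: no lock, cron runs unguarded
  const res = await redis.set(LOCK_KEY, '1', 'EX', LOCK_TTL_S, 'NX');
  return res === 'OK';
}
```

Because the lock carries a TTL, a crashed instance cannot hold it forever; the explicit `del` in the tests above only shortens the window.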
9. Stubs¶
| Stub | Target story | Description |
|---|---|---|
| reenqueueCapture() | PD-55 | Re-enqueue toward the HSM sealing worker. Currently: touches updated_at to avoid immediate re-detection. |
| "Sealing in progress" notification | PD-41 | On SEAL_DELAYED, delay notification to the user. Currently: audit log only. |
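The reenqueueCapture() stub behavior described above can be sketched as follows. The repository interface here is illustrative, not the actual TypeORM repository shape.

```typescript
// Hypothetical sketch of the reenqueueCapture() stub (PD-55 will replace it
// with a real BullMQ enqueue toward the HSM sealing worker).
interface CaptureRepoLike {
  update(id: string, patch: { updatedAt: Date }): Promise<unknown>;
}

async function reenqueueCapture(repo: CaptureRepoLike, captureId: string): Promise<void> {
  // Touching updated_at pushes the row past the next reconciliation cutoff,
  // so the same stuck capture is not re-detected on the very next cycle.
  await repo.update(captureId, { updatedAt: new Date() });
}
```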
10. Out-of-scope files identified¶
No out-of-scope file requires modification. The service is self-contained and uses the entities and repositories already provided by M9/M13.
11. Decision trace¶
architectural_decisions:
  - decision: "@nestjs/schedule cron with Redis SET NX EX lock"
    rationale: "Same pattern as SealReconciliationService (PD-80). Prevents double execution in multi-instance deployments. Lock TTL = 2 * scan interval."
    alternatives_considered:
      - "BullMQ repeatable job"
      - "External CloudWatch cron"
    trade_offs: "Redis dependency for the lock. Acceptable because Redis is already used for rate limiting."
  - decision: "S3 orphan GC in the same cron as reconciliation (no separate cron)"
    rationale: "Decision D9 from the decomposition. The S3 scan is paginated and adds ~1-5s per cycle. Avoids maintaining a second cron."
    alternatives_considered:
      - "Dedicated S3 GC cron"
      - "S3 lifecycle rules alone"
    trade_offs: "The full cycle takes longer. The lock TTL (20 min) covers it comfortably."
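The paginated orphan scan exercised by the GC tests above (list a page, check each old-enough key against the database, delete unmatched keys, follow NextContinuationToken) can be sketched with the S3 client abstracted away. The function names (`listPage`, `hasDbMatch`, `deleteObject`) and the injection style are assumptions for illustration; the real service wires these to ListObjectsV2Command, a repository count on upload_object_key, and DeleteObjectCommand.

```typescript
// Hypothetical sketch of the Phase 4 orphan GC loop (INV-103-33).
interface S3ObjectSummary { Key: string; LastModified: Date; }
interface ListPage { Contents?: S3ObjectSummary[]; NextContinuationToken?: string; }

async function gcOrphans(
  listPage: (token?: string) => Promise<ListPage>,   // wraps ListObjectsV2Command
  hasDbMatch: (key: string) => Promise<boolean>,     // count on upload_object_key
  deleteObject: (key: string) => Promise<void>,      // wraps DeleteObjectCommand
  orphanTtlMs: number,
): Promise<number> {
  let deleted = 0;
  let token: string | undefined;
  do {
    const page = await listPage(token);
    for (const obj of page.Contents ?? []) {
      const ageMs = Date.now() - obj.LastModified.getTime();
      // Young objects may still be mid-upload: skip until older than the TTL.
      if (ageMs <= orphanTtlMs) continue;
      // A matching backend row means the object is referenced, not orphaned.
      if (await hasDbMatch(obj.Key)) continue;
      await deleteObject(obj.Key);
      deleted++;
    }
    token = page.NextContinuationToken;
  } while (token);
  return deleted;
}
```

Deleting within each page, before fetching the next one, matches the mock ordering in the pagination test (List, Delete, List, Delete) and keeps memory bounded to one page of keys.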