PD-21 — Plan d'Implémentation¶
Document : Plan technique détaillé Spécification de référence : PD-21-specification.md Statut : Draft (Révision 2 — Corrections compliance) Date : 2025-12-23
1. Découpage en Composants¶
1.1 Vue d'ensemble architecturale¶
┌─────────────────────────────────────────────────────────────────────┐
│ API Layer (NestJS) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────┐ │
│ │ DocumentsModule │ │ BlockchainModule│ │ Autres Modules │ │
│ └────────┬────────┘ └────────┬────────┘ └──────────┬──────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ JobsModule (Orchestrateur) │ │
│ │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ │
│ │ │ JobsService │ │ JobsController │ │ JobsRepository │ │ │
│ │ │ (Producer) │ │ (API État) │ │ (Traçabilité) │ │ │
│ │ └───────┬────────┘ └───────┬────────┘ └───────┬────────┘ │ │
│ └──────────┼───────────────────┼───────────────────┼───────────┘ │
│ │ │ │ │
└─────────────┼───────────────────┼───────────────────┼───────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────┐
│ Persistence Layer │
│ ┌─────────────────────────────┐ ┌─────────────────────────────┐ │
│ │ Redis (BullMQ Queue) │ │ PostgreSQL (Job History) │ │
│ │ - Jobs en attente │ │ - job_history (entity) │ │
│ │ - États transitoires │ │ - États terminaux │ │
│ │ - Locks d'exécution │ │ - Audit trail complet │ │
│ └─────────────────────────────┘ └─────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
▲ ▲
│ │
┌─────────────┼───────────────────────────────────────┼───────────────┐
│ │ Worker Layer │ │
│ ┌──────────┴──────────────────────────────────────┐│ │
│ │ JobsWorkerModule ││ │
│ │ ┌────────────────┐ ┌────────────────┐ ┌─────┐││ │
│ │ │ WorkerService │ │ ProcessorBase │ │Health│ │
│ │ │ (Consumer) │ │ (Abstraction) │ │Check││ │
│ │ └────────────────┘ └────────────────┘ └─────┘││ │
│ │ ┌────────────────┐ ┌────────────────┐ ││ │
│ │ │ DocumentProc. │ │ BlockchainProc.│ ... ││ │
│ │ └────────────────┘ └────────────────┘ ││ │
│ └─────────────────────────────────────────────────┘│ │
└─────────────────────────────────────────────────────┼───────────────┘
│
┌─────────────────────────────────────────────────────┼───────────────┐
│ Prefect Orchestration Layer │ │
│ ┌──────────────────────────────────────────────────┴─────────────┐ │
│ │ jobs_dlq_retry_flow.py (Schedulé) │ │
│ │ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────┐ │ │
│ │ │ get_failed_jobs │ │ trigger_retry │ │ emit_events │ │ │
│ │ │ (task) │ │ (task) │ │ (task) │ │ │
│ │ └────────┬─────────┘ └────────┬─────────┘ └──────────────┘ │ │
│ │ │ GET /jobs/failed │ POST /jobs/:id/retry │ │
│ │ │ ?retryable=true │ │ │
│ │ ▼ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
│ │ │ Backend API (NestJS) │ │ │
│ │ └─────────────────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Architecture "Retry via Prefect" : Conforme au pattern existant
audit_dlq_retry.py. Les jobs BullMQ n'ont jamais de retry implicite (attempts: 1). Les retries sont orchestrés explicitement par un flow Prefect schedulé.
1.2 Composants détaillés¶
A. JobsModule — Module principal (API side)¶
| Fichier | Responsabilité |
|---|---|
jobs.module.ts | Configuration BullMQ, imports, exports |
jobs.service.ts | API de soumission de jobs (Producer) |
jobs.controller.ts | Endpoints REST pour consultation d'état |
jobs.repository.ts | Persistance PostgreSQL des traces |
entities/job-history.entity.ts | Entité TypeORM pour historique |
dto/submit-job.dto.ts | DTO de soumission avec validation |
dto/job-status.dto.ts | DTO de réponse d'état |
enums/job-state.enum.ts | États du cycle de vie |
enums/job-type.enum.ts | Types de jobs supportés |
B. JobsWorkerModule — Module Worker (Process séparé)¶
| Fichier | Responsabilité |
|---|---|
worker.module.ts | Bootstrap du worker process |
worker.service.ts | Orchestration des processors |
processors/base.processor.ts | Classe abstraite des processors |
processors/document-archive.processor.ts | Traitement archivage |
processors/blockchain-anchor.processor.ts | Traitement ancrage |
health/worker-health.service.ts | Health checks du worker |
C. Infrastructure partagée¶
| Fichier | Responsabilité |
|---|---|
constants/queue-names.ts | Noms des queues (constantes) |
interfaces/job-data.interface.ts | Contrat des payloads |
interfaces/job-result.interface.ts | Contrat des résultats |
utils/job-id-generator.ts | Génération UUID v7 |
D. Prefect Flows — Orchestration (Python/Infra)¶
Emplacement :
ProbatioVault-infra/prefect/flows/
D.1 Flow de Retry — jobs_dlq_retry.py¶
| Composant | Responsabilité |
|---|---|
jobs_dlq_retry_flow | Flow principal schedulé (cron */15 * * * *) |
get_failed_jobs (task) | Appel GET /jobs/failed?retryable=true |
get_dlq_stats (task) | Appel GET /jobs/stats pour métriques |
trigger_job_retry (task) | Appel POST /jobs/:id/retry par job éligible |
evaluate_dlq_health (task) | Seuils critiques, alertes Prefect Events |
emit_retry_event (task) | Émission événements pour monitoring |
D.2 Flow de Purge — jobs_purge.py (NOUVEAU)¶
Objectif : Formaliser la politique de rétention I-5 avec purge automatique.
| Composant | Responsabilité |
|---|---|
jobs_purge_flow | Flow principal schedulé (cron 0 3 * * * — 3h du matin) |
get_purgeable_jobs (task) | Appel GET /jobs/purgeable?olderThan=30d |
purge_db_jobs (task) | Appel DELETE /jobs/purge?target=db&olderThan=30d |
purge_redis_jobs (task) | Appel DELETE /jobs/purge?target=redis&olderThan=7d |
emit_purge_event (task) | Émission événements avec compteurs |
verify_retention (task) | Vérification post-purge de conformité |
Politique de rétention formalisée :
| Cible | Rétention | Cron | Action |
|---|---|---|---|
PostgreSQL (job_history) | 30 jours | 0 3 * * * | DELETE jobs terminaux > 30j |
| Redis (BullMQ) | 7 jours | 0 3 * * * | Nettoyage completed/failed > 7j |
D.3 Flow Health Check Redis — redis_health.py (NOUVEAU)¶
Objectif : Surveillance proactive de la sécurité et disponibilité Redis.
| Composant | Responsabilité |
|---|---|
redis_health_flow | Flow principal schedulé (cron */5 * * * * — toutes les 5 min) |
check_redis_connection (task) | Ping Redis, vérification latence |
verify_tls_enabled (task) | Vérification TLS actif (prod/staging) |
verify_auth_required (task) | Vérification AUTH obligatoire |
check_memory_usage (task) | Monitoring mémoire Redis (seuil 80%) |
emit_health_event (task) | Alerte si anomalie détectée |
Alertes émises :
| Condition | Événement Prefect | Sévérité |
|---|---|---|
| Connexion Redis échouée | redis.health.connection_failed | CRITICAL |
| TLS désactivé en prod | redis.health.tls_disabled | CRITICAL |
| AUTH désactivé en prod | redis.health.auth_disabled | CRITICAL |
| Mémoire > 80% | redis.health.memory_warning | WARNING |
| Latence > 100ms | redis.health.latency_warning | WARNING |
E. Endpoints API pour Prefect (ajouts au Controller)¶
| Endpoint | Méthode | Description |
|---|---|---|
/jobs/failed | GET | Liste jobs FAILED avec filtres (retryable, job_type, since) |
/jobs/:id/retry | POST | Resoumission d'un job failed (crée nouveau job avec parent_job_id) |
/jobs/stats | GET | Statistiques agrégées (pending, running, failed, succeeded) |
/jobs/purgeable | GET | Liste jobs terminaux éligibles à la purge (olderThan) |
/jobs/purge | DELETE | Purge des jobs terminaux (target=db\ | redis, olderThan) |
/health/redis | GET | Statut santé Redis (connexion, TLS, AUTH, mémoire) |
1.3 Séparation Infrastructure PD-21 / Job Types Métier¶
Principe de séparation : PD-21 définit l'infrastructure générique de traitement des jobs. Les job types métier sont définis par les PDs spécifiques (PD-45, PD-48, PD-55, PD-72). Cette section documente les hypothèses de conception pour faciliter l'intégration future.
A. Périmètre PD-21 (Infrastructure Générique)¶
| Composant | Responsabilité | Livrable |
|---|---|---|
JobsModule | Configuration BullMQ, connexion Redis sécurisée | ✅ PD-21 |
JobsService | API de soumission générique submit(type, data) | ✅ PD-21 |
JobsController | Endpoints REST : /jobs/:id, /jobs/failed, /jobs/stats | ✅ PD-21 |
JobsRepository | Persistance PostgreSQL, entity JobHistory | ✅ PD-21 |
BaseProcessor | Classe abstraite pour tous les processors | ✅ PD-21 |
Queue DEFAULT | Queue par défaut pour tests/développement | ✅ PD-21 |
| Prefect flows | jobs_dlq_retry_flow.py, jobs_purge_flow.py, redis_health_flow.py | ✅ PD-21 |
B. Registre des Queues (Extensible)¶
// constants/queue-names.ts — PD-21 Core
export const QUEUE_NAMES = {
// Queue par défaut (tests, développement)
DEFAULT: 'pv:jobs:default',
} as const;
// Les queues métier seront ajoutées par les PDs respectifs :
// - PD-45 : EXPORT
// - PD-48 : PURGE
// - PD-55 : BLOCKCHAIN
// - PD-72 : TRANSFER
C. Hypothèses pour Job Types Métier (Hors PD-21)¶
Ces définitions sont des hypothèses pour guider le développement futur. Chaque PD métier DOIT formaliser ses propres job types.
| PD | Queue proposée | Job Types proposés | Statut |
|---|---|---|---|
| PD-45 | pv:jobs:export | EXPORT_GLACIER, EXPORT_ZIP | 🔮 Hypothèse |
| PD-48 | pv:jobs:purge | PURGE_LEGAL, PURGE_USER_REQUEST | 🔮 Hypothèse |
| PD-55 | pv:jobs:blockchain | BLOCKCHAIN_ANCHOR, BLOCKCHAIN_VERIFY | 🔮 Hypothèse |
| PD-72 | pv:jobs:transfer | PRE_TRANSFER_B2B2C, TRANSFER_COMPLETE | 🔮 Hypothèse |
| — | pv:jobs:notification | NOTIFICATION_EMAIL, NOTIFICATION_WEBHOOK | 🔮 Hypothèse |
D. Mécanisme d'extension (Pattern)¶
// Chaque PD métier étendra le registre via :
// 1. Ajout de sa queue dans QUEUE_NAMES
// 2. Ajout de ses job types dans JobType enum
// 3. Mapping dans JOB_TYPE_TO_QUEUE
// 4. Implémentation de son Processor héritant de BaseProcessor
// Exemple PD-55 (à implémenter par PD-55, pas PD-21)
// constants/queue-names.ts — Extension PD-55
export const QUEUE_NAMES = {
...QUEUE_NAMES_CORE,
BLOCKCHAIN: 'pv:jobs:blockchain', // Ajouté par PD-55
} as const;
E. Validation stricte des Job Types¶
// jobs.service.ts — Mécanisme générique PD-21
async submit(dto: SubmitJobDto): Promise<JobHistory> {
// Validation stricte : le job type DOIT exister dans le registre
if (!Object.values(JobType).includes(dto.type)) {
throw new BadRequestException(`Unknown job type: ${dto.type}`);
}
const queueName = JOB_TYPE_TO_QUEUE[dto.type];
if (!queueName) {
throw new BadRequestException(`No queue configured for job type: ${dto.type}`);
}
// CE-3 : La queue DOIT exister (vérifiée au boot)
const queue = this.getQueue(queueName);
if (!queue) {
throw new InternalServerErrorException(`Queue not initialized: ${queueName}`);
}
// ... suite de la soumission
}
F. Contraintes d'intégrité¶
| Contrainte | Mécanisme | Erreur si violation |
|---|---|---|
| Job type inconnu | Validation enum JobType | 400 Bad Request |
| Job type sans queue | Lookup JOB_TYPE_TO_QUEUE | 400 Bad Request |
| Queue non initialisée | Check this.queues.has(queueName) | 500 Internal Server Error |
| Queue fantôme | Refus de création dynamique | N/A (impossible) |
Invariant : Aucune queue ne peut être créée dynamiquement au runtime. Toutes les queues sont déclarées dans
queue-names.tset initialisées au boot.
2. Flux Techniques¶
2.1 FN-1 — Soumission d'un job (Flux nominal)¶
┌────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────┐ ┌──────────┐
│ Client │────▶│ Controller │────▶│ JobsService │────▶│ Redis │ │ Response │
└────────┘ └─────────────┘ └─────────────┘ └───────┘ └──────────┘
│ │ │ │ ▲
│ POST /jobs │ │ │ │
│ {type,data} │ │ │ │
│ │ validate(dto) │ │ │
│ │ ─────────────────▶ │ │ │
│ │ │ generateJobId()│ │
│ │ │ ─────────────▶ │ │
│ │ │ │ │
│ │ │ queue.add() │ │
│ │ │ ───────────────▶ │
│ │ │ │ (async) │
│ │ │ persistHistory │ │
│ │ │ ───────────────▶ PostgreSQL │
│ │ │ │ │
│ │ │◀─── jobId ─────│ │
│ │◀───────────────── │ │ │
│ │ │ │ │
│◀───────────────│ 202 Accepted │ │ │
│ {jobId, │ {jobId: "..."} │ │ │
│ status} │ │ │ │
Séquence détaillée :
- Réception : Le controller reçoit
POST /jobsavec le DTO validé - Validation :
class-validatorvérifie le payload (type, data) - ID Generation : UUID v7 (monotone, sortable) via
job-id-generator.ts - Persistence initiale : Insertion
job_historyavec étatPENDING - Enqueue :
queue.add(jobId, data, options)— non-bloquant - Réponse : HTTP 202 Accepted avec
jobIdretourné immédiatement
2.2 FN-2 — Exécution nominale (Worker)¶
┌───────┐ ┌───────────────┐ ┌────────────┐ ┌──────────┐ ┌───────┐
│ Redis │────▶│ WorkerService │────▶│ Processor │────▶│ Business │────▶│ Redis │
└───────┘ └───────────────┘ └────────────┘ └──────────┘ └───────┘
│ │ │ │ │
│ BRPOPLPUSH │ │ │ │
│ (atomic fetch) │ │ │ │
│ ──────────────▶│ │ │ │
│ │ lock acquired │ │ │
│ │ (Redis SETNX) │ │ │
│ │ │ │ │
│ │ updateState(RUN) │ │ │
│ │ ─────────────────▶ PostgreSQL │ │
│ │ │ │ │
│ │ process(job) │ │ │
│ │ ──────────────────▶│ │ │
│ │ │ execute() │ │
│ │ │ ───────────────▶ │
│ │ │ │ │
│ │ │◀─── result ────│ │
│ │◀───────────────── │ │ │
│ │ │ │ │
│ │ updateState(SUCC) │ │ │
│ │ ─────────────────▶ PostgreSQL │ │
│ │ │ │ │
│◀───────────────│ LREM (ack) │ │ │
│ │ │ │ │
Points clés :
- Lock atomique : BullMQ utilise Lua scripts pour garantir l'unicité
- Transition d'état :
PENDING → RUNNING → SUCCEEDEDtracée en PostgreSQL - Acknowledgement : Job retiré de Redis après succès uniquement
2.3 Flux de consultation d'état¶
GET /jobs/:jobId/status
┌────────┐ ┌─────────────┐ ┌─────────────┐ ┌────────────┐
│ Client │────▶│ Controller │────▶│ Repository │────▶│ PostgreSQL │
└────────┘ └─────────────┘ └─────────────┘ └────────────┘
│ │ │ │
│ │ findByJobId() │ │
│ │ ─────────────────▶ │ │
│ │ │ SELECT ... │
│ │ │ ────────────────▶│
│ │ │◀──── row ────────│
│ │◀───────────────── │ │
│◀───────────────│ {state, history} │ │
2.4 Flux de retry explicite via Prefect¶
Principe I-3 : Aucun retry implicite dans BullMQ. Les retries sont orchestrés par un flow Prefect schedulé qui appelle l'API backend.
┌─────────────────────────────────────────────────────────────────────────────┐
│ Prefect Worker (Schedulé cron) │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ jobs_dlq_retry_flow │ │
│ │ │ │
│ │ 1. get_failed_jobs() │ │
│ │ GET /jobs/failed?retryable=true&limit=50 │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ Pour chaque job éligible (retry_count < max_retries): │ │ │
│ │ │ │ │ │
│ │ │ 2. trigger_job_retry(job_id) │ │ │
│ │ │ POST /jobs/{job_id}/retry │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ ┌───────────────────────────────────────────────────┐ │ │ │
│ │ │ │ Backend crée NOUVEAU job avec: │ │ │ │
│ │ │ │ - Nouveau job_id (UUID v7) │ │ │ │
│ │ │ │ - parent_job_id = job_id original │ │ │ │
│ │ │ │ - retry_count = parent.retry_count + 1 │ │ │ │
│ │ │ │ - Même payload que le job original │ │ │ │
│ │ │ │ │ │ │ │
│ │ │ │ Job original marqué: state = RETRIED │ │ │ │
│ │ │ └───────────────────────────────────────────────────┘ │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ 3. emit_retry_event() │ │
│ │ → Prefect Events pour monitoring/alerting │ │
│ │ │ │
│ │ 4. evaluate_dlq_health() │ │
│ │ → Seuils critiques → alertes si dépassés │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Cycle de vie d'un job avec retry :
┌──────────────────────────────────────────────┐
│ │
┌──────────┐ │ ┌──────────┐ ┌───────────┐ ┌─────────┐ │
│ PENDING │───┼──▶│ RUNNING │──▶│ SUCCEEDED │ │ RETRIED │◀┘
└──────────┘ │ └──────────┘ └───────────┘ └─────────┘
│ │ ▲
│ ▼ │
│ ┌──────────┐ │
│ │ FAILED │────────────────────────┘
│ └──────────┘ (via Prefect flow,
│ │ si retry_count < max)
│ ▼
│ ┌──────────┐
│ │ ABANDONED│ (retry_count >= max_retries)
│ └──────────┘
│
└─ Nouveau job créé (parent_job_id = original)
Nouveaux états :
| État | Description |
|---|---|
RETRIED | Job original après création d'un retry (état terminal) |
ABANDONED | Job ayant atteint max_retries sans succès (état terminal) |
2bis. Diagrammes Mermaid¶
2bis.1 Graphe de dépendances des composants¶
graph TD
subgraph API["API Layer (NestJS)"]
DM[DocumentsModule]
BM[BlockchainModule]
OM[Autres Modules]
end
subgraph JM["JobsModule (Orchestrateur)"]
JS[JobsService<br/>Producer]
JC[JobsController<br/>API État]
JR[JobsRepository<br/>Traçabilité]
JH[JobHistory Entity]
SJ[SubmitJobDto]
JSD[JobStatusDto]
JSE[JobState Enum]
JTE[JobType Enum]
end
subgraph Shared["Infrastructure partagée"]
QN[queue-names.ts<br/>Constantes]
JDI[job-data.interface.ts]
JRI[job-result.interface.ts]
JIG[job-id-generator.ts<br/>UUID v7]
end
subgraph WM["JobsWorkerModule (Process séparé)"]
WS[WorkerService<br/>Consumer]
BP[BaseProcessor<br/>Abstraction]
DAP[DocumentArchiveProcessor]
BAP[BlockchainAnchorProcessor]
WH[WorkerHealthService]
end
subgraph Persistence["Persistence Layer"]
Redis[(Redis<br/>BullMQ Queue)]
PG[(PostgreSQL<br/>job_history)]
end
subgraph Prefect["Prefect Orchestration"]
DLQ[jobs_dlq_retry_flow.py<br/>cron */15]
PUR[jobs_purge_flow.py<br/>cron 0 3 * * *]
RH[redis_health_flow.py<br/>cron */5]
end
DM --> JS
BM --> JS
OM --> JS
JS --> QN
JS --> JIG
JS --> JTE
JS --> JDI
JS --> Redis
JS --> JR
JC --> JR
JC --> JSD
JC --> JSE
JR --> JH
JR --> PG
WS --> Redis
WS --> BP
BP --> JDI
BP --> JRI
DAP --> BP
BAP --> BP
WS --> JR
WH --> Redis
DLQ -->|"GET /jobs/failed"| JC
DLQ -->|"POST /jobs/:id/retry"| JC
PUR -->|"DELETE /jobs/purge"| JC
RH -->|"GET /health/redis"| JC 2bis.2 Diagramme de séquence — Soumission + Exécution + Retry Prefect¶
sequenceDiagram
participant C as Client
participant JC as JobsController
participant JS as JobsService
participant Redis as Redis (BullMQ)
participant PG as PostgreSQL
participant WS as WorkerService
participant BP as BaseProcessor
participant PF as Prefect Flow
Note over C,PG: FN-1 — Soumission d'un job
C->>JC: POST /jobs {type, data}
JC->>JS: validate(dto)
JS->>JS: generateJobId() — UUID v7
JS->>PG: INSERT job_history (PENDING)
JS->>Redis: queue.add(jobId, data, {attempts:1})
JS-->>JC: jobId
JC-->>C: 202 Accepted {jobId, status: PENDING}
Note over Redis,PG: FN-2 — Exécution nominale (Worker)
Redis->>WS: BRPOPLPUSH (atomic fetch)
WS->>PG: updateState(RUNNING)
WS->>BP: process(job)
BP->>BP: execute() — logique métier
BP-->>WS: result
WS->>PG: updateState(SUCCEEDED)
WS->>Redis: LREM (ack)
Note over Redis,PF: Flux d'échec + Retry Prefect
Redis->>WS: BRPOPLPUSH (atomic fetch)
WS->>PG: updateState(RUNNING)
WS->>BP: process(job)
BP--xWS: throw Error
WS->>PG: updateState(FAILED, errorMsg)
WS->>Redis: moveToFailed
Note over PF,PG: Retry orchestré par Prefect (cron */15)
PF->>JC: GET /jobs/failed?retryable=true
JC->>PG: SELECT failed, retry_count < max
PG-->>JC: jobs[]
JC-->>PF: jobs[]
loop Pour chaque job éligible
PF->>JC: POST /jobs/{id}/retry
JC->>JS: createRetryJob(originalId)
JS->>PG: UPDATE original → RETRIED
JS->>PG: INSERT new job (parent_job_id, retry_count+1)
JS->>Redis: queue.add(newJobId, data)
JS-->>JC: newJobId
JC-->>PF: 201 Created
end
PF->>PF: emit_retry_event()
PF->>PF: evaluate_dlq_health() 2bis.3 Machine d'états — Cycle de vie d'un job¶
stateDiagram-v2
[*] --> PENDING: queue.add()
PENDING --> RUNNING: Worker fetch (BRPOPLPUSH)
RUNNING --> SUCCEEDED: Exécution OK
RUNNING --> FAILED: Exécution KO
FAILED --> RETRIED: Prefect retry<br/>(retry_count < max)
FAILED --> ABANDONED: retry_count >= max_retries
RETRIED --> [*]: État terminal<br/>(nouveau job créé)
SUCCEEDED --> [*]: État terminal
ABANDONED --> [*]: État terminal
note right of RETRIED
Un nouveau job est créé
avec parent_job_id = original
end note 3. Mapping Invariants → Mécanismes¶
| Invariant | Mécanisme Technique | Validation |
|---|---|---|
| I-1 Non-blocage | queue.add() retourne une Promise résolue dès l'insertion Redis (pas d'attente worker) | Test: soumission sans worker actif → réponse < 100ms |
| I-2 Exécution unique | Lua script BullMQ moveToActive avec SETNX atomique | Test: 2 workers concurrents sur même job → 1 seul RUNNING |
| I-3 Pas de retry implicite | BullMQ attempts: 1 + Retries explicites via flow Prefect jobs_dlq_retry_flow | Test: job échoué → reste FAILED jusqu'au prochain run Prefect |
| I-4 Traçabilité | Entity job_history avec created_at, started_at, completed_at, failed_reason, retry_count, parent_job_id | Test: job FAILED → failed_reason non null |
| I-5 Persistance post-terminal | removeOnComplete: false + politique de rétention formalisée avec job Prefect jobs_purge_flow | Test: job SUCCEEDED → consultable après 24h, purgé après 30 jours |
| I-6 Sécurité Redis | TLS obligatoire + AUTH + namespace isolation + health check continu | Test: connexion sans TLS/AUTH → rejetée |
| I-7 Idempotence des effets métier | Clé d'idempotence obligatoire dans payload + vérification avant effet irréversible | Test: même job exécuté 2x → effet métier unique |
I-7 — Idempotence des effets métier (NOUVEAU)
Contexte : L'invariant I-2 garantit qu'un job ne s'exécute qu'une fois au niveau BullMQ. Cependant, un retry via Prefect crée un nouveau job_id avec le même payload. Sans idempotence, l'effet métier pourrait être dupliqué (ex: double ancrage blockchain).
Obligation : Chaque processor métier DOIT implémenter une vérification d'idempotence basée sur une clé métier (ex:
document_id,hash) avant toute action irréversible.
3.1 Détail des mécanismes critiques¶
I-2 : Garantie d'exécution unique¶
// Configuration BullMQ
const queueOptions: QueueOptions = {
defaultJobOptions: {
attempts: 1, // I-3: pas de retry
removeOnComplete: false, // I-5: persistance
removeOnFail: false, // I-5: persistance
},
};
// Le lock est géré par BullMQ via Lua script atomique
// Extrait simplifié du mécanisme interne :
// EVALSHA moveToActive
// 1. BRPOPLPUSH waiting → active (atomique)
// 2. SETNX lock:jobId (TTL = lockDuration)
// 3. Si lock échoue → LPUSH back to waiting
I-3 : Retries explicites via Prefect¶
// Configuration BullMQ : JAMAIS de retry implicite
const queueOptions: QueueOptions = {
defaultJobOptions: {
attempts: 1, // I-3: UN SEUL essai
removeOnComplete: false, // I-5: persistance
removeOnFail: false, // I-5: persistance pour retry Prefect
},
};
// Les retries sont gérés par Prefect via l'API REST
// Endpoint POST /jobs/:id/retry
async retryJob(jobId: string): Promise<JobHistory> {
const originalJob = await this.repository.findOne({ where: { job_id: jobId } });
if (originalJob.state !== JobState.FAILED) {
throw new BadRequestException('Only FAILED jobs can be retried');
}
if (originalJob.retry_count >= this.getMaxRetries(originalJob.job_type)) {
// Marquer comme ABANDONED
await this.repository.update(jobId, { state: JobState.ABANDONED });
throw new BadRequestException('Max retries reached');
}
// Créer un NOUVEAU job (traçabilité complète)
const newJob = await this.submit({
type: originalJob.job_type,
data: originalJob.payload,
parentJobId: originalJob.job_id,
retryCount: originalJob.retry_count + 1,
});
// Marquer l'original comme RETRIED
await this.repository.update(jobId, {
state: JobState.RETRIED,
retry_child_id: newJob.job_id,
});
return newJob;
}
I-4 : Structure de traçabilité¶
@Entity('job_history')
export class JobHistory {
@PrimaryColumn({ type: 'uuid' })
job_id: string;
@Column({ type: 'varchar', length: 50 })
job_type: string;
@Column({ type: 'varchar', length: 20 })
state: JobState; // PENDING | RUNNING | SUCCEEDED | FAILED | CANCELLED | RETRIED | ABANDONED
@Column({ type: 'jsonb', nullable: true })
payload: Record<string, unknown>;
@Column({ type: 'jsonb', nullable: true })
result: Record<string, unknown>;
@Column({ type: 'text', nullable: true })
failed_reason: string;
// === Champs Retry (Prefect) ===
@Column({ type: 'int', default: 0 })
retry_count: number; // Nombre de tentatives (0 = première exécution)
@Column({ type: 'uuid', nullable: true })
parent_job_id: string; // ID du job original (si retry)
@Column({ type: 'uuid', nullable: true })
retry_child_id: string; // ID du job retry créé (si état RETRIED)
@Column({ type: 'boolean', default: true })
retryable: boolean; // False pour jobs non-retryables (ex: one-shot)
// === Timestamps ===
@CreateDateColumn()
created_at: Date; // Timestamp soumission
@Column({ type: 'timestamp', nullable: true })
started_at: Date; // Timestamp début exécution
@Column({ type: 'timestamp', nullable: true })
completed_at: Date; // Timestamp fin (succès ou échec)
}
// Enum des états
export enum JobState {
PENDING = 'PENDING', // En attente d'exécution
RUNNING = 'RUNNING', // En cours d'exécution
SUCCEEDED = 'SUCCEEDED', // Terminé avec succès
FAILED = 'FAILED', // Échec (éligible retry si retryable=true)
CANCELLED = 'CANCELLED', // Annulé manuellement
RETRIED = 'RETRIED', // Retry créé (état terminal)
ABANDONED = 'ABANDONED', // Max retries atteint (état terminal)
}
I-6 : Configuration sécurisée Redis + Health Check Continu¶
// Configuration via ConfigService
const redisConfig: RedisOptions = {
host: config.get('REDIS_HOST'),
port: config.get('REDIS_PORT'),
password: config.get('REDIS_PASSWORD'), // AUTH obligatoire
tls: config.get('NODE_ENV') !== 'development'
? { rejectUnauthorized: true }
: undefined,
keyPrefix: 'pv:jobs:', // Isolation par namespace
maxRetriesPerRequest: 3,
enableReadyCheck: true,
};
// Health check continu via événements BullMQ
@Injectable()
export class RedisHealthService implements OnModuleInit {
private readonly logger = new Logger(RedisHealthService.name);
private isHealthy = true;
onModuleInit() {
// Écoute des événements de connexion Redis via BullMQ
this.queue.on('error', (error) => {
this.isHealthy = false;
this.logger.error(`Redis connection error: ${error.message}`);
this.emitHealthEvent('unhealthy', error.message);
});
this.queue.on('ready', () => {
if (!this.isHealthy) {
this.logger.log('Redis connection restored');
this.emitHealthEvent('healthy');
}
this.isHealthy = true;
});
}
async checkHealth(): Promise<HealthStatus> {
try {
await this.queue.client.ping();
return { status: 'healthy', latency: Date.now() - start };
} catch (error) {
return { status: 'unhealthy', error: error.message };
}
}
}
Health check hybride : Événements BullMQ (réactif) + Flow Prefect
redis_health_flow.py(proactif)
I-7 : Idempotence des effets métier¶
// Interface pour les payloads avec clé d'idempotence
export interface IdempotentJobPayload {
idempotencyKey: string; // Clé métier unique (ex: hash document, transaction ID)
}
// Vérification d'idempotence dans BaseProcessor
@Processor()
export abstract class BaseProcessor {
abstract process(job: Job): Promise<JobResult>;
/**
* Vérifie si l'effet métier a déjà été produit.
* DOIT être appelé AVANT toute action irréversible.
*/
protected async checkIdempotency(
idempotencyKey: string,
effectType: string,
): Promise<IdempotencyCheck> {
const existing = await this.idempotencyRepository.findOne({
where: { key: idempotencyKey, effect_type: effectType },
});
if (existing) {
return {
alreadyProcessed: true,
originalJobId: existing.job_id,
processedAt: existing.created_at,
};
}
return { alreadyProcessed: false };
}
/**
* Enregistre l'effet métier après succès.
* Doit être appelé dans une transaction avec l'effet.
*/
protected async recordEffect(
jobId: string,
idempotencyKey: string,
effectType: string,
): Promise<void> {
await this.idempotencyRepository.insert({
key: idempotencyKey,
effect_type: effectType,
job_id: jobId,
created_at: new Date(),
});
}
}
// Exemple d'utilisation dans un processor métier
@Processor(QUEUE_NAMES.BLOCKCHAIN)
export class BlockchainAnchorProcessor extends BaseProcessor {
async process(job: Job<BlockchainAnchorData>): Promise<JobResult> {
// 1. Vérification d'idempotence AVANT l'ancrage
const check = await this.checkIdempotency(
job.data.documentHash, // Clé métier = hash du document
'BLOCKCHAIN_ANCHOR',
);
if (check.alreadyProcessed) {
this.logger.warn(
`Skip: document ${job.data.documentHash} already anchored by job ${check.originalJobId}`
);
return {
status: 'SKIPPED',
reason: 'already_anchored',
originalJobId: check.originalJobId,
};
}
// 2. Effectuer l'ancrage blockchain (effet irréversible)
const txHash = await this.blockchainService.anchor(job.data.documentHash);
// 3. Enregistrer l'effet pour bloquer les futurs retries
await this.recordEffect(job.id, job.data.documentHash, 'BLOCKCHAIN_ANCHOR');
return { status: 'SUCCEEDED', txHash };
}
}
Table d'idempotence :
job_effectsavec index unique sur(key, effect_type)
CREATE TABLE job_effects (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
key VARCHAR(255) NOT NULL, -- Clé métier (hash, document_id, etc.)
effect_type VARCHAR(50) NOT NULL, -- Type d'effet (BLOCKCHAIN_ANCHOR, etc.)
job_id UUID NOT NULL, -- Job ayant produit l'effet
created_at TIMESTAMP NOT NULL,
UNIQUE(key, effect_type) -- Garantie d'unicité
);
4. Gestion des Erreurs¶
4.1 Matrice des erreurs¶
| Code | Cas d'erreur | Comportement | Réponse HTTP | Action corrective |
|---|---|---|---|---|
CE-1 | Échec exécution job | État → FAILED, failed_reason renseigné | N/A (async) | Consultation via /jobs/:id |
CE-2 | Redis indisponible (soumission) | Exception levée, transaction rollback | 503 Service Unavailable | Retry client (exponential backoff) |
CE-3 | Queue inexistante | Validation échoue au boot | 500 (startup) | Fix configuration, redéploiement |
ERR-4 | Job ID invalide (consultation) | 404 Not Found | 404 | Client vérifie l'ID |
ERR-5 | Timeout exécution worker | Job marqué FAILED (stalled) | N/A | Alerte monitoring |
ERR-6 | Payload invalide | Validation DTO échoue | 400 Bad Request | Client corrige payload |
4.2 Circuit de traitement des erreurs¶
// Dans le processor
@Processor(QUEUE_NAMES.DOCUMENT_ARCHIVE)
export class DocumentArchiveProcessor extends BaseProcessor {
async process(job: Job<DocumentArchiveData>): Promise<JobResult> {
const startedAt = new Date();
try {
// Mise à jour état RUNNING
await this.jobsRepository.updateState(job.id, JobState.RUNNING, { startedAt });
// Exécution métier
const result = await this.executeArchive(job.data);
// Succès
await this.jobsRepository.updateState(job.id, JobState.SUCCEEDED, {
completedAt: new Date(),
result,
});
return result;
} catch (error) {
// Échec explicite (I-2)
await this.jobsRepository.updateState(job.id, JobState.FAILED, {
completedAt: new Date(),
failedReason: this.formatError(error),
});
// Rethrow pour que BullMQ marque le job comme failed
throw error;
}
}
}
4.3 Gestion des jobs "stalled"¶
// Configuration worker
const workerOptions: WorkerOptions = {
lockDuration: 30000, // 30s lock
stalledInterval: 15000, // Check toutes les 15s
maxStalledCount: 1, // 1 stall = FAILED (pas de retry implicite, I-3)
};
// Event handler
worker.on('stalled', async (jobId: string) => {
await this.jobsRepository.updateState(jobId, JobState.FAILED, {
completedAt: new Date(),
failedReason: 'Job stalled: worker timeout or crash',
});
this.logger.error(`Job ${jobId} stalled - marked as FAILED`);
});
5. Impacts Sécurité¶
5.1 Surface d'attaque¶
| Vecteur | Risque | Mitigation |
|---|---|---|
| Redis non authentifié | Exécution de commandes arbitraires | AUTH obligatoire (requirepass) |
| Redis sans TLS | Interception credentials/payloads | TLS 1.2+ obligatoire en prod |
| Injection dans payload | Exécution de code malveillant | Validation DTO stricte + sanitization |
| Déni de service (flood) | Saturation queue | Rate limiting + max queue size |
| Accès non autorisé aux états | Fuite d'information | AuthGuard sur /jobs/:id |
5.2 Contrôles de sécurité¶
A. Authentification Redis¶
// Guard au démarrage
if (config.get('NODE_ENV') !== 'development') {
if (!config.get('REDIS_PASSWORD')) {
throw new Error('REDIS_PASSWORD is required in non-development environments');
}
if (!config.get('REDIS_TLS_ENABLED')) {
throw new Error('REDIS_TLS_ENABLED must be true in non-development environments');
}
}
B. Validation des payloads¶
// DTO avec validation stricte
export class SubmitJobDto {
@IsEnum(JobType)
type: JobType;
@ValidateNested()
@Type(() => JobPayloadDto)
data: JobPayloadDto;
@IsOptional()
@IsUUID('4')
correlationId?: string;
}
C. Isolation des accès¶
// Controller avec guards
@Controller('jobs')
@UseGuards(JwtAuthGuard)
export class JobsController {
@Get(':jobId')
@UseGuards(JobOwnershipGuard) // Vérifie que le user peut voir ce job
async getStatus(@Param('jobId') jobId: string): Promise<JobStatusDto> {
return this.jobsService.getStatus(jobId);
}
}
5.3 Checklist sécurité¶
- Redis AUTH configuré et testé
- TLS Redis activé en staging/prod
- Rate limiting sur endpoints de soumission
- Validation DTO exhaustive
- Guard d'ownership sur consultation
- Logs de sécurité (tentatives non autorisées)
- Secrets via Vault (pas en variables d'environnement directes)
6. Hypothèses Techniques¶
6.1 Hypothèses héritées de la spécification¶
| Réf | Hypothèse | Impact implémentation |
|---|---|---|
| H-SPEC-1 | Backend de queue dispose d'une persistance durable | Redis configuré avec AOF (appendonly yes) |
| H-SPEC-2 | Horodatages fournis par source fiable | Utilisation de Date.now() serveur (NTP synchronisé) |
| H-SPEC-3 | Identifiants de job globalement uniques | UUID v7 (monotone, collision-free) |
6.2 Hypothèses techniques additionnelles¶
| Réf | Hypothèse | Justification |
|---|---|---|
| H-IMPL-1 | Redis disponible et accessible depuis API et Workers | Prérequis infrastructure |
| H-IMPL-2 | PostgreSQL disponible pour persistance historique | Base existante du projet |
| H-IMPL-3 | Workers déployés comme processus séparés | Isolation et scalabilité |
| H-IMPL-4 | Un seul namespace Redis pour les jobs | Simplification ; isolation par préfixe |
| H-IMPL-5 | Les workers partagent la même version du code | Déploiement coordonné |
| H-IMPL-6 | Horloge système synchronisée (NTP) | Cohérence des timestamps |
6.3 Décisions de conception (Points à clarifier §10) — VALIDÉES¶
| Point spec | Décision | Statut |
|---|---|---|
| 10.1 Rétention post-terminal | Formalisée : 30j DB, 7j Redis, purge automatique via Prefect | ✅ Validé |
| 10.2 Retries explicites | Via Prefect flow jobs_dlq_retry_flow (cron */15 * * * *) | ✅ Validé |
| 10.3 Interface consultation | REST API /jobs/:id + /jobs/failed + /jobs/stats + /jobs/purgeable | ✅ Validé |
| 10.4 Isolation Redis | Redis dédié jobs (namespace pv:jobs:) + health check continu | ✅ Validé |
| 10.5 Limites de charge | Max 1000 jobs pending/queue + rate limit 100/min | ✅ Validé |
6.4 Politique de rétention formalisée (I-5)¶
Correction Compliance : La rétention n'était pas formalisée avec un mécanisme de purge.
| Composant | Rétention | Mécanisme de purge | Cron |
|---|---|---|---|
PostgreSQL (job_history) | 30 jours | DELETE /jobs/purge?target=db&olderThan=30d | 0 3 * * * |
| Redis (BullMQ completed/failed) | 7 jours | DELETE /jobs/purge?target=redis&olderThan=7d | 0 3 * * * |
Table job_effects (idempotence) | 90 jours | Purge en cascade avec job_history | 0 3 * * * |
Garanties : - Aucun job terminal n'est supprimé avant sa période de rétention - La purge est tracée via Prefect Events (jobs.purge.completed) - Les jobs RUNNING ou PENDING ne sont jamais purgés - Une vérification post-purge confirme la conformité
6.5 Configuration des retries par type de job — VALIDÉE¶
| Queue | Job Type | Max Retries | Retryable | Rationale |
|---|---|---|---|---|
| Archive | DOCUMENT_ARCHIVE | 3 | true | Échecs transitoires (stockage) |
| Blockchain | BLOCKCHAIN_ANCHOR | 5 | true | Dépendance externe blockchain |
| Blockchain | BLOCKCHAIN_VERIFY | 3 | true | Vérification peut échouer temporairement |
| Export | EXPORT_GLACIER | 5 | true | Transfert S3/Glacier long |
| Export | EXPORT_ZIP | 3 | true | Génération ZIP peut OOM |
| Purge | PURGE_LEGAL | 2 | true | Critique légalement, retry limité |
| Purge | PURGE_USER_REQUEST | 3 | true | Demande utilisateur |
| Transfer | PRE_TRANSFER_B2B2C | 3 | true | Préparation transfert coffre |
| Transfer | TRANSFER_COMPLETE | 0 | false | One-shot après succès PRE |
| Notification | NOTIFICATION_EMAIL | 2 | true | Échecs SMTP temporaires |
| Notification | NOTIFICATION_WEBHOOK | 3 | true | Endpoint distant indisponible |
Configuration : Stockée en
config/job-retry-policies.tset consultable viaGET /jobs/config
6.6 Règles de retry — VALIDÉES¶
| Règle | Décision | Statut |
|---|---|---|
| Fréquence retry flow | Toutes les 15 minutes (*/15 * * * *) | ✅ Validé |
| Backoff exponentiel | Non — retry immédiat au prochain run | ✅ Validé |
| Exclusion retry auto | Aucune — tous les jobs retryable=true sont éligibles | ✅ Validé |
| Alerte ABANDONED | Prefect Events simple | ✅ Validé |
TODO : Évolution future des alertes ABANDONED (email, Slack, escalade manuelle)
7. Points de Vigilance¶
7.1 Risques techniques¶
| Risque | Probabilité | Impact | Mitigation |
|---|---|---|---|
| R-1 Redis OOM | Moyenne | Critique | Monitoring mémoire + maxmemory policy |
| R-2 Deadlock worker | Faible | Élevé | Lock TTL + stalled job detection |
| R-3 Perte de jobs (Redis crash) | Faible | Critique | AOF persistence + réplication |
| R-4 Drift d'horloge | Faible | Moyen | NTP + monitoring drift |
| R-5 Incompatibilité payload | Moyenne | Moyen | Versioning payloads + validation |
7.2 Points d'attention implémentation¶
-
Transaction atomique soumission : L'insertion PostgreSQL et l'enqueue Redis doivent être cohérents. Utiliser un pattern Outbox ou accepter une fenêtre d'incohérence temporaire (job en DB mais pas en queue = état
PENDINGperpétuel détectable). -
Ordre des opérations worker :
- Toujours mettre à jour PostgreSQL AVANT d'effectuer des actions métier irréversibles
-
En cas de crash après action métier mais avant update DB : le job reste
RUNNING→ détecté comme stalled → marquéFAILED -
Idempotence des processors : Bien que I-2 garantisse une seule exécution, concevoir les processors comme idempotents permet de gérer les edge cases (crash après effet mais avant ack).
-
Graceful shutdown workers : Implémenter un handler
SIGTERMqui : - Arrête d'accepter de nouveaux jobs
- Attend la fin des jobs en cours (timeout raisonnable)
-
Marque les jobs non terminés comme
FAILEDavec raison explicite -
Monitoring essentiel :
- Queue depth (jobs pending)
- Latence traitement (p50, p95, p99)
- Taux d'échec par type de job
- Jobs stalled count
- Redis memory usage
7.3 Tests critiques à implémenter¶
| Réf | Test | Couvre |
|---|---|---|
| T-1 | Soumission retourne avant exécution | I-1, CA-1 |
| T-2 | Deux workers sur même job → un seul exécute | I-2, CA-2, ST-2 |
| T-3 | Job échoué → pas de retry automatique | I-3 |
| T-4 | Job failed → cause lisible en DB | I-4, CA-3, ST-3 |
| T-5 | Job completed → reste en DB 30 jours | I-5, CA-4 |
| T-6 | Connexion Redis sans auth → rejetée | I-6, CA-5 |
| T-7 | Soumission sans worker → job PENDING consultable | ST-1 |
7.4 Dépendances et prérequis¶
┌─────────────────────────────────────────────────────────┐
│ Prérequis Infra │
├─────────────────────────────────────────────────────────┤
│ ✓ Redis 7.x avec AOF enabled │
│ ✓ Redis AUTH + TLS (prod/staging) │
│ ✓ PostgreSQL (existant - ProbatioVault) │
│ ✓ NTP synchronisé sur tous les serveurs │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Prérequis Code │
├─────────────────────────────────────────────────────────┤
│ ✓ bullmq ^5.1.0 (déjà installé) │
│ ✓ @nestjs/bull ^10.0.1 (déjà installé) │
│ ✓ ioredis ^5.3.2 (déjà installé) │
│ ○ uuid (pour génération v7 - à ajouter si nécessaire) │
└─────────────────────────────────────────────────────────┘
8. Plan de Livraison¶
Phase 1 : Fondations (Core Module)¶
- Entity
JobHistory+ migration JobsRepositoryavec méthodes CRUD- Enums
JobState,JobType - Configuration Redis sécurisée
- Tests unitaires repository
Phase 2 : Producer (API Side)¶
JobsServiceavec méthodesubmit()JobsControlleravec endpoints état- DTOs de validation
- Guards d'autorisation
- Tests d'intégration soumission
Phase 3 : Consumer (Worker Side)¶
BaseProcessorabstrait- Premier processor concret (ex: DocumentArchive)
- Gestion des événements (completed, failed, stalled)
- Tests d'intégration exécution
- Graceful shutdown
Phase 4 : Observabilité¶
- Métriques Prometheus (queue depth, latency)
- Logs structurés
- Alertes (stalled jobs, error rate)
- Dashboard monitoring
Phase 5 : Retries via Prefect¶
- Endpoints API pour Prefect :
GET /jobs/failed?retryable=truePOST /jobs/:id/retryGET /jobs/stats- Flow Prefect
jobs_dlq_retry.py: - Tasks :
get_failed_jobs,trigger_job_retry,emit_events - Déploiement cron
*/15 * * * * - Seuils d'alerte et Prefect Events
- Tests d'intégration retry flow
- Documentation opérationnelle
9. Tests Additionnels (Retry Prefect)¶
| Réf | Test | Couvre |
|---|---|---|
| T-8 | Job FAILED avec retryable=true → visible dans /jobs/failed | Endpoint Prefect |
| T-9 | POST /jobs/:id/retry crée nouveau job avec parent_job_id | Chaînage retry |
| T-10 | Job original passe à RETRIED après retry | Transition d'état |
| T-11 | retry_count >= max_retries → état ABANDONED | Limite retries |
| T-12 | Job avec retryable=false → non listé dans /jobs/failed?retryable=true | Filtrage |
| T-13 | Flow Prefect exécuté → jobs failed traités | Intégration E2E |
10. Tests Additionnels (Corrections Compliance)¶
10.1 Tests Idempotence (I-7)¶
| Réf | Test | Couvre |
|---|---|---|
| T-14 | Processor vérifie idempotence AVANT effet métier | I-7 Pattern |
| T-15 | Même payload soumis 2x → effet métier unique | I-7 Garantie |
| T-16 | Job retry avec même idempotencyKey → statut SKIPPED | I-7 + Retry |
| T-17 | Table job_effects a contrainte UNIQUE sur (key, effect_type) | I-7 DB |
10.2 Tests Purge (I-5 formalisé)¶
| Réf | Test | Couvre |
|---|---|---|
| T-18 | Job terminal > 30j → éligible purge DB | Rétention DB |
| T-19 | Job terminal < 30j → NON purgé | Protection rétention |
| T-20 | Job RUNNING ou PENDING → JAMAIS purgé | Intégrité |
| T-21 | DELETE /jobs/purge?target=db → compteur correct | Endpoint |
| T-22 | Flow Prefect jobs_purge_flow → purge effective | E2E |
10.3 Tests Health Check Redis (I-6 renforcé)¶
| Réf | Test | Couvre |
|---|---|---|
| T-23 | Redis déconnecté → événement unhealthy émis | Détection |
| T-24 | Redis reconnecté → événement healthy émis | Récupération |
| T-25 | TLS désactivé en prod → alerte CRITICAL | Sécurité |
| T-26 | AUTH désactivé en prod → alerte CRITICAL | Sécurité |
| T-27 | /health/redis retourne statut correct | Endpoint |
| T-28 | Flow Prefect redis_health_flow → vérifications OK | E2E |
11. Plan de Livraison Révisé¶
Phase 5 : Prefect Orchestration (MISE À JOUR)¶
- Flow Retry (
jobs_dlq_retry.py) : - Tasks :
get_failed_jobs,trigger_job_retry,emit_events -
Déploiement cron
*/15 * * * * -
Flow Purge (
jobs_purge.py) — NOUVEAU : - Tasks :
get_purgeable_jobs,purge_db_jobs,purge_redis_jobs,verify_retention -
Déploiement cron
0 3 * * * -
Flow Health Check (
redis_health.py) — NOUVEAU : - Tasks :
check_redis_connection,verify_tls_enabled,verify_auth_required,check_memory_usage -
Déploiement cron
*/5 * * * * -
Endpoints API :
GET /jobs/purgeable— NOUVEAUDELETE /jobs/purge— NOUVEAU-
GET /health/redis— NOUVEAU -
Tests d'intégration flows Prefect
Phase 6 : Idempotence (NOUVELLE)¶
- Table
job_effects+ migration - Interface
IdempotentJobPayload - Méthodes
checkIdempotency()/recordEffect()dansBaseProcessor - Tests unitaires idempotence
- Documentation pattern pour processors métier
Fin du plan d'implémentation Toute déviation de ce plan doit être documentée et justifiée.