Aller au contenu

PD-21 — Plan d'Implémentation

Document : Plan technique détaillé Spécification de référence : PD-21-specification.md Statut : Draft (Révision 2 — Corrections compliance) Date : 2025-12-23


1. Découpage en Composants

1.1 Vue d'ensemble architecturale

┌─────────────────────────────────────────────────────────────────────┐
│                           API Layer (NestJS)                        │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────────┐  │
│  │ DocumentsModule │  │ BlockchainModule│  │   Autres Modules    │  │
│  └────────┬────────┘  └────────┬────────┘  └──────────┬──────────┘  │
│           │                    │                      │             │
│           ▼                    ▼                      ▼             │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │                      JobsModule (Orchestrateur)              │   │
│  │  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐  │   │
│  │  │  JobsService   │  │ JobsController │  │ JobsRepository │  │   │
│  │  │  (Producer)    │  │   (API État)   │  │  (Traçabilité) │  │   │
│  │  └───────┬────────┘  └───────┬────────┘  └───────┬────────┘  │   │
│  └──────────┼───────────────────┼───────────────────┼───────────┘   │
│             │                   │                   │               │
└─────────────┼───────────────────┼───────────────────┼───────────────┘
              │                   │                   │
              ▼                   ▼                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                         Persistence Layer                           │
│  ┌─────────────────────────────┐  ┌─────────────────────────────┐   │
│  │     Redis (BullMQ Queue)    │  │   PostgreSQL (Job History)  │   │
│  │     - Jobs en attente       │  │   - job_history (entity)    │   │
│  │     - États transitoires    │  │   - États terminaux         │   │
│  │     - Locks d'exécution     │  │   - Audit trail complet     │   │
│  └─────────────────────────────┘  └─────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘
              ▲                                       ▲
              │                                       │
┌─────────────┼───────────────────────────────────────┼───────────────┐
│             │              Worker Layer             │               │
│  ┌──────────┴──────────────────────────────────────┐│               │
│  │                    JobsWorkerModule             ││               │
│  │  ┌────────────────┐  ┌────────────────┐  ┌─────┐││               │
│  │  │  WorkerService │  │ ProcessorBase  │  │Health│               │
│  │  │  (Consumer)    │  │ (Abstraction)  │  │Check││               │
│  │  └────────────────┘  └────────────────┘  └─────┘││               │
│  │  ┌────────────────┐  ┌────────────────┐         ││               │
│  │  │ DocumentProc.  │  │ BlockchainProc.│  ...    ││               │
│  │  └────────────────┘  └────────────────┘         ││               │
│  └─────────────────────────────────────────────────┘│               │
└─────────────────────────────────────────────────────┼───────────────┘
┌─────────────────────────────────────────────────────┼───────────────┐
│                    Prefect Orchestration Layer      │               │
│  ┌──────────────────────────────────────────────────┴─────────────┐ │
│  │                jobs_dlq_retry_flow.py (Schedulé)               │ │
│  │  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────┐  │ │
│  │  │ get_failed_jobs  │  │  trigger_retry   │  │ emit_events  │  │ │
│  │  │     (task)       │  │     (task)       │  │   (task)     │  │ │
│  │  └────────┬─────────┘  └────────┬─────────┘  └──────────────┘  │ │
│  │           │ GET /jobs/failed    │ POST /jobs/:id/retry         │ │
│  │           │     ?retryable=true │                              │ │
│  │           ▼                     ▼                              │ │
│  │  ┌─────────────────────────────────────────────────────────┐   │ │
│  │  │              Backend API (NestJS)                       │   │ │
│  │  └─────────────────────────────────────────────────────────┘   │ │
│  └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

Architecture "Retry via Prefect" : Conforme au pattern existant audit_dlq_retry.py. Les jobs BullMQ n'ont jamais de retry implicite (attempts: 1). Les retries sont orchestrés explicitement par un flow Prefect schedulé.

1.2 Composants détaillés

A. JobsModule — Module principal (API side)

Fichier Responsabilité
jobs.module.ts Configuration BullMQ, imports, exports
jobs.service.ts API de soumission de jobs (Producer)
jobs.controller.ts Endpoints REST pour consultation d'état
jobs.repository.ts Persistance PostgreSQL des traces
entities/job-history.entity.ts Entité TypeORM pour historique
dto/submit-job.dto.ts DTO de soumission avec validation
dto/job-status.dto.ts DTO de réponse d'état
enums/job-state.enum.ts États du cycle de vie
enums/job-type.enum.ts Types de jobs supportés

B. JobsWorkerModule — Module Worker (Process séparé)

Fichier Responsabilité
worker.module.ts Bootstrap du worker process
worker.service.ts Orchestration des processors
processors/base.processor.ts Classe abstraite des processors
processors/document-archive.processor.ts Traitement archivage
processors/blockchain-anchor.processor.ts Traitement ancrage
health/worker-health.service.ts Health checks du worker

C. Infrastructure partagée

Fichier Responsabilité
constants/queue-names.ts Noms des queues (constantes)
interfaces/job-data.interface.ts Contrat des payloads
interfaces/job-result.interface.ts Contrat des résultats
utils/job-id-generator.ts Génération UUID v7

D. Prefect Flows — Orchestration (Python/Infra)

Emplacement : ProbatioVault-infra/prefect/flows/

D.1 Flow de Retry — jobs_dlq_retry.py
Composant Responsabilité
jobs_dlq_retry_flow Flow principal schedulé (cron */15 * * * *)
get_failed_jobs (task) Appel GET /jobs/failed?retryable=true
get_dlq_stats (task) Appel GET /jobs/stats pour métriques
trigger_job_retry (task) Appel POST /jobs/:id/retry par job éligible
evaluate_dlq_health (task) Seuils critiques, alertes Prefect Events
emit_retry_event (task) Émission événements pour monitoring
D.2 Flow de Purge — jobs_purge.py (NOUVEAU)

Objectif : Formaliser la politique de rétention I-5 avec purge automatique.

Composant Responsabilité
jobs_purge_flow Flow principal schedulé (cron 0 3 * * * — 3h du matin)
get_purgeable_jobs (task) Appel GET /jobs/purgeable?olderThan=30d
purge_db_jobs (task) Appel DELETE /jobs/purge?target=db&olderThan=30d
purge_redis_jobs (task) Appel DELETE /jobs/purge?target=redis&olderThan=7d
emit_purge_event (task) Émission événements avec compteurs
verify_retention (task) Vérification post-purge de conformité

Politique de rétention formalisée :

Cible Rétention Cron Action
PostgreSQL (job_history) 30 jours 0 3 * * * DELETE jobs terminaux > 30j
Redis (BullMQ) 7 jours 0 3 * * * Nettoyage completed/failed > 7j
D.3 Flow Health Check Redis — redis_health.py (NOUVEAU)

Objectif : Surveillance proactive de la sécurité et disponibilité Redis.

Composant Responsabilité
redis_health_flow Flow principal schedulé (cron */5 * * * * — toutes les 5 min)
check_redis_connection (task) Ping Redis, vérification latence
verify_tls_enabled (task) Vérification TLS actif (prod/staging)
verify_auth_required (task) Vérification AUTH obligatoire
check_memory_usage (task) Monitoring mémoire Redis (seuil 80%)
emit_health_event (task) Alerte si anomalie détectée

Alertes émises :

Condition Événement Prefect Sévérité
Connexion Redis échouée redis.health.connection_failed CRITICAL
TLS désactivé en prod redis.health.tls_disabled CRITICAL
AUTH désactivé en prod redis.health.auth_disabled CRITICAL
Mémoire > 80% redis.health.memory_warning WARNING
Latence > 100ms redis.health.latency_warning WARNING

E. Endpoints API pour Prefect (ajouts au Controller)

Endpoint Méthode Description
/jobs/failed GET Liste jobs FAILED avec filtres (retryable, job_type, since)
/jobs/:id/retry POST Resoumission d'un job failed (crée nouveau job avec parent_job_id)
/jobs/stats GET Statistiques agrégées (pending, running, failed, succeeded)
/jobs/purgeable GET Liste jobs terminaux éligibles à la purge (olderThan)
/jobs/purge DELETE Purge des jobs terminaux (target=db\ | redis, olderThan)
/health/redis GET Statut santé Redis (connexion, TLS, AUTH, mémoire)

1.3 Séparation Infrastructure PD-21 / Job Types Métier

Principe de séparation : PD-21 définit l'infrastructure générique de traitement des jobs. Les job types métier sont définis par les PDs spécifiques (PD-45, PD-48, PD-55, PD-72). Cette section documente les hypothèses de conception pour faciliter l'intégration future.

A. Périmètre PD-21 (Infrastructure Générique)

Composant Responsabilité Livrable
JobsModule Configuration BullMQ, connexion Redis sécurisée ✅ PD-21
JobsService API de soumission générique submit(type, data) ✅ PD-21
JobsController Endpoints REST : /jobs/:id, /jobs/failed, /jobs/stats ✅ PD-21
JobsRepository Persistance PostgreSQL, entity JobHistory ✅ PD-21
BaseProcessor Classe abstraite pour tous les processors ✅ PD-21
Queue DEFAULT Queue par défaut pour tests/développement ✅ PD-21
Prefect flows jobs_dlq_retry_flow.py, jobs_purge_flow.py, redis_health_flow.py ✅ PD-21

B. Registre des Queues (Extensible)

// constants/queue-names.ts — PD-21 Core
export const QUEUE_NAMES = {
  // Queue par défaut (tests, développement)
  DEFAULT: 'pv:jobs:default',
} as const;

// Les queues métier seront ajoutées par les PDs respectifs :
// - PD-45 : EXPORT
// - PD-48 : PURGE
// - PD-55 : BLOCKCHAIN
// - PD-72 : TRANSFER

C. Hypothèses pour Job Types Métier (Hors PD-21)

Ces définitions sont des hypothèses pour guider le développement futur. Chaque PD métier DOIT formaliser ses propres job types.

PD Queue proposée Job Types proposés Statut
PD-45 pv:jobs:export EXPORT_GLACIER, EXPORT_ZIP 🔮 Hypothèse
PD-48 pv:jobs:purge PURGE_LEGAL, PURGE_USER_REQUEST 🔮 Hypothèse
PD-55 pv:jobs:blockchain BLOCKCHAIN_ANCHOR, BLOCKCHAIN_VERIFY 🔮 Hypothèse
PD-72 pv:jobs:transfer PRE_TRANSFER_B2B2C, TRANSFER_COMPLETE 🔮 Hypothèse
pv:jobs:notification NOTIFICATION_EMAIL, NOTIFICATION_WEBHOOK 🔮 Hypothèse

D. Mécanisme d'extension (Pattern)

// Chaque PD métier étendra le registre via :
// 1. Ajout de sa queue dans QUEUE_NAMES
// 2. Ajout de ses job types dans JobType enum
// 3. Mapping dans JOB_TYPE_TO_QUEUE
// 4. Implémentation de son Processor héritant de BaseProcessor

// Exemple PD-55 (à implémenter par PD-55, pas PD-21)
// constants/queue-names.ts — Extension PD-55
export const QUEUE_NAMES = {
  ...QUEUE_NAMES_CORE,
  BLOCKCHAIN: 'pv:jobs:blockchain', // Ajouté par PD-55
} as const;

E. Validation stricte des Job Types

// jobs.service.ts — Mécanisme générique PD-21
async submit(dto: SubmitJobDto): Promise<JobHistory> {
  // Validation stricte : le job type DOIT exister dans le registre
  if (!Object.values(JobType).includes(dto.type)) {
    throw new BadRequestException(`Unknown job type: ${dto.type}`);
  }

  const queueName = JOB_TYPE_TO_QUEUE[dto.type];
  if (!queueName) {
    throw new BadRequestException(`No queue configured for job type: ${dto.type}`);
  }

  // CE-3 : La queue DOIT exister (vérifiée au boot)
  const queue = this.getQueue(queueName);
  if (!queue) {
    throw new InternalServerErrorException(`Queue not initialized: ${queueName}`);
  }

  // ... suite de la soumission
}

F. Contraintes d'intégrité

Contrainte Mécanisme Erreur si violation
Job type inconnu Validation enum JobType 400 Bad Request
Job type sans queue Lookup JOB_TYPE_TO_QUEUE 400 Bad Request
Queue non initialisée Check this.queues.has(queueName) 500 Internal Server Error
Queue fantôme Refus de création dynamique N/A (impossible)

Invariant : Aucune queue ne peut être créée dynamiquement au runtime. Toutes les queues sont déclarées dans queue-names.ts et initialisées au boot.


2. Flux Techniques

2.1 FN-1 — Soumission d'un job (Flux nominal)

┌────────┐     ┌─────────────┐     ┌─────────────┐     ┌───────┐     ┌──────────┐
│ Client │────▶│ Controller  │────▶│ JobsService │────▶│ Redis │     │ Response │
└────────┘     └─────────────┘     └─────────────┘     └───────┘     └──────────┘
    │                │                    │                │              ▲
    │   POST /jobs   │                    │                │              │
    │   {type,data}  │                    │                │              │
    │                │  validate(dto)     │                │              │
    │                │ ─────────────────▶ │                │              │
    │                │                    │ generateJobId()│              │
    │                │                    │ ─────────────▶ │              │
    │                │                    │                │              │
    │                │                    │ queue.add()    │              │
    │                │                    │ ───────────────▶              │
    │                │                    │                │ (async)      │
    │                │                    │ persistHistory │              │
    │                │                    │ ───────────────▶ PostgreSQL   │
    │                │                    │                │              │
    │                │                    │◀─── jobId ─────│              │
    │                │◀─────────────────  │                │              │
    │                │                    │                │              │
    │◀───────────────│ 202 Accepted       │                │              │
    │   {jobId,      │ {jobId: "..."}     │                │              │
    │    status}     │                    │                │              │

Séquence détaillée :

  1. Réception : Le controller reçoit POST /jobs avec le DTO validé
  2. Validation : class-validator vérifie le payload (type, data)
  3. ID Generation : UUID v7 (monotone, sortable) via job-id-generator.ts
  4. Persistence initiale : Insertion job_history avec état PENDING
  5. Enqueue : queue.add(jobId, data, options)non-bloquant
  6. Réponse : HTTP 202 Accepted avec jobId retourné immédiatement

2.2 FN-2 — Exécution nominale (Worker)

┌───────┐     ┌───────────────┐     ┌────────────┐     ┌──────────┐     ┌───────┐
│ Redis │────▶│ WorkerService │────▶│ Processor  │────▶│ Business │────▶│ Redis │
└───────┘     └───────────────┘     └────────────┘     └──────────┘     └───────┘
    │                │                    │                │              │
    │ BRPOPLPUSH     │                    │                │              │
    │ (atomic fetch) │                    │                │              │
    │ ──────────────▶│                    │                │              │
    │                │ lock acquired      │                │              │
    │                │ (Redis SETNX)      │                │              │
    │                │                    │                │              │
    │                │ updateState(RUN)   │                │              │
    │                │ ─────────────────▶ PostgreSQL       │              │
    │                │                    │                │              │
    │                │ process(job)       │                │              │
    │                │ ──────────────────▶│                │              │
    │                │                    │ execute()      │              │
    │                │                    │ ───────────────▶              │
    │                │                    │                │              │
    │                │                    │◀─── result ────│              │
    │                │◀─────────────────  │                │              │
    │                │                    │                │              │
    │                │ updateState(SUCC)  │                │              │
    │                │ ─────────────────▶ PostgreSQL       │              │
    │                │                    │                │              │
    │◀───────────────│ LREM (ack)         │                │              │
    │                │                    │                │              │

Points clés :

  • Lock atomique : BullMQ utilise Lua scripts pour garantir l'unicité
  • Transition d'état : PENDING → RUNNING → SUCCEEDED tracée en PostgreSQL
  • Acknowledgement : Job retiré de Redis après succès uniquement

2.3 Flux de consultation d'état

GET /jobs/:jobId/status

┌────────┐     ┌─────────────┐     ┌─────────────┐     ┌────────────┐
│ Client │────▶│ Controller  │────▶│ Repository  │────▶│ PostgreSQL │
└────────┘     └─────────────┘     └─────────────┘     └────────────┘
    │                │                    │                  │
    │                │  findByJobId()     │                  │
    │                │ ─────────────────▶ │                  │
    │                │                    │ SELECT ...       │
    │                │                    │ ────────────────▶│
    │                │                    │◀──── row ────────│
    │                │◀─────────────────  │                  │
    │◀───────────────│ {state, history}   │                  │

2.4 Flux de retry explicite via Prefect

Principe I-3 : Aucun retry implicite dans BullMQ. Les retries sont orchestrés par un flow Prefect schedulé qui appelle l'API backend.

┌─────────────────────────────────────────────────────────────────────────────┐
│                         Prefect Worker (Schedulé cron)                      │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                    jobs_dlq_retry_flow                                │  │
│  │                                                                       │  │
│  │  1. get_failed_jobs()                                                 │  │
│  │     GET /jobs/failed?retryable=true&limit=50                          │  │
│  │         │                                                             │  │
│  │         ▼                                                             │  │
│  │     ┌─────────────────────────────────────────────────────────────┐   │  │
│  │     │ Pour chaque job éligible (retry_count < max_retries):       │   │  │
│  │     │                                                             │   │  │
│  │     │   2. trigger_job_retry(job_id)                              │   │  │
│  │     │      POST /jobs/{job_id}/retry                              │   │  │
│  │     │          │                                                  │   │  │
│  │     │          ▼                                                  │   │  │
│  │     │      ┌───────────────────────────────────────────────────┐  │   │  │
│  │     │      │ Backend crée NOUVEAU job avec:                    │  │   │  │
│  │     │      │  - Nouveau job_id (UUID v7)                       │  │   │  │
│  │     │      │  - parent_job_id = job_id original                │  │   │  │
│  │     │      │  - retry_count = parent.retry_count + 1           │  │   │  │
│  │     │      │  - Même payload que le job original               │  │   │  │
│  │     │      │                                                   │  │   │  │
│  │     │      │ Job original marqué: state = RETRIED              │  │   │  │
│  │     │      └───────────────────────────────────────────────────┘  │   │  │
│  │     └─────────────────────────────────────────────────────────────┘   │  │
│  │                                                                       │  │
│  │  3. emit_retry_event()                                                │  │
│  │     → Prefect Events pour monitoring/alerting                         │  │
│  │                                                                       │  │
│  │  4. evaluate_dlq_health()                                             │  │
│  │     → Seuils critiques → alertes si dépassés                          │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

Cycle de vie d'un job avec retry :

                    ┌──────────────────────────────────────────────┐
                    │                                              │
     ┌──────────┐   │   ┌──────────┐   ┌───────────┐   ┌─────────┐ │
     │ PENDING  │───┼──▶│ RUNNING  │──▶│ SUCCEEDED │   │ RETRIED │◀┘
     └──────────┘   │   └──────────┘   └───────────┘   └─────────┘
                    │        │                              ▲
                    │        ▼                              │
                    │   ┌──────────┐                        │
                    │   │  FAILED  │────────────────────────┘
                    │   └──────────┘   (via Prefect flow,
                    │        │          si retry_count < max)
                    │        ▼
                    │   ┌──────────┐
                    │   │ ABANDONED│  (retry_count >= max_retries)
                    │   └──────────┘
                    └─ Nouveau job créé (parent_job_id = original)

Nouveaux états :

État Description
RETRIED Job original après création d'un retry (état terminal)
ABANDONED Job ayant atteint max_retries sans succès (état terminal)

2bis. Diagrammes Mermaid

2bis.1 Graphe de dépendances des composants

graph TD
    subgraph API["API Layer (NestJS)"]
        DM[DocumentsModule]
        BM[BlockchainModule]
        OM[Autres Modules]
    end

    subgraph JM["JobsModule (Orchestrateur)"]
        JS[JobsService<br/>Producer]
        JC[JobsController<br/>API État]
        JR[JobsRepository<br/>Traçabilité]
        JH[JobHistory Entity]
        SJ[SubmitJobDto]
        JSD[JobStatusDto]
        JSE[JobState Enum]
        JTE[JobType Enum]
    end

    subgraph Shared["Infrastructure partagée"]
        QN[queue-names.ts<br/>Constantes]
        JDI[job-data.interface.ts]
        JRI[job-result.interface.ts]
        JIG[job-id-generator.ts<br/>UUID v7]
    end

    subgraph WM["JobsWorkerModule (Process séparé)"]
        WS[WorkerService<br/>Consumer]
        BP[BaseProcessor<br/>Abstraction]
        DAP[DocumentArchiveProcessor]
        BAP[BlockchainAnchorProcessor]
        WH[WorkerHealthService]
    end

    subgraph Persistence["Persistence Layer"]
        Redis[(Redis<br/>BullMQ Queue)]
        PG[(PostgreSQL<br/>job_history)]
    end

    subgraph Prefect["Prefect Orchestration"]
        DLQ[jobs_dlq_retry_flow.py<br/>cron */15]
        PUR[jobs_purge_flow.py<br/>cron 0 3 * * *]
        RH[redis_health_flow.py<br/>cron */5]
    end

    DM --> JS
    BM --> JS
    OM --> JS

    JS --> QN
    JS --> JIG
    JS --> JTE
    JS --> JDI
    JS --> Redis
    JS --> JR

    JC --> JR
    JC --> JSD
    JC --> JSE

    JR --> JH
    JR --> PG

    WS --> Redis
    WS --> BP
    BP --> JDI
    BP --> JRI
    DAP --> BP
    BAP --> BP
    WS --> JR
    WH --> Redis

    DLQ -->|"GET /jobs/failed"| JC
    DLQ -->|"POST /jobs/:id/retry"| JC
    PUR -->|"DELETE /jobs/purge"| JC
    RH -->|"GET /health/redis"| JC

2bis.2 Diagramme de séquence — Soumission + Exécution + Retry Prefect

sequenceDiagram
    participant C as Client
    participant JC as JobsController
    participant JS as JobsService
    participant Redis as Redis (BullMQ)
    participant PG as PostgreSQL
    participant WS as WorkerService
    participant BP as BaseProcessor
    participant PF as Prefect Flow

    Note over C,PG: FN-1 — Soumission d'un job
    C->>JC: POST /jobs {type, data}
    JC->>JS: validate(dto)
    JS->>JS: generateJobId() — UUID v7
    JS->>PG: INSERT job_history (PENDING)
    JS->>Redis: queue.add(jobId, data, {attempts:1})
    JS-->>JC: jobId
    JC-->>C: 202 Accepted {jobId, status: PENDING}

    Note over Redis,PG: FN-2 — Exécution nominale (Worker)
    Redis->>WS: BRPOPLPUSH (atomic fetch)
    WS->>PG: updateState(RUNNING)
    WS->>BP: process(job)
    BP->>BP: execute() — logique métier
    BP-->>WS: result
    WS->>PG: updateState(SUCCEEDED)
    WS->>Redis: LREM (ack)

    Note over Redis,PF: Flux d'échec + Retry Prefect
    Redis->>WS: BRPOPLPUSH (atomic fetch)
    WS->>PG: updateState(RUNNING)
    WS->>BP: process(job)
    BP--xWS: throw Error
    WS->>PG: updateState(FAILED, errorMsg)
    WS->>Redis: moveToFailed

    Note over PF,PG: Retry orchestré par Prefect (cron */15)
    PF->>JC: GET /jobs/failed?retryable=true
    JC->>PG: SELECT failed, retry_count < max
    PG-->>JC: jobs[]
    JC-->>PF: jobs[]
    loop Pour chaque job éligible
        PF->>JC: POST /jobs/{id}/retry
        JC->>JS: createRetryJob(originalId)
        JS->>PG: UPDATE original → RETRIED
        JS->>PG: INSERT new job (parent_job_id, retry_count+1)
        JS->>Redis: queue.add(newJobId, data)
        JS-->>JC: newJobId
        JC-->>PF: 201 Created
    end
    PF->>PF: emit_retry_event()
    PF->>PF: evaluate_dlq_health()

2bis.3 Machine d'états — Cycle de vie d'un job

stateDiagram-v2
    [*] --> PENDING: queue.add()
    PENDING --> RUNNING: Worker fetch (BRPOPLPUSH)
    RUNNING --> SUCCEEDED: Exécution OK
    RUNNING --> FAILED: Exécution KO

    FAILED --> RETRIED: Prefect retry<br/>(retry_count < max)
    FAILED --> ABANDONED: retry_count >= max_retries

    RETRIED --> [*]: État terminal<br/>(nouveau job créé)
    SUCCEEDED --> [*]: État terminal
    ABANDONED --> [*]: État terminal

    note right of RETRIED
        Un nouveau job est créé
        avec parent_job_id = original
    end note

3. Mapping Invariants → Mécanismes

Invariant Mécanisme Technique Validation
I-1 Non-blocage queue.add() retourne une Promise résolue dès l'insertion Redis (pas d'attente worker) Test: soumission sans worker actif → réponse < 100ms
I-2 Exécution unique Lua script BullMQ moveToActive avec SETNX atomique Test: 2 workers concurrents sur même job → 1 seul RUNNING
I-3 Pas de retry implicite BullMQ attempts: 1 + Retries explicites via flow Prefect jobs_dlq_retry_flow Test: job échoué → reste FAILED jusqu'au prochain run Prefect
I-4 Traçabilité Entity job_history avec created_at, started_at, completed_at, failed_reason, retry_count, parent_job_id Test: job FAILEDfailed_reason non null
I-5 Persistance post-terminal removeOnComplete: false + politique de rétention formalisée avec job Prefect jobs_purge_flow Test: job SUCCEEDED → consultable après 24h, purgé après 30 jours
I-6 Sécurité Redis TLS obligatoire + AUTH + namespace isolation + health check continu Test: connexion sans TLS/AUTH → rejetée
I-7 Idempotence des effets métier Clé d'idempotence obligatoire dans payload + vérification avant effet irréversible Test: même job exécuté 2x → effet métier unique

I-7 — Idempotence des effets métier (NOUVEAU)

Contexte : L'invariant I-2 garantit qu'un job ne s'exécute qu'une fois au niveau BullMQ. Cependant, un retry via Prefect crée un nouveau job_id avec le même payload. Sans idempotence, l'effet métier pourrait être dupliqué (ex: double ancrage blockchain).

Obligation : Chaque processor métier DOIT implémenter une vérification d'idempotence basée sur une clé métier (ex: document_id, hash) avant toute action irréversible.

3.1 Détail des mécanismes critiques

I-2 : Garantie d'exécution unique

// Configuration BullMQ
const queueOptions: QueueOptions = {
  defaultJobOptions: {
    attempts: 1,           // I-3: pas de retry
    removeOnComplete: false, // I-5: persistance
    removeOnFail: false,     // I-5: persistance
  },
};

// Le lock est géré par BullMQ via Lua script atomique
// Extrait simplifié du mécanisme interne :
// EVALSHA moveToActive
//   1. BRPOPLPUSH waiting → active (atomique)
//   2. SETNX lock:jobId (TTL = lockDuration)
//   3. Si lock échoue → LPUSH back to waiting

I-3 : Retries explicites via Prefect

// Configuration BullMQ : JAMAIS de retry implicite
const queueOptions: QueueOptions = {
  defaultJobOptions: {
    attempts: 1,              // I-3: UN SEUL essai
    removeOnComplete: false,  // I-5: persistance
    removeOnFail: false,      // I-5: persistance pour retry Prefect
  },
};

// Les retries sont gérés par Prefect via l'API REST
// Endpoint POST /jobs/:id/retry
async retryJob(jobId: string): Promise<JobHistory> {
  const originalJob = await this.repository.findOne({ where: { job_id: jobId } });

  if (originalJob.state !== JobState.FAILED) {
    throw new BadRequestException('Only FAILED jobs can be retried');
  }

  if (originalJob.retry_count >= this.getMaxRetries(originalJob.job_type)) {
    // Marquer comme ABANDONED
    await this.repository.update(jobId, { state: JobState.ABANDONED });
    throw new BadRequestException('Max retries reached');
  }

  // Créer un NOUVEAU job (traçabilité complète)
  const newJob = await this.submit({
    type: originalJob.job_type,
    data: originalJob.payload,
    parentJobId: originalJob.job_id,
    retryCount: originalJob.retry_count + 1,
  });

  // Marquer l'original comme RETRIED
  await this.repository.update(jobId, {
    state: JobState.RETRIED,
    retry_child_id: newJob.job_id,
  });

  return newJob;
}

I-4 : Structure de traçabilité

@Entity('job_history')
export class JobHistory {
  @PrimaryColumn({ type: 'uuid' })
  job_id: string;

  @Column({ type: 'varchar', length: 50 })
  job_type: string;

  @Column({ type: 'varchar', length: 20 })
  state: JobState; // PENDING | RUNNING | SUCCEEDED | FAILED | CANCELLED | RETRIED | ABANDONED

  @Column({ type: 'jsonb', nullable: true })
  payload: Record<string, unknown>;

  @Column({ type: 'jsonb', nullable: true })
  result: Record<string, unknown>;

  @Column({ type: 'text', nullable: true })
  failed_reason: string;

  // === Champs Retry (Prefect) ===

  @Column({ type: 'int', default: 0 })
  retry_count: number; // Nombre de tentatives (0 = première exécution)

  @Column({ type: 'uuid', nullable: true })
  parent_job_id: string; // ID du job original (si retry)

  @Column({ type: 'uuid', nullable: true })
  retry_child_id: string; // ID du job retry créé (si état RETRIED)

  @Column({ type: 'boolean', default: true })
  retryable: boolean; // False pour jobs non-retryables (ex: one-shot)

  // === Timestamps ===

  @CreateDateColumn()
  created_at: Date; // Timestamp soumission

  @Column({ type: 'timestamp', nullable: true })
  started_at: Date; // Timestamp début exécution

  @Column({ type: 'timestamp', nullable: true })
  completed_at: Date; // Timestamp fin (succès ou échec)
}

// Enum des états
export enum JobState {
  PENDING = 'PENDING',       // En attente d'exécution
  RUNNING = 'RUNNING',       // En cours d'exécution
  SUCCEEDED = 'SUCCEEDED',   // Terminé avec succès
  FAILED = 'FAILED',         // Échec (éligible retry si retryable=true)
  CANCELLED = 'CANCELLED',   // Annulé manuellement
  RETRIED = 'RETRIED',       // Retry créé (état terminal)
  ABANDONED = 'ABANDONED',   // Max retries atteint (état terminal)
}

I-6 : Configuration sécurisée Redis + Health Check Continu

// Configuration via ConfigService
const redisConfig: RedisOptions = {
  host: config.get('REDIS_HOST'),
  port: config.get('REDIS_PORT'),
  password: config.get('REDIS_PASSWORD'), // AUTH obligatoire
  tls: config.get('NODE_ENV') !== 'development'
    ? { rejectUnauthorized: true }
    : undefined,
  keyPrefix: 'pv:jobs:', // Isolation par namespace
  maxRetriesPerRequest: 3,
  enableReadyCheck: true,
};

// Health check continu via événements BullMQ
@Injectable()
export class RedisHealthService implements OnModuleInit {
  private readonly logger = new Logger(RedisHealthService.name);
  private isHealthy = true;

  onModuleInit() {
    // Écoute des événements de connexion Redis via BullMQ
    this.queue.on('error', (error) => {
      this.isHealthy = false;
      this.logger.error(`Redis connection error: ${error.message}`);
      this.emitHealthEvent('unhealthy', error.message);
    });

    this.queue.on('ready', () => {
      if (!this.isHealthy) {
        this.logger.log('Redis connection restored');
        this.emitHealthEvent('healthy');
      }
      this.isHealthy = true;
    });
  }

  async checkHealth(): Promise<HealthStatus> {
    try {
      await this.queue.client.ping();
      return { status: 'healthy', latency: Date.now() - start };
    } catch (error) {
      return { status: 'unhealthy', error: error.message };
    }
  }
}

Health check hybride : Événements BullMQ (réactif) + Flow Prefect redis_health_flow.py (proactif)

I-7 : Idempotence des effets métier

// Interface pour les payloads avec clé d'idempotence
export interface IdempotentJobPayload {
  idempotencyKey: string;  // Clé métier unique (ex: hash document, transaction ID)
}

// Vérification d'idempotence dans BaseProcessor
@Processor()
export abstract class BaseProcessor {
  abstract process(job: Job): Promise<JobResult>;

  /**
   * Vérifie si l'effet métier a déjà été produit.
   * DOIT être appelé AVANT toute action irréversible.
   */
  protected async checkIdempotency(
    idempotencyKey: string,
    effectType: string,
  ): Promise<IdempotencyCheck> {
    const existing = await this.idempotencyRepository.findOne({
      where: { key: idempotencyKey, effect_type: effectType },
    });

    if (existing) {
      return {
        alreadyProcessed: true,
        originalJobId: existing.job_id,
        processedAt: existing.created_at,
      };
    }

    return { alreadyProcessed: false };
  }

  /**
   * Enregistre l'effet métier après succès.
   * Doit être appelé dans une transaction avec l'effet.
   */
  protected async recordEffect(
    jobId: string,
    idempotencyKey: string,
    effectType: string,
  ): Promise<void> {
    await this.idempotencyRepository.insert({
      key: idempotencyKey,
      effect_type: effectType,
      job_id: jobId,
      created_at: new Date(),
    });
  }
}

// Exemple d'utilisation dans un processor métier
@Processor(QUEUE_NAMES.BLOCKCHAIN)
export class BlockchainAnchorProcessor extends BaseProcessor {
  async process(job: Job<BlockchainAnchorData>): Promise<JobResult> {
    // 1. Vérification d'idempotence AVANT l'ancrage
    const check = await this.checkIdempotency(
      job.data.documentHash,  // Clé métier = hash du document
      'BLOCKCHAIN_ANCHOR',
    );

    if (check.alreadyProcessed) {
      this.logger.warn(
        `Skip: document ${job.data.documentHash} already anchored by job ${check.originalJobId}`
      );
      return {
        status: 'SKIPPED',
        reason: 'already_anchored',
        originalJobId: check.originalJobId,
      };
    }

    // 2. Effectuer l'ancrage blockchain (effet irréversible)
    const txHash = await this.blockchainService.anchor(job.data.documentHash);

    // 3. Enregistrer l'effet pour bloquer les futurs retries
    await this.recordEffect(job.id, job.data.documentHash, 'BLOCKCHAIN_ANCHOR');

    return { status: 'SUCCEEDED', txHash };
  }
}

Table d'idempotence : job_effects avec index unique sur (key, effect_type)

CREATE TABLE job_effects (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  key VARCHAR(255) NOT NULL,          -- Clé métier (hash, document_id, etc.)
  effect_type VARCHAR(50) NOT NULL,   -- Type d'effet (BLOCKCHAIN_ANCHOR, etc.)
  job_id UUID NOT NULL,               -- Job ayant produit l'effet
  created_at TIMESTAMP NOT NULL,
  UNIQUE(key, effect_type)            -- Garantie d'unicité
);

4. Gestion des Erreurs

4.1 Matrice des erreurs

Code Cas d'erreur Comportement Réponse HTTP Action corrective
CE-1 Échec exécution job État → FAILED, failed_reason renseigné N/A (async) Consultation via /jobs/:id
CE-2 Redis indisponible (soumission) Exception levée, transaction rollback 503 Service Unavailable Retry client (exponential backoff)
CE-3 Queue inexistante Validation échoue au boot 500 (startup) Fix configuration, redéploiement
ERR-4 Job ID invalide (consultation) 404 Not Found 404 Client vérifie l'ID
ERR-5 Timeout exécution worker Job marqué FAILED (stalled) N/A Alerte monitoring
ERR-6 Payload invalide Validation DTO échoue 400 Bad Request Client corrige payload

4.2 Circuit de traitement des erreurs

// Dans le processor
@Processor(QUEUE_NAMES.DOCUMENT_ARCHIVE)
export class DocumentArchiveProcessor extends BaseProcessor {
  async process(job: Job<DocumentArchiveData>): Promise<JobResult> {
    const startedAt = new Date();

    try {
      // Mise à jour état RUNNING
      await this.jobsRepository.updateState(job.id, JobState.RUNNING, { startedAt });

      // Exécution métier
      const result = await this.executeArchive(job.data);

      // Succès
      await this.jobsRepository.updateState(job.id, JobState.SUCCEEDED, {
        completedAt: new Date(),
        result,
      });

      return result;

    } catch (error) {
      // Échec explicite (I-2)
      await this.jobsRepository.updateState(job.id, JobState.FAILED, {
        completedAt: new Date(),
        failedReason: this.formatError(error),
      });

      // Rethrow pour que BullMQ marque le job comme failed
      throw error;
    }
  }
}

4.3 Gestion des jobs "stalled"

// Configuration worker
const workerOptions: WorkerOptions = {
  lockDuration: 30000,      // 30s lock
  stalledInterval: 15000,   // Check toutes les 15s
  maxStalledCount: 1,       // 1 stall = FAILED (pas de retry implicite, I-3)
};

// Event handler
worker.on('stalled', async (jobId: string) => {
  await this.jobsRepository.updateState(jobId, JobState.FAILED, {
    completedAt: new Date(),
    failedReason: 'Job stalled: worker timeout or crash',
  });
  this.logger.error(`Job ${jobId} stalled - marked as FAILED`);
});

5. Impacts Sécurité

5.1 Surface d'attaque

Vecteur Risque Mitigation
Redis non authentifié Exécution de commandes arbitraires AUTH obligatoire (requirepass)
Redis sans TLS Interception credentials/payloads TLS 1.2+ obligatoire en prod
Injection dans payload Exécution de code malveillant Validation DTO stricte + sanitization
Déni de service (flood) Saturation queue Rate limiting + max queue size
Accès non autorisé aux états Fuite d'information AuthGuard sur /jobs/:id

5.2 Contrôles de sécurité

A. Authentification Redis

// Guard au démarrage
if (config.get('NODE_ENV') !== 'development') {
  if (!config.get('REDIS_PASSWORD')) {
    throw new Error('REDIS_PASSWORD is required in non-development environments');
  }
  if (!config.get('REDIS_TLS_ENABLED')) {
    throw new Error('REDIS_TLS_ENABLED must be true in non-development environments');
  }
}

B. Validation des payloads

// DTO avec validation stricte
export class SubmitJobDto {
  @IsEnum(JobType)
  type: JobType;

  @ValidateNested()
  @Type(() => JobPayloadDto)
  data: JobPayloadDto;

  @IsOptional()
  @IsUUID('4')
  correlationId?: string;
}

C. Isolation des accès

// Controller avec guards
@Controller('jobs')
@UseGuards(JwtAuthGuard)
export class JobsController {
  @Get(':jobId')
  @UseGuards(JobOwnershipGuard) // Vérifie que le user peut voir ce job
  async getStatus(@Param('jobId') jobId: string): Promise<JobStatusDto> {
    return this.jobsService.getStatus(jobId);
  }
}

5.3 Checklist sécurité

  • Redis AUTH configuré et testé
  • TLS Redis activé en staging/prod
  • Rate limiting sur endpoints de soumission
  • Validation DTO exhaustive
  • Guard d'ownership sur consultation
  • Logs de sécurité (tentatives non autorisées)
  • Secrets via Vault (pas en variables d'environnement directes)

6. Hypothèses Techniques

6.1 Hypothèses héritées de la spécification

Réf Hypothèse Impact implémentation
H-SPEC-1 Backend de queue dispose d'une persistance durable Redis configuré avec AOF (appendonly yes)
H-SPEC-2 Horodatages fournis par source fiable Utilisation de Date.now() serveur (NTP synchronisé)
H-SPEC-3 Identifiants de job globalement uniques UUID v7 (monotone, collision-free)

6.2 Hypothèses techniques additionnelles

Réf Hypothèse Justification
H-IMPL-1 Redis disponible et accessible depuis API et Workers Prérequis infrastructure
H-IMPL-2 PostgreSQL disponible pour persistance historique Base existante du projet
H-IMPL-3 Workers déployés comme processus séparés Isolation et scalabilité
H-IMPL-4 Un seul namespace Redis pour les jobs Simplification ; isolation par préfixe
H-IMPL-5 Les workers partagent la même version du code Déploiement coordonné
H-IMPL-6 Horloge système synchronisée (NTP) Cohérence des timestamps

6.3 Décisions de conception (Points à clarifier §10) — VALIDÉES

Point spec Décision Statut
10.1 Rétention post-terminal Formalisée : 30j DB, 7j Redis, purge automatique via Prefect ✅ Validé
10.2 Retries explicites Via Prefect flow jobs_dlq_retry_flow (cron */15 * * * *) ✅ Validé
10.3 Interface consultation REST API /jobs/:id + /jobs/failed + /jobs/stats + /jobs/purgeable ✅ Validé
10.4 Isolation Redis Redis dédié jobs (namespace pv:jobs:) + health check continu ✅ Validé
10.5 Limites de charge Max 1000 jobs pending/queue + rate limit 100/min ✅ Validé

6.4 Politique de rétention formalisée (I-5)

Correction Compliance : La rétention n'était pas formalisée avec un mécanisme de purge.

Composant Rétention Mécanisme de purge Cron
PostgreSQL (job_history) 30 jours DELETE /jobs/purge?target=db&olderThan=30d 0 3 * * *
Redis (BullMQ completed/failed) 7 jours DELETE /jobs/purge?target=redis&olderThan=7d 0 3 * * *
Table job_effects (idempotence) 90 jours Purge en cascade avec job_history 0 3 * * *

Garanties : - Aucun job terminal n'est supprimé avant sa période de rétention - La purge est tracée via Prefect Events (jobs.purge.completed) - Les jobs RUNNING ou PENDING ne sont jamais purgés - Une vérification post-purge confirme la conformité

6.5 Configuration des retries par type de job — VALIDÉE

Queue Job Type Max Retries Retryable Rationale
Archive DOCUMENT_ARCHIVE 3 true Échecs transitoires (stockage)
Blockchain BLOCKCHAIN_ANCHOR 5 true Dépendance externe blockchain
Blockchain BLOCKCHAIN_VERIFY 3 true Vérification peut échouer temporairement
Export EXPORT_GLACIER 5 true Transfert S3/Glacier long
Export EXPORT_ZIP 3 true Génération ZIP peut OOM
Purge PURGE_LEGAL 2 true Critique légalement, retry limité
Purge PURGE_USER_REQUEST 3 true Demande utilisateur
Transfer PRE_TRANSFER_B2B2C 3 true Préparation transfert coffre
Transfer TRANSFER_COMPLETE 0 false One-shot après succès PRE
Notification NOTIFICATION_EMAIL 2 true Échecs SMTP temporaires
Notification NOTIFICATION_WEBHOOK 3 true Endpoint distant indisponible

Configuration : Stockée en config/job-retry-policies.ts et consultable via GET /jobs/config

6.6 Règles de retry — VALIDÉES

Règle Décision Statut
Fréquence retry flow Toutes les 15 minutes (*/15 * * * *) ✅ Validé
Backoff exponentiel Non — retry immédiat au prochain run ✅ Validé
Exclusion retry auto Aucune — tous les jobs retryable=true sont éligibles ✅ Validé
Alerte ABANDONED Prefect Events simple ✅ Validé

TODO : Évolution future des alertes ABANDONED (email, Slack, escalade manuelle)


7. Points de Vigilance

7.1 Risques techniques

Risque Probabilité Impact Mitigation
R-1 Redis OOM Moyenne Critique Monitoring mémoire + maxmemory policy
R-2 Deadlock worker Faible Élevé Lock TTL + stalled job detection
R-3 Perte de jobs (Redis crash) Faible Critique AOF persistence + réplication
R-4 Drift d'horloge Faible Moyen NTP + monitoring drift
R-5 Incompatibilité payload Moyenne Moyen Versioning payloads + validation

7.2 Points d'attention implémentation

  1. Transaction atomique soumission : L'insertion PostgreSQL et l'enqueue Redis doivent être cohérents. Utiliser un pattern Outbox ou accepter une fenêtre d'incohérence temporaire (job en DB mais pas en queue = état PENDING perpétuel détectable).

  2. Ordre des opérations worker :

  3. Toujours mettre à jour PostgreSQL AVANT d'effectuer des actions métier irréversibles
  4. En cas de crash après action métier mais avant update DB : le job reste RUNNING → détecté comme stalled → marqué FAILED

  5. Idempotence des processors : Bien que I-2 garantisse une seule exécution, concevoir les processors comme idempotents permet de gérer les edge cases (crash après effet mais avant ack).

  6. Graceful shutdown workers : Implémenter un handler SIGTERM qui :

  7. Arrête d'accepter de nouveaux jobs
  8. Attend la fin des jobs en cours (timeout raisonnable)
  9. Marque les jobs non terminés comme FAILED avec raison explicite

  10. Monitoring essentiel :

  11. Queue depth (jobs pending)
  12. Latence traitement (p50, p95, p99)
  13. Taux d'échec par type de job
  14. Jobs stalled count
  15. Redis memory usage

7.3 Tests critiques à implémenter

Réf Test Couvre
T-1 Soumission retourne avant exécution I-1, CA-1
T-2 Deux workers sur même job → un seul exécute I-2, CA-2, ST-2
T-3 Job échoué → pas de retry automatique I-3
T-4 Job failed → cause lisible en DB I-4, CA-3, ST-3
T-5 Job completed → reste en DB 30 jours I-5, CA-4
T-6 Connexion Redis sans auth → rejetée I-6, CA-5
T-7 Soumission sans worker → job PENDING consultable ST-1

7.4 Dépendances et prérequis

┌─────────────────────────────────────────────────────────┐
│                    Prérequis Infra                      │
├─────────────────────────────────────────────────────────┤
│ ✓ Redis 7.x avec AOF enabled                            │
│ ✓ Redis AUTH + TLS (prod/staging)                       │
│ ✓ PostgreSQL (existant - ProbatioVault)                 │
│ ✓ NTP synchronisé sur tous les serveurs                 │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│                   Prérequis Code                        │
├─────────────────────────────────────────────────────────┤
│ ✓ bullmq ^5.1.0 (déjà installé)                         │
│ ✓ @nestjs/bull ^10.0.1 (déjà installé)                  │
│ ✓ ioredis ^5.3.2 (déjà installé)                        │
│ ○ uuid (pour génération v7 - à ajouter si nécessaire)   │
└─────────────────────────────────────────────────────────┘

8. Plan de Livraison

Phase 1 : Fondations (Core Module)

  1. Entity JobHistory + migration
  2. JobsRepository avec méthodes CRUD
  3. Enums JobState, JobType
  4. Configuration Redis sécurisée
  5. Tests unitaires repository

Phase 2 : Producer (API Side)

  1. JobsService avec méthode submit()
  2. JobsController avec endpoints état
  3. DTOs de validation
  4. Guards d'autorisation
  5. Tests d'intégration soumission

Phase 3 : Consumer (Worker Side)

  1. BaseProcessor abstrait
  2. Premier processor concret (ex: DocumentArchive)
  3. Gestion des événements (completed, failed, stalled)
  4. Tests d'intégration exécution
  5. Graceful shutdown

Phase 4 : Observabilité

  1. Métriques Prometheus (queue depth, latency)
  2. Logs structurés
  3. Alertes (stalled jobs, error rate)
  4. Dashboard monitoring

Phase 5 : Retries via Prefect

  1. Endpoints API pour Prefect :
  2. GET /jobs/failed?retryable=true
  3. POST /jobs/:id/retry
  4. GET /jobs/stats
  5. Flow Prefect jobs_dlq_retry.py :
  6. Tasks : get_failed_jobs, trigger_job_retry, emit_events
  7. Déploiement cron */15 * * * *
  8. Seuils d'alerte et Prefect Events
  9. Tests d'intégration retry flow
  10. Documentation opérationnelle

9. Tests Additionnels (Retry Prefect)

Réf Test Couvre
T-8 Job FAILED avec retryable=true → visible dans /jobs/failed Endpoint Prefect
T-9 POST /jobs/:id/retry crée nouveau job avec parent_job_id Chaînage retry
T-10 Job original passe à RETRIED après retry Transition d'état
T-11 retry_count >= max_retries → état ABANDONED Limite retries
T-12 Job avec retryable=false → non listé dans /jobs/failed?retryable=true Filtrage
T-13 Flow Prefect exécuté → jobs failed traités Intégration E2E

10. Tests Additionnels (Corrections Compliance)

10.1 Tests Idempotence (I-7)

Réf Test Couvre
T-14 Processor vérifie idempotence AVANT effet métier I-7 Pattern
T-15 Même payload soumis 2x → effet métier unique I-7 Garantie
T-16 Job retry avec même idempotencyKey → statut SKIPPED I-7 + Retry
T-17 Table job_effects a contrainte UNIQUE sur (key, effect_type) I-7 DB

10.2 Tests Purge (I-5 formalisé)

Réf Test Couvre
T-18 Job terminal > 30j → éligible purge DB Rétention DB
T-19 Job terminal < 30j → NON purgé Protection rétention
T-20 Job RUNNING ou PENDING → JAMAIS purgé Intégrité
T-21 DELETE /jobs/purge?target=db → compteur correct Endpoint
T-22 Flow Prefect jobs_purge_flow → purge effective E2E

10.3 Tests Health Check Redis (I-6 renforcé)

Réf Test Couvre
T-23 Redis déconnecté → événement unhealthy émis Détection
T-24 Redis reconnecté → événement healthy émis Récupération
T-25 TLS désactivé en prod → alerte CRITICAL Sécurité
T-26 AUTH désactivé en prod → alerte CRITICAL Sécurité
T-27 /health/redis retourne statut correct Endpoint
T-28 Flow Prefect redis_health_flow → vérifications OK E2E

11. Plan de Livraison Révisé

Phase 5 : Prefect Orchestration (MISE À JOUR)

  1. Flow Retry (jobs_dlq_retry.py) :
  2. Tasks : get_failed_jobs, trigger_job_retry, emit_events
  3. Déploiement cron */15 * * * *

  4. Flow Purge (jobs_purge.py) — NOUVEAU :

  5. Tasks : get_purgeable_jobs, purge_db_jobs, purge_redis_jobs, verify_retention
  6. Déploiement cron 0 3 * * *

  7. Flow Health Check (redis_health.py) — NOUVEAU :

  8. Tasks : check_redis_connection, verify_tls_enabled, verify_auth_required, check_memory_usage
  9. Déploiement cron */5 * * * *

  10. Endpoints API :

  11. GET /jobs/purgeable — NOUVEAU
  12. DELETE /jobs/purge — NOUVEAU
  13. GET /health/redis — NOUVEAU

  14. Tests d'intégration flows Prefect

Phase 6 : Idempotence (NOUVELLE)

  1. Table job_effects + migration
  2. Interface IdempotentJobPayload
  3. Méthodes checkIdempotency() / recordEffect() dans BaseProcessor
  4. Tests unitaires idempotence
  5. Documentation pattern pour processors métier

Fin du plan d'implémentation Toute déviation de ce plan doit être documentée et justifiée.