Aller au contenu

PD-3 — Plan d'implémentation


Navigation User Story | Document | | | ---------- | -- | | [Spécification](PD-3-specification.md) | | | **Plan d'implémentation** | *(ce document)* | | [Tests](PD-3-tests.md) | | | Retour d'expérience | *(à venir)* | [Retour à infrastructure-souveraine](../index.md) - [Index User Story](index.md)

1. Découpage en composants

Composant Responsabilité Repo
Redis Server Persistance des jobs, pub/sub pour workers infra
BullMQ Module Orchestration des queues, workers, retry backend
Queue Registry Enregistrement centralisé des files backend
Job Monitor Observabilité des états et métriques backend
Alert Service Signalement des anomalies backend/infra
Documentation Ops Procédures d'exploitation infra

2. Flux techniques

FN-01 : Traitement nominal d'une tâche

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Producer   │────▶│    Redis    │────▶│   Worker    │────▶│  Completed  │
│ (API/Cron)  │     │   (Queue)   │     │  (BullMQ)   │     │   (State)   │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │                   │
       │              LPUSH job           BRPOPLPUSH          SET state
       │              SET state:waiting   SET state:active    completed
       └───────────────────┴───────────────────┴───────────────────┘
                         Redis Persistence (AOF)
  1. Le producer enregistre un job dans la queue via queue.add()
  2. Redis persiste le job (AOF appendonly)
  3. Un worker prend le job via BRPOPLPUSH (atomique)
  4. L'état passe de waitingactive
  5. Le worker exécute la logique métier
  6. L'état passe à completed ou failed
  7. L'état final est persisté et consultable

FN-02 : Reprise après interruption

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Worker    │     │    Redis    │     │   Systemd   │
│  (crash)    │     │ (persisted) │     │  (restart)  │
└─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │
       X crash             │                   │
       │              Job in active list       │
       │                   │                   │
       │                   │            restart worker
       │                   │◀──────────────────┤
       │              stalled check            │
       │              move to waiting          │
       │                   │                   │
       └───────────────────┴───────────────────┘
  1. Un worker crash pendant le traitement
  2. Le job reste dans la liste active de Redis
  3. Systemd redémarre le worker automatiquement
  4. BullMQ détecte le job "stalled" (lockDuration expirée)
  5. Le job est automatiquement remis en waiting ou passé en failed

2bis. Diagrammes Mermaid

Graphe de dépendances des composants

graph TD
    subgraph infra["Infra (ProbatioVault-infra)"]
        REDIS["Redis Server<br/>AOF persistence, pub/sub"]
        SYSTEMD["Systemd<br/>Restart=always"]
        ALERTINFRA["Alert Infra<br/>Slack/email"]
    end

    subgraph backend["Backend (ProbatioVault-backend)"]
        BULLMQ["BullMQ Module<br/>jobs.module.ts"]
        REGISTRY["Queue Registry<br/>queue.registry.ts"]
        MONITOR["Job Monitor<br/>job-monitor.service.ts"]
        EVENTS["Job Events Listener<br/>job-events.listener.ts"]
        CONTROLLER["Jobs Controller<br/>jobs.controller.ts"]
        SERVICE["Jobs Service<br/>jobs.service.ts"]
        ALERT["Alert Service"]
    end

    BULLMQ -->|connection| REDIS
    BULLMQ -->|enregistre queues| REGISTRY
    SERVICE -->|orchestre| BULLMQ
    SERVICE -->|checkDataIntegrity| REDIS
    CONTROLLER -->|expose API| SERVICE
    MONITOR -->|observe états| BULLMQ
    EVENTS -->|on failed/stalled| BULLMQ
    EVENTS -->|signale anomalie| ALERT
    ALERT -->|notifie| ALERTINFRA
    SYSTEMD -->|restart workers| BULLMQ

Séquence FN-01 : Traitement nominal d'une tâche

sequenceDiagram
    participant P as Producer (API/Cron)
    participant S as Jobs Service
    participant R as Redis (Queue)
    participant W as Worker (BullMQ)
    participant E as Job Events Listener
    participant M as Job Monitor

    P->>S: queue.add(jobName, payload)
    S->>R: LPUSH job + SET state:waiting
    R-->>S: jobId

    W->>R: BRPOPLPUSH (atomique)
    R-->>W: job data
    R->>R: SET state:active
    E->>E: on('active') → log INFO

    W->>W: Exécution logique métier

    alt Succès
        W->>R: SET state:completed
        E->>E: on('completed') → log INFO
    else Échec
        W->>R: SET state:failed
        E->>E: on('failed') → log ERROR
        E->>E: Emit alerte via Alert Service
    end

    M->>R: GET job state (observabilité)
    M-->>M: Expose via API /jobs/:queue/:jobId/status

Séquence FN-02 : Reprise après interruption (stalled job recovery)

sequenceDiagram
    participant W as Worker (BullMQ)
    participant R as Redis (Queue)
    participant SD as Systemd
    participant B as BullMQ Stalled Check
    participant E as Job Events Listener

    W->>R: BRPOPLPUSH → state:active
    W->>W: Traitement en cours...
    W-xW: CRASH (kill -9, OOM)

    Note over R: Job reste en liste active<br/>lockDuration court

    SD->>W: Restart=always → relance worker

    B->>R: stalledInterval check
    B->>R: Détecte lock expirée
    R->>R: Move job: active → waiting
    E->>E: on('stalled') → log WARN

    W->>R: BRPOPLPUSH (re-traitement)
    W->>R: SET state:completed
    E->>E: on('completed') → log INFO

3. Mapping invariants → mécanismes

Invariant ID Exigence Mécanisme Composant Observable Risque
INV-01 Aucune tâche ne doit être perdue silencieusement Redis AOF (appendonly), BullMQ job persistence, removeOnComplete: false Redis, BullMQ LLEN queue:* retourne tous les jobs, logs d'ajout Perte si AOF désactivé
INV-02 Toute tâche doit avoir un état observable à tout moment BullMQ job states (waiting/active/completed/failed/delayed), Bull Board UI Job Monitor GET bull:queue:job:state, API /jobs/:id/status UI non déployée
INV-03 L'état des tâches doit être persistant face à une interruption Redis AOF persistence, BullMQ stalled job recovery, Systemd restart Redis, Systemd États restaurés après restart, jobs re-traités AOF ou Systemd mal configuré
INV-04 Toute anomalie doit produire un signal explicite Event listeners on('failed'), Prometheus metrics, alerting rules Alert Service Alertes Slack/email, bull_jobs_failed_total Alertes non configurées

4. Mapping critères d'acceptation → mécanismes

Critère ID Mécanisme(s) Composant Observable Risque
CA-01 Redis AOF persistence, job IDs uniques, audit log des créations Redis, BullMQ Count jobs créés = count jobs (completed + failed + active + waiting) Perte si AOF off
CA-02 BullMQ getJob(), Bull Board UI, API status endpoint Job Monitor GET /jobs/:id retourne état, UI accessible API non exposée
CA-03 Event listeners on('failed'), Prometheus metrics, alerting rules Alert Service Alerte reçue sur job failed Alerting non configuré
CA-04 Systemd Restart=always, BullMQ stalledInterval Redis, BullMQ Service UP après kill -9, jobs stalled re-traités Restart policy manquante
CA-05 README ops, runbooks incidents, procédures de recovery, protocole de validation Documentation Runbook exécuté par testeur non-expert en ≤15min, checklist validation signée Docs non validées

5. Mapping tests (TC-*) → mécanismes + observables

Test ID Référence spec Mécanisme(s) Point(s) d'observation Niveau de test visé
TC-NOM-01 INV-01, INV-02, CA-01, CA-02 BullMQ add/process, Redis persistence Job state = completed, state consultable via API Integration
TC-NOM-02 INV-03, CA-04 Systemd restart, BullMQ stalled recovery, Redis AOF Service UP après restart, job re-traité, états persistés E2E
TC-ERR-01 ERR-02, INV-04, CA-03 on('failed') event, alerting État = failed, alerte émise Integration
TC-ERR-02 ERR-01, INV-01, INV-03 Redis AOF, BullMQ persistence Tous jobs identifiables après kill -9 E2E
TC-ERR-03 ERR-03, INV-04 checkDataIntegrity(), état incomplete, log FORCE_MAJEURE, endpoint /jobs/health/integrity GET /jobs/health/integrity retourne { status: 'degraded', incompleteJobs }, log CRITICAL visible Integration
TC-INV-01 INV-01 Audit count jobs, no orphans Count créés = count finaux Integration
TC-INV-02 INV-02 BullMQ getJob(), Bull Board UI État consultable à tout moment Integration
TC-INV-03 INV-03 Redis restart, state preservation États inchangés après restart E2E
TC-INV-04 INV-04 on('failed') listener, alerting Signal explicite sur anomalie Integration
TC-NR-01 INV-03 Redis restart, state preservation États inchangés après restart E2E
TC-NR-02 INV-02 Charge nominale, no missing states Tous états consultables sous charge Perf
TC-NEG-01 INV-01, INV-03 Interruptions répétées Aucun état incohérent, aucune perte E2E/Chaos
TC-NEG-02 INV-04 Échecs massifs Alerte globale visible Integration
TC-DOC-01 CA-05 Protocole validation runbooks, testeur non-expert, temps mesuré Fichier runbooks-validation.md signé, temps ≤ max, 0 questions Manual

6. Gestion des erreurs

Code Situation Traitement Observable
JOB_FAILED Exécution échouée Job marqué failed, event émis, retry si configuré Job state = failed, logs erreur
JOB_STALLED Worker crash pendant exécution Job re-queued ou failed après stalledInterval Job re-traité ou state = failed
REDIS_DOWN Redis indisponible Retry connection avec backoff, alerte Logs reconnection, alerte ops
QUEUE_FULL Trop de jobs en attente Alerte, throttling optionnel Métrique queue length, alerte
WORKER_TIMEOUT Job dépasse timeout Job failed avec timeout error Job state = failed, error = timeout
FORCE_MAJEURE Perte données Redis (crash sans AOF, corruption) Jobs marqués incomplete, alerte critique, rapport d'impact État incomplete sur jobs affectés, log FORCE_MAJEURE, alerte ops

Mécanisme ERR-03 : Force majeure (obligatoire)

En cas de perte de données exceptionnelle (crash Redis sans AOF, corruption), le système doit :

  1. Détecter l'incohérence : Au redémarrage, comparer le dernier checkpoint connu avec l'état Redis
  2. Marquer les jobs affectés : État incomplete avec metadata { reason: 'FORCE_MAJEURE', detectedAt, lastKnownState }
  3. Émettre un signal explicite : Log CRITICAL [JobsService] FORCE_MAJEURE: X jobs in incomplete state
  4. Alerter les opérateurs : Alerte dédiée avec liste des jobs impactés
// jobs.service.ts - Mécanisme ERR-03
async checkDataIntegrity(): Promise<ForceMajeureReport | null> {
  const lastCheckpoint = await this.getLastCheckpoint();
  const currentState = await this.getAllJobStates();

  const incompleteJobs = this.detectIncompleteJobs(lastCheckpoint, currentState);

  if (incompleteJobs.length > 0) {
    // Marquer les jobs comme incomplete
    for (const job of incompleteJobs) {
      await this.markAsIncomplete(job.id, 'FORCE_MAJEURE');
    }

    // Log explicite (ERR-03)
    this.logger.error(`FORCE_MAJEURE: ${incompleteJobs.length} jobs in incomplete state`, {
      jobIds: incompleteJobs.map(j => j.id),
      detectedAt: new Date().toISOString(),
    });

    // Alerte ops
    await this.alertService.sendCriticalAlert('FORCE_MAJEURE', incompleteJobs);

    return { incompleteJobs, detectedAt: new Date() };
  }
  return null;
}

Observable TC-ERR-03 : Endpoint GET /jobs/health/integrity retourne { status: 'degraded', incompleteJobs: [...] } si force majeure détectée.

Codes d'erreur applicatifs

export enum JobErrorCode {
  JOB_NOT_FOUND = 'JOB_NOT_FOUND',
  JOB_ALREADY_COMPLETED = 'JOB_ALREADY_COMPLETED',
  JOB_PROCESSING = 'JOB_PROCESSING',
  QUEUE_NOT_FOUND = 'QUEUE_NOT_FOUND',
  REDIS_CONNECTION_ERROR = 'REDIS_CONNECTION_ERROR',
  FORCE_MAJEURE = 'FORCE_MAJEURE', // ERR-03
}

export enum JobState {
  WAITING = 'waiting',
  ACTIVE = 'active',
  COMPLETED = 'completed',
  FAILED = 'failed',
  DELAYED = 'delayed',
  INCOMPLETE = 'incomplete', // ERR-03: Force majeure
}

7. Impacts sécurité

Risque Mitigation Observable
Données sensibles dans jobs Chiffrer payload si nécessaire, pas de secrets en clair Audit code review
Accès Redis non autorisé requirepass, bind localhost, TLS en prod Config Redis vérifiée
Injection via job data Validation stricte des payloads Tests de validation
DoS via flood de jobs Rate limiting sur création, alertes sur queue size Métriques queue length

Journalisation

  • Création de job : INFO [JobsService] Job created: {jobId, queue, timestamp}
  • Completion : INFO [Worker] Job completed: {jobId, duration}
  • Failure : ERROR [Worker] Job failed: {jobId, error, attempt}
  • Stalled : WARN [BullMQ] Job stalled: {jobId, queue}

8. Hypothèses techniques

ID Hypothèse Impact si faux
H-01 Redis est déployé avec AOF activé Perte de jobs possible au crash
H-02 Systemd gère le redémarrage des services Pas de recovery automatique
H-03 BullMQ est compatible avec la version Redis déployée Incompatibilités possibles
H-04 Les workers sont idempotents ou le retry est désactivé Duplications d'effets de bord
H-05 La volumétrie reste sous 10K jobs/heure Scaling nécessaire au-delà
H-06 Un seul Redis (pas de cluster) suffit pour la charge Migration cluster nécessaire

9. Points de vigilance (risques, dette, pièges)

Point Description Action recommandée
AOF désactivé Redis par défaut sans AOF = perte de données Vérifier appendonly yes
removeOnComplete: true Jobs supprimés après succès = pas d'audit Garder removeOnComplete: false ou durée
stalledInterval trop long Jobs bloqués longtemps Configurer 30s-60s
Pas de monitoring Jobs failed non détectés Déployer Bull Board + alertes
Workers sans graceful shutdown Jobs perdus au redémarrage Implémenter shutdown hooks
Retry infini Jobs en boucle de retry Configurer attempts max

Dette technique identifiée

  • Pas de métriques Prometheus → à ajouter post-MVP

Note : L'API /jobs/:id/status est obligatoire dès le MVP pour satisfaire CA-02 (observabilité). Bull Board est optionnel mais l'endpoint API est requis.


10. Hors périmètre

  • Logique métier des jobs : Implémentée dans chaque consumer (PD-21, etc.)
  • Choix hébergeur Redis : OVH retenu mais hors scope spec
  • Redis Cluster : Non requis pour volumétrie initiale
  • Scheduler avancé : Cron jobs simples, pas de scheduling complexe
  • Multi-tenant queues : Une instance Redis pour toute l'app

11. Implémentation technique

Phase 1 : Infrastructure Redis

Fichiers Terraform :

# infra/terraform/modules/redis/main.tf
resource "ovh_cloud_project_database" "redis" {
  service_name = var.ovh_project_id
  engine       = "redis"
  version      = "7.0"
  plan         = "essential"
  nodes {
    region = "GRA"
  }
}

Configuration Redis :

# Persistence
appendonly yes
appendfsync everysec

# Memory
maxmemory-policy noeviction

# Security
requirepass ${REDIS_PASSWORD}
bind 127.0.0.1

Phase 2 : Module BullMQ (Backend)

Structure :

src/modules/jobs/
├── jobs.module.ts           # Module principal
├── jobs.controller.ts       # API observabilité (CA-02)
├── jobs.service.ts          # Service d'orchestration
├── dto/
│   └── job-status.dto.ts    # DTO état job
├── queues/
│   ├── queue.registry.ts    # Registre des queues
│   └── queue.config.ts      # Configuration par défaut
├── processors/
│   └── base.processor.ts    # Classe de base workers
└── monitoring/
    ├── job-monitor.service.ts
    └── job-events.listener.ts

Configuration BullMQ :

// jobs.module.ts
@Module({
  imports: [
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: (config: ConfigService) => ({
        connection: {
          host: config.get('REDIS_HOST'),
          port: config.get('REDIS_PORT'),
          password: config.get('REDIS_PASSWORD'),
        },
        defaultJobOptions: {
          removeOnComplete: { age: 86400, count: 1000 }, // 24h ou 1000 jobs
          removeOnFail: { age: 604800 }, // 7 jours
          attempts: 3,
          backoff: {
            type: 'exponential',
            delay: 1000,
          },
        },
      }),
      inject: [ConfigService],
    }),
  ],
})
export class JobsModule {}

Phase 3 : Monitoring et Alertes

Event Listener :

@Injectable()
export class JobEventsListener {
  private readonly logger = new Logger(JobEventsListener.name);

  @OnQueueEvent('failed')
  onFailed(job: Job, error: Error) {
    this.logger.error(`Job ${job.id} failed: ${error.message}`, {
      jobId: job.id,
      queue: job.queueName,
      error: error.stack,
      attempt: job.attemptsMade,
    });
    // Emit alert via AlertService
  }

  @OnQueueEvent('stalled')
  onStalled(jobId: string) {
    this.logger.warn(`Job ${jobId} stalled`);
  }
}

API Endpoint d'observabilité (obligatoire pour CA-02) :

// jobs.controller.ts
@Controller('jobs')
export class JobsController {
  constructor(private readonly jobsService: JobsService) {}

  /**
   * Retourne l'état d'un job (CA-02: observabilité)
   * @returns { id, queue, state, progress, failedReason?, timestamps }
   */
  @Get(':queue/:jobId/status')
  async getJobStatus(
    @Param('queue') queue: string,
    @Param('jobId') jobId: string,
  ): Promise<JobStatusDto> {
    return this.jobsService.getJobStatus(queue, jobId);
  }

  /**
   * Liste les jobs d'une queue avec leur état
   */
  @Get(':queue')
  async listJobs(
    @Param('queue') queue: string,
    @Query('state') state?: JobState,
  ): Promise<JobStatusDto[]> {
    return this.jobsService.listJobs(queue, state);
  }
}

Phase 4 : Documentation Ops (CA-05 obligatoire)

Runbooks requis avec critères d'exécutabilité :

Runbook Objectif Temps max Prérequis testeur Critère de succès
redis-restart.md Redémarrage Redis 10 min Accès SSH, droits sudo Service UP, jobs préservés
queue-blocked.md Déblocage queue saturée 15 min Accès SSH Queue length < seuil
job-retry.md Relance manuelle d'un job 5 min Accès API/CLI Job re-traité avec succès
monitoring-setup.md Configuration Bull Board 15 min Docker installé UI accessible, jobs visibles

Structure obligatoire de chaque runbook :

# [Titre]
## Prérequis
- Liste des accès/outils nécessaires
## Diagnostic
- Commandes pour identifier le problème
## Résolution
- Étapes numérotées avec commandes copiables
## Vérification
- Commande(s) de validation du succès
## Rollback
- Procédure de retour arrière si échec

Protocole de validation TC-DOC-01 :

  1. Sélection testeur : Personne non-experte Redis/BullMQ (ex: dev frontend, PM)
  2. Environnement : Staging avec accès SSH configuré
  3. Exécution : Testeur suit le runbook sans aide extérieure
  4. Mesures :
  5. Temps d'exécution (chronomètre)
  6. Nombre de questions posées (doit être 0)
  7. Résultat final (succès/échec)
  8. Validation : Runbook validé si exécuté en ≤ temps max sans question
  9. Traçabilité : Checklist signée avec date, nom testeur, temps mesuré

Observable TC-DOC-01 : Fichier docs/validation/runbooks-validation.md contenant les résultats de validation signés


12. Checklist de validation

  • Redis déployé avec AOF enabled
  • BullMQ module configuré avec retry/backoff
  • Event listeners sur failed/stalled
  • API /jobs/:queue/:jobId/status opérationnelle (CA-02 obligatoire)
  • Endpoint /jobs/health/integrity opérationnel (ERR-03 obligatoire)
  • Mécanisme checkDataIntegrity() implémenté avec état incomplete
  • Systemd Restart=always pour tous services
  • Tests TC-NOM-01 et TC-NOM-02 passants
  • Tests TC-INV-02 (observabilité API) passants
  • Tests TC-ERR-03 (force majeure) passants
  • Alertes configurées sur failures
  • Runbooks rédigés avec structure obligatoire (CA-05)
  • Runbooks validés par testeur non-expert (TC-DOC-01)
  • Fichier runbooks-validation.md signé avec temps mesurés