PD-3 — Plan d'implémentation¶
Navigation User Story
| Document | | | ---------- | -- | | [Spécification](PD-3-specification.md) | | | **Plan d'implémentation** | *(ce document)* | | [Tests](PD-3-tests.md) | | | Retour d'expérience | *(à venir)* | [Retour à infrastructure-souveraine](../index.md) - [Index User Story](index.md)1. Découpage en composants¶
| Composant | Responsabilité | Repo |
|---|---|---|
| Redis Server | Persistance des jobs, pub/sub pour workers | infra |
| BullMQ Module | Orchestration des queues, workers, retry | backend |
| Queue Registry | Enregistrement centralisé des files | backend |
| Job Monitor | Observabilité des états et métriques | backend |
| Alert Service | Signalement des anomalies | backend/infra |
| Documentation Ops | Procédures d'exploitation | infra |
2. Flux techniques¶
FN-01 : Traitement nominal d'une tâche¶
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Producer │────▶│ Redis │────▶│ Worker │────▶│ Completed │
│ (API/Cron) │ │ (Queue) │ │ (BullMQ) │ │ (State) │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │ │ │
│ LPUSH job BRPOPLPUSH SET state
│ SET state:waiting SET state:active completed
└───────────────────┴───────────────────┴───────────────────┘
Redis Persistence (AOF)
- Le producer enregistre un job dans la queue via
queue.add() - Redis persiste le job (AOF appendonly)
- Un worker prend le job via BRPOPLPUSH (atomique)
- L'état passe de
waiting→active - Le worker exécute la logique métier
- L'état passe à
completedoufailed - L'état final est persisté et consultable
FN-02 : Reprise après interruption¶
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Worker │ │ Redis │ │ Systemd │
│ (crash) │ │ (persisted) │ │ (restart) │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
X crash │ │
│ Job in active list │
│ │ │
│ │ restart worker
│ │◀──────────────────┤
│ stalled check │
│ move to waiting │
│ │ │
└───────────────────┴───────────────────┘
- Un worker crash pendant le traitement
- Le job reste dans la liste
activede Redis - Systemd redémarre le worker automatiquement
- BullMQ détecte le job "stalled" (lockDuration expirée)
- Le job est automatiquement remis en
waitingou passé enfailed
2bis. Diagrammes Mermaid¶
Graphe de dépendances des composants¶
graph TD
subgraph infra["Infra (ProbatioVault-infra)"]
REDIS["Redis Server<br/>AOF persistence, pub/sub"]
SYSTEMD["Systemd<br/>Restart=always"]
ALERTINFRA["Alert Infra<br/>Slack/email"]
end
subgraph backend["Backend (ProbatioVault-backend)"]
BULLMQ["BullMQ Module<br/>jobs.module.ts"]
REGISTRY["Queue Registry<br/>queue.registry.ts"]
MONITOR["Job Monitor<br/>job-monitor.service.ts"]
EVENTS["Job Events Listener<br/>job-events.listener.ts"]
CONTROLLER["Jobs Controller<br/>jobs.controller.ts"]
SERVICE["Jobs Service<br/>jobs.service.ts"]
ALERT["Alert Service"]
end
BULLMQ -->|connection| REDIS
BULLMQ -->|enregistre queues| REGISTRY
SERVICE -->|orchestre| BULLMQ
SERVICE -->|checkDataIntegrity| REDIS
CONTROLLER -->|expose API| SERVICE
MONITOR -->|observe états| BULLMQ
EVENTS -->|on failed/stalled| BULLMQ
EVENTS -->|signale anomalie| ALERT
ALERT -->|notifie| ALERTINFRA
SYSTEMD -->|restart workers| BULLMQ Séquence FN-01 : Traitement nominal d'une tâche¶
sequenceDiagram
participant P as Producer (API/Cron)
participant S as Jobs Service
participant R as Redis (Queue)
participant W as Worker (BullMQ)
participant E as Job Events Listener
participant M as Job Monitor
P->>S: queue.add(jobName, payload)
S->>R: LPUSH job + SET state:waiting
R-->>S: jobId
W->>R: BRPOPLPUSH (atomique)
R-->>W: job data
R->>R: SET state:active
E->>E: on('active') → log INFO
W->>W: Exécution logique métier
alt Succès
W->>R: SET state:completed
E->>E: on('completed') → log INFO
else Échec
W->>R: SET state:failed
E->>E: on('failed') → log ERROR
E->>E: Emit alerte via Alert Service
end
M->>R: GET job state (observabilité)
M-->>M: Expose via API /jobs/:queue/:jobId/status Séquence FN-02 : Reprise après interruption (stalled job recovery)¶
sequenceDiagram
participant W as Worker (BullMQ)
participant R as Redis (Queue)
participant SD as Systemd
participant B as BullMQ Stalled Check
participant E as Job Events Listener
W->>R: BRPOPLPUSH → state:active
W->>W: Traitement en cours...
W-xW: CRASH (kill -9, OOM)
Note over R: Job reste en liste active<br/>lockDuration court
SD->>W: Restart=always → relance worker
B->>R: stalledInterval check
B->>R: Détecte lock expirée
R->>R: Move job: active → waiting
E->>E: on('stalled') → log WARN
W->>R: BRPOPLPUSH (re-traitement)
W->>R: SET state:completed
E->>E: on('completed') → log INFO 3. Mapping invariants → mécanismes¶
| Invariant ID | Exigence | Mécanisme | Composant | Observable | Risque |
|---|---|---|---|---|---|
| INV-01 | Aucune tâche ne doit être perdue silencieusement | Redis AOF (appendonly), BullMQ job persistence, removeOnComplete: false | Redis, BullMQ | LLEN queue:* retourne tous les jobs, logs d'ajout | Perte si AOF désactivé |
| INV-02 | Toute tâche doit avoir un état observable à tout moment | BullMQ job states (waiting/active/completed/failed/delayed), Bull Board UI | Job Monitor | GET bull:queue:job:state, API /jobs/:id/status | UI non déployée |
| INV-03 | L'état des tâches doit être persistant face à une interruption | Redis AOF persistence, BullMQ stalled job recovery, Systemd restart | Redis, Systemd | États restaurés après restart, jobs re-traités | AOF ou Systemd mal configuré |
| INV-04 | Toute anomalie doit produire un signal explicite | Event listeners on('failed'), Prometheus metrics, alerting rules | Alert Service | Alertes Slack/email, bull_jobs_failed_total | Alertes non configurées |
4. Mapping critères d'acceptation → mécanismes¶
| Critère ID | Mécanisme(s) | Composant | Observable | Risque |
|---|---|---|---|---|
| CA-01 | Redis AOF persistence, job IDs uniques, audit log des créations | Redis, BullMQ | Count jobs créés = count jobs (completed + failed + active + waiting) | Perte si AOF off |
| CA-02 | BullMQ getJob(), Bull Board UI, API status endpoint | Job Monitor | GET /jobs/:id retourne état, UI accessible | API non exposée |
| CA-03 | Event listeners on('failed'), Prometheus metrics, alerting rules | Alert Service | Alerte reçue sur job failed | Alerting non configuré |
| CA-04 | Systemd Restart=always, BullMQ stalledInterval | Redis, BullMQ | Service UP après kill -9, jobs stalled re-traités | Restart policy manquante |
| CA-05 | README ops, runbooks incidents, procédures de recovery, protocole de validation | Documentation | Runbook exécuté par testeur non-expert en ≤15min, checklist validation signée | Docs non validées |
5. Mapping tests (TC-*) → mécanismes + observables¶
| Test ID | Référence spec | Mécanisme(s) | Point(s) d'observation | Niveau de test visé |
|---|---|---|---|---|
| TC-NOM-01 | INV-01, INV-02, CA-01, CA-02 | BullMQ add/process, Redis persistence | Job state = completed, state consultable via API | Integration |
| TC-NOM-02 | INV-03, CA-04 | Systemd restart, BullMQ stalled recovery, Redis AOF | Service UP après restart, job re-traité, états persistés | E2E |
| TC-ERR-01 | ERR-02, INV-04, CA-03 | on('failed') event, alerting | État = failed, alerte émise | Integration |
| TC-ERR-02 | ERR-01, INV-01, INV-03 | Redis AOF, BullMQ persistence | Tous jobs identifiables après kill -9 | E2E |
| TC-ERR-03 | ERR-03, INV-04 | checkDataIntegrity(), état incomplete, log FORCE_MAJEURE, endpoint /jobs/health/integrity | GET /jobs/health/integrity retourne { status: 'degraded', incompleteJobs }, log CRITICAL visible | Integration |
| TC-INV-01 | INV-01 | Audit count jobs, no orphans | Count créés = count finaux | Integration |
| TC-INV-02 | INV-02 | BullMQ getJob(), Bull Board UI | État consultable à tout moment | Integration |
| TC-INV-03 | INV-03 | Redis restart, state preservation | États inchangés après restart | E2E |
| TC-INV-04 | INV-04 | on('failed') listener, alerting | Signal explicite sur anomalie | Integration |
| TC-NR-01 | INV-03 | Redis restart, state preservation | États inchangés après restart | E2E |
| TC-NR-02 | INV-02 | Charge nominale, no missing states | Tous états consultables sous charge | Perf |
| TC-NEG-01 | INV-01, INV-03 | Interruptions répétées | Aucun état incohérent, aucune perte | E2E/Chaos |
| TC-NEG-02 | INV-04 | Échecs massifs | Alerte globale visible | Integration |
| TC-DOC-01 | CA-05 | Protocole validation runbooks, testeur non-expert, temps mesuré | Fichier runbooks-validation.md signé, temps ≤ max, 0 questions | Manual |
6. Gestion des erreurs¶
| Code | Situation | Traitement | Observable |
|---|---|---|---|
JOB_FAILED | Exécution échouée | Job marqué failed, event émis, retry si configuré | Job state = failed, logs erreur |
JOB_STALLED | Worker crash pendant exécution | Job re-queued ou failed après stalledInterval | Job re-traité ou state = failed |
REDIS_DOWN | Redis indisponible | Retry connection avec backoff, alerte | Logs reconnection, alerte ops |
QUEUE_FULL | Trop de jobs en attente | Alerte, throttling optionnel | Métrique queue length, alerte |
WORKER_TIMEOUT | Job dépasse timeout | Job failed avec timeout error | Job state = failed, error = timeout |
FORCE_MAJEURE | Perte données Redis (crash sans AOF, corruption) | Jobs marqués incomplete, alerte critique, rapport d'impact | État incomplete sur jobs affectés, log FORCE_MAJEURE, alerte ops |
Mécanisme ERR-03 : Force majeure (obligatoire)¶
En cas de perte de données exceptionnelle (crash Redis sans AOF, corruption), le système doit :
- Détecter l'incohérence : Au redémarrage, comparer le dernier checkpoint connu avec l'état Redis
- Marquer les jobs affectés : État
incompleteavec metadata{ reason: 'FORCE_MAJEURE', detectedAt, lastKnownState } - Émettre un signal explicite : Log
CRITICAL [JobsService] FORCE_MAJEURE: X jobs in incomplete state - Alerter les opérateurs : Alerte dédiée avec liste des jobs impactés
// jobs.service.ts - Mécanisme ERR-03
async checkDataIntegrity(): Promise<ForceMajeureReport | null> {
const lastCheckpoint = await this.getLastCheckpoint();
const currentState = await this.getAllJobStates();
const incompleteJobs = this.detectIncompleteJobs(lastCheckpoint, currentState);
if (incompleteJobs.length > 0) {
// Marquer les jobs comme incomplete
for (const job of incompleteJobs) {
await this.markAsIncomplete(job.id, 'FORCE_MAJEURE');
}
// Log explicite (ERR-03)
this.logger.error(`FORCE_MAJEURE: ${incompleteJobs.length} jobs in incomplete state`, {
jobIds: incompleteJobs.map(j => j.id),
detectedAt: new Date().toISOString(),
});
// Alerte ops
await this.alertService.sendCriticalAlert('FORCE_MAJEURE', incompleteJobs);
return { incompleteJobs, detectedAt: new Date() };
}
return null;
}
Observable TC-ERR-03 : Endpoint GET /jobs/health/integrity retourne { status: 'degraded', incompleteJobs: [...] } si force majeure détectée.
Codes d'erreur applicatifs¶
export enum JobErrorCode {
JOB_NOT_FOUND = 'JOB_NOT_FOUND',
JOB_ALREADY_COMPLETED = 'JOB_ALREADY_COMPLETED',
JOB_PROCESSING = 'JOB_PROCESSING',
QUEUE_NOT_FOUND = 'QUEUE_NOT_FOUND',
REDIS_CONNECTION_ERROR = 'REDIS_CONNECTION_ERROR',
FORCE_MAJEURE = 'FORCE_MAJEURE', // ERR-03
}
export enum JobState {
WAITING = 'waiting',
ACTIVE = 'active',
COMPLETED = 'completed',
FAILED = 'failed',
DELAYED = 'delayed',
INCOMPLETE = 'incomplete', // ERR-03: Force majeure
}
7. Impacts sécurité¶
| Risque | Mitigation | Observable |
|---|---|---|
| Données sensibles dans jobs | Chiffrer payload si nécessaire, pas de secrets en clair | Audit code review |
| Accès Redis non autorisé | requirepass, bind localhost, TLS en prod | Config Redis vérifiée |
| Injection via job data | Validation stricte des payloads | Tests de validation |
| DoS via flood de jobs | Rate limiting sur création, alertes sur queue size | Métriques queue length |
Journalisation¶
- Création de job :
INFO [JobsService] Job created: {jobId, queue, timestamp} - Completion :
INFO [Worker] Job completed: {jobId, duration} - Failure :
ERROR [Worker] Job failed: {jobId, error, attempt} - Stalled :
WARN [BullMQ] Job stalled: {jobId, queue}
8. Hypothèses techniques¶
| ID | Hypothèse | Impact si faux |
|---|---|---|
| H-01 | Redis est déployé avec AOF activé | Perte de jobs possible au crash |
| H-02 | Systemd gère le redémarrage des services | Pas de recovery automatique |
| H-03 | BullMQ est compatible avec la version Redis déployée | Incompatibilités possibles |
| H-04 | Les workers sont idempotents ou le retry est désactivé | Duplications d'effets de bord |
| H-05 | La volumétrie reste sous 10K jobs/heure | Scaling nécessaire au-delà |
| H-06 | Un seul Redis (pas de cluster) suffit pour la charge | Migration cluster nécessaire |
9. Points de vigilance (risques, dette, pièges)¶
| Point | Description | Action recommandée |
|---|---|---|
| AOF désactivé | Redis par défaut sans AOF = perte de données | Vérifier appendonly yes |
| removeOnComplete: true | Jobs supprimés après succès = pas d'audit | Garder removeOnComplete: false ou durée |
| stalledInterval trop long | Jobs bloqués longtemps | Configurer 30s-60s |
| Pas de monitoring | Jobs failed non détectés | Déployer Bull Board + alertes |
| Workers sans graceful shutdown | Jobs perdus au redémarrage | Implémenter shutdown hooks |
| Retry infini | Jobs en boucle de retry | Configurer attempts max |
Dette technique identifiée¶
- Pas de métriques Prometheus → à ajouter post-MVP
Note : L'API /jobs/:id/status est obligatoire dès le MVP pour satisfaire CA-02 (observabilité). Bull Board est optionnel mais l'endpoint API est requis.
10. Hors périmètre¶
- Logique métier des jobs : Implémentée dans chaque consumer (PD-21, etc.)
- Choix hébergeur Redis : OVH retenu mais hors scope spec
- Redis Cluster : Non requis pour volumétrie initiale
- Scheduler avancé : Cron jobs simples, pas de scheduling complexe
- Multi-tenant queues : Une instance Redis pour toute l'app
11. Implémentation technique¶
Phase 1 : Infrastructure Redis¶
Fichiers Terraform :
# infra/terraform/modules/redis/main.tf
resource "ovh_cloud_project_database" "redis" {
service_name = var.ovh_project_id
engine = "redis"
version = "7.0"
plan = "essential"
nodes {
region = "GRA"
}
}
Configuration Redis :
# Persistence
appendonly yes
appendfsync everysec
# Memory
maxmemory-policy noeviction
# Security
requirepass ${REDIS_PASSWORD}
bind 127.0.0.1
Phase 2 : Module BullMQ (Backend)¶
Structure :
src/modules/jobs/
├── jobs.module.ts # Module principal
├── jobs.controller.ts # API observabilité (CA-02)
├── jobs.service.ts # Service d'orchestration
├── dto/
│ └── job-status.dto.ts # DTO état job
├── queues/
│ ├── queue.registry.ts # Registre des queues
│ └── queue.config.ts # Configuration par défaut
├── processors/
│ └── base.processor.ts # Classe de base workers
└── monitoring/
├── job-monitor.service.ts
└── job-events.listener.ts
Configuration BullMQ :
// jobs.module.ts
@Module({
imports: [
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: (config: ConfigService) => ({
connection: {
host: config.get('REDIS_HOST'),
port: config.get('REDIS_PORT'),
password: config.get('REDIS_PASSWORD'),
},
defaultJobOptions: {
removeOnComplete: { age: 86400, count: 1000 }, // 24h ou 1000 jobs
removeOnFail: { age: 604800 }, // 7 jours
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
},
}),
inject: [ConfigService],
}),
],
})
export class JobsModule {}
Phase 3 : Monitoring et Alertes¶
Event Listener :
@Injectable()
export class JobEventsListener {
private readonly logger = new Logger(JobEventsListener.name);
@OnQueueEvent('failed')
onFailed(job: Job, error: Error) {
this.logger.error(`Job ${job.id} failed: ${error.message}`, {
jobId: job.id,
queue: job.queueName,
error: error.stack,
attempt: job.attemptsMade,
});
// Emit alert via AlertService
}
@OnQueueEvent('stalled')
onStalled(jobId: string) {
this.logger.warn(`Job ${jobId} stalled`);
}
}
API Endpoint d'observabilité (obligatoire pour CA-02) :
// jobs.controller.ts
@Controller('jobs')
export class JobsController {
constructor(private readonly jobsService: JobsService) {}
/**
* Retourne l'état d'un job (CA-02: observabilité)
* @returns { id, queue, state, progress, failedReason?, timestamps }
*/
@Get(':queue/:jobId/status')
async getJobStatus(
@Param('queue') queue: string,
@Param('jobId') jobId: string,
): Promise<JobStatusDto> {
return this.jobsService.getJobStatus(queue, jobId);
}
/**
* Liste les jobs d'une queue avec leur état
*/
@Get(':queue')
async listJobs(
@Param('queue') queue: string,
@Query('state') state?: JobState,
): Promise<JobStatusDto[]> {
return this.jobsService.listJobs(queue, state);
}
}
Phase 4 : Documentation Ops (CA-05 obligatoire)¶
Runbooks requis avec critères d'exécutabilité :
| Runbook | Objectif | Temps max | Prérequis testeur | Critère de succès |
|---|---|---|---|---|
redis-restart.md | Redémarrage Redis | 10 min | Accès SSH, droits sudo | Service UP, jobs préservés |
queue-blocked.md | Déblocage queue saturée | 15 min | Accès SSH | Queue length < seuil |
job-retry.md | Relance manuelle d'un job | 5 min | Accès API/CLI | Job re-traité avec succès |
monitoring-setup.md | Configuration Bull Board | 15 min | Docker installé | UI accessible, jobs visibles |
Structure obligatoire de chaque runbook :
# [Titre]
## Prérequis
- Liste des accès/outils nécessaires
## Diagnostic
- Commandes pour identifier le problème
## Résolution
- Étapes numérotées avec commandes copiables
## Vérification
- Commande(s) de validation du succès
## Rollback
- Procédure de retour arrière si échec
Protocole de validation TC-DOC-01 :
- Sélection testeur : Personne non-experte Redis/BullMQ (ex: dev frontend, PM)
- Environnement : Staging avec accès SSH configuré
- Exécution : Testeur suit le runbook sans aide extérieure
- Mesures :
- Temps d'exécution (chronomètre)
- Nombre de questions posées (doit être 0)
- Résultat final (succès/échec)
- Validation : Runbook validé si exécuté en ≤ temps max sans question
- Traçabilité : Checklist signée avec date, nom testeur, temps mesuré
Observable TC-DOC-01 : Fichier docs/validation/runbooks-validation.md contenant les résultats de validation signés
12. Checklist de validation¶
- Redis déployé avec AOF enabled
- BullMQ module configuré avec retry/backoff
- Event listeners sur failed/stalled
- API
/jobs/:queue/:jobId/statusopérationnelle (CA-02 obligatoire) - Endpoint
/jobs/health/integrityopérationnel (ERR-03 obligatoire) - Mécanisme checkDataIntegrity() implémenté avec état
incomplete - Systemd Restart=always pour tous services
- Tests TC-NOM-01 et TC-NOM-02 passants
- Tests TC-INV-02 (observabilité API) passants
- Tests TC-ERR-03 (force majeure) passants
- Alertes configurées sur failures
- Runbooks rédigés avec structure obligatoire (CA-05)
- Runbooks validés par testeur non-expert (TC-DOC-01)
- Fichier
runbooks-validation.mdsigné avec temps mesurés