
ProbatioVault - Replication Worker Specification

PD-6: Async Worker OVH → AWS Glacier (to implement)


1. Objective

Implement a background worker that:
1. Watches for new uploads on OVH S3 (documents-hot)
2. Replicates them automatically to AWS Glacier Deep Archive
3. Applies Object Lock in COMPLIANCE mode (50 years by default)
4. Handles tags (document type, legal retention)
5. Optionally cleans up OVH after replication (configurable)


2. Worker Architecture

2.1 Technologies

┌────────────────────────────────────────┐
│ Worker Replication (NestJS)            │
│                                        │
│ ├─ Bull Queue (Redis)                 │
│ │  └─ Jobs: replication-job           │
│ │                                      │
│ ├─ Cron (schedule)                    │
│ │  └─ Every 5 minutes                 │
│ │                                      │
│ ├─ S3 SDK                             │
│ │  ├─ OVH S3 client                  │
│ │  └─ AWS S3 client                  │
│ │                                      │
│ └─ PostgreSQL (tracking)              │
│    └─ Table: replication_status       │
└────────────────────────────────────────┘

Technical stack:
- NestJS: backend framework
- Bull: job queue (Redis-based)
- AWS SDK v3: S3/Glacier client, also used against the S3-compatible OVH endpoint
- PostgreSQL: replication state tracking

2.2 Replication Flow

┌─────────────────────────────────────────────────────────┐
│ STEP 1: Detect new files                               │
└─────────────────┬───────────────────────────────────────┘
         ┌────────────────┐
         │ OVH S3 Bucket  │
         │ documents-hot  │
         └────────┬───────┘
                  │ List Objects (last 5 min)
┌─────────────────────────────────────────────────────────┐
│ STEP 2: Filter unreplicated objects                    │
│                                                         │
│ SELECT key FROM documents                               │
│ WHERE key NOT IN (                                      │
│   SELECT s3_key FROM replication_status                 │
│   WHERE status = 'replicated'                           │
│ )                                                       │
└─────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ STEP 3: Enqueue jobs (Bull Queue)                      │
│                                                         │
│ foreach (unreplicated_object) {                         │
│   queue.add('replication-job', {                        │
│     s3_key: object.key,                                 │
│     bucket_source: 'documents-hot',                     │
│     bucket_dest: 'documents-cold',                      │
│     metadata: object.metadata,                          │
│   });                                                   │
│ }                                                       │
└─────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ STEP 4: Worker processes jobs (concurrent)             │
│                                                         │
│ Job Handler:                                            │
│ 1. Download from OVH S3                                 │
│ 2. Parse metadata (tags, document type)                 │
│ 3. Compute retention (based on document type)           │
│ 4. Upload to AWS S3 with Object Lock                    │
│ 5. Verify upload (checksum comparison)                  │
│ 6. Update replication_status table                      │
│ 7. (Optional) Delete from OVH if configured             │
└─────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ STEP 5: Confirmation and logs                          │
│                                                         │
│ INSERT INTO replication_status (                        │
│   s3_key, status, replicated_at, aws_version_id         │
│ ) VALUES (?, 'replicated', NOW(), ?)                    │
│                                                         │
│ INSERT INTO audit_logs (                                │
│   action, metadata                                      │
│ ) VALUES ('REPLICATION_SUCCESS', {...})                 │
└─────────────────────────────────────────────────────────┘

3. Database Schema

3.1 Table replication_status

CREATE TABLE vault_secure.replication_status (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

  -- Object identification
  s3_key VARCHAR(1024) NOT NULL UNIQUE, -- S3 key (e.g. documents/user-123/doc-456.pdf.enc)
  document_id UUID, -- Link to the documents table (optional)
  user_id UUID, -- Link to the user (for tracking)

  -- Source and destination
  bucket_source VARCHAR(255) NOT NULL, -- 'documents-hot' (OVH)
  bucket_dest VARCHAR(255) NOT NULL,   -- 'documents-cold' (AWS)

  -- Replication state
  status VARCHAR(50) NOT NULL, -- 'pending', 'in_progress', 'replicated', 'failed'
  replicated_at TIMESTAMPTZ, -- Timestamp of successful replication
  failed_at TIMESTAMPTZ, -- Timestamp of last failure
  retry_count INT DEFAULT 0, -- Number of attempts
  error_message TEXT, -- Error message when failed

  -- AWS metadata
  aws_version_id VARCHAR(255), -- AWS S3 version ID
  aws_etag VARCHAR(255), -- AWS ETag (checksum)
  aws_object_lock_until TIMESTAMPTZ, -- Object Lock expiry date

  -- OVH metadata
  ovh_etag VARCHAR(255), -- OVH ETag (integrity comparison)
  ovh_size BIGINT, -- File size (bytes)

  -- Audit
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Indexes for frequent queries
CREATE INDEX idx_replication_status_status ON vault_secure.replication_status(status);
CREATE INDEX idx_replication_status_user_id ON vault_secure.replication_status(user_id);
CREATE INDEX idx_replication_status_replicated_at ON vault_secure.replication_status(replicated_at);

-- RLS (security)
ALTER TABLE vault_secure.replication_status ENABLE ROW LEVEL SECURITY;

CREATE POLICY replication_status_admin_only
ON vault_secure.replication_status
FOR ALL
TO authenticated
USING (
  current_setting('app.user_role', true) = 'admin'
);
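The `ReplicationStatus` entity used in section 4 mirrors this table. As a minimal sketch in plain TypeScript (without the TypeORM decorators the real entity file would carry; field names follow the columns above, and the transition map is illustrative, inferred from the worker flow rather than taken from existing code):

```typescript
// Sketch of the ReplicationStatus shape, mirroring vault_secure.replication_status.
type ReplicationState = 'pending' | 'in_progress' | 'replicated' | 'failed';

interface ReplicationStatusRow {
  id: string;                       // UUID
  s3_key: string;                   // e.g. documents/user-123/doc-456.pdf.enc
  document_id?: string;
  user_id?: string;
  bucket_source: string;            // 'documents-hot' (OVH)
  bucket_dest: string;              // 'documents-cold' (AWS)
  status: ReplicationState;
  replicated_at?: Date;
  failed_at?: Date;
  retry_count: number;
  error_message?: string;
  aws_version_id?: string;
  aws_etag?: string;
  aws_object_lock_until?: Date;
  ovh_etag?: string;
  ovh_size?: number;
  created_at: Date;
  updated_at: Date;
}

// Allowed state transitions implied by the worker flow (illustrative).
const transitions: Record<ReplicationState, ReplicationState[]> = {
  pending: ['in_progress'],
  in_progress: ['replicated', 'failed'],
  failed: ['in_progress'],  // retried by Bull
  replicated: [],           // terminal: an object is never re-replicated
};

function canTransition(from: ReplicationState, to: ReplicationState): boolean {
  return transitions[from].includes(to);
}
```

Keeping `replicated` terminal matches the STEP 2 filter, which excludes any key that already has a 'replicated' row.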

4. NestJS Implementation

4.1 ReplicationWorker Module

File: src/modules/replication/replication-worker.module.ts

import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { TypeOrmModule } from '@nestjs/typeorm';

import { ReplicationStatus } from './entities/replication-status.entity';
import { ReplicationWorkerService } from './services/replication-worker.service';
import { ReplicationProcessor } from './processors/replication.processor';

@Module({
  imports: [
    // Bull Queue
    BullModule.registerQueue({
      name: 'replication',
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 5000, // initial delay 5 s, growing exponentially on each retry
        },
        removeOnComplete: 100, // keep the last 100 successful jobs
        removeOnFail: 1000,    // keep the last 1000 failed jobs
      },
    }),

    // TypeORM
    TypeOrmModule.forFeature([ReplicationStatus]),
  ],
  providers: [
    ReplicationWorkerService,
    ReplicationProcessor,
  ],
  exports: [ReplicationWorkerService],
})
export class ReplicationWorkerModule {}

4.2 ReplicationWorker Service

File: src/modules/replication/services/replication-worker.service.ts

import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { Cron, CronExpression } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, In } from 'typeorm';
import { S3Client, ListObjectsV2Command } from '@aws-sdk/client-s3';

import { ReplicationStatus } from '../entities/replication-status.entity';

@Injectable()
export class ReplicationWorkerService {
  private readonly logger = new Logger(ReplicationWorkerService.name);

  private ovhS3Client: S3Client;
  private awsS3Client: S3Client;

  constructor(
    @InjectQueue('replication') private replicationQueue: Queue,
    @InjectRepository(ReplicationStatus)
    private replicationRepo: Repository<ReplicationStatus>,
  ) {
    // Initialize S3 clients
    this.ovhS3Client = new S3Client({
      region: process.env.OVH_S3_REGION,
      endpoint: process.env.OVH_S3_ENDPOINT,
      credentials: {
        accessKeyId: process.env.OVH_S3_ACCESS_KEY_ID,
        secretAccessKey: process.env.OVH_S3_SECRET_ACCESS_KEY,
      },
    });

    this.awsS3Client = new S3Client({
      region: process.env.AWS_S3_REGION,
      credentials: {
        accessKeyId: process.env.AWS_S3_ACCESS_KEY_ID,
        secretAccessKey: process.env.AWS_S3_SECRET_ACCESS_KEY,
      },
    });
  }

  /**
   * Cron: detects new OVH files every 5 minutes
   */
  @Cron(CronExpression.EVERY_5_MINUTES)
  async scanNewDocuments() {
    this.logger.log('Starting scan for new documents on OVH S3...');

    try {
      // List OVH S3 objects (one page: ListObjectsV2 returns at most 1000 keys per call)
      const listCommand = new ListObjectsV2Command({
        Bucket: process.env.OVH_S3_BUCKET_DOCUMENTS,
        MaxKeys: 1000, // process in batches of 1000
      });

      const response = await this.ovhS3Client.send(listCommand);

      if (!response.Contents || response.Contents.length === 0) {
        this.logger.log('No objects found in OVH S3');
        return;
      }

      // Filter out already-replicated objects
      const unreplicatedKeys = await this.filterUnreplicated(
        response.Contents.map((obj) => obj.Key).filter((key): key is string => !!key),
      );

      this.logger.log(`Found ${unreplicatedKeys.length} unreplicated objects`);

      // Enqueue jobs
      for (const key of unreplicatedKeys) {
        await this.replicationQueue.add('replicate-document', {
          s3_key: key,
          bucket_source: process.env.OVH_S3_BUCKET_DOCUMENTS,
          bucket_dest: process.env.AWS_S3_BUCKET_DOCUMENTS_COLD,
        });
      }

      this.logger.log(`Enqueued ${unreplicatedKeys.length} replication jobs`);
    } catch (error) {
      this.logger.error('Error scanning OVH S3', error.stack);
    }
  }

  /**
   * Filters out S3 keys already replicated (i.e. with a 'replicated' row in replication_status)
   */
  private async filterUnreplicated(keys: string[]): Promise<string[]> {
    const replicated = await this.replicationRepo.find({
      where: {
        s3_key: In(keys),
        status: 'replicated',
      },
      select: ['s3_key'],
    });

    const replicatedKeys = new Set(replicated.map((r) => r.s3_key));

    return keys.filter((key) => !replicatedKeys.has(key));
  }
}
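Note that a single `ListObjectsV2Command` returns at most 1000 keys; once the bucket grows past that, `scanNewDocuments` would need to follow `NextContinuationToken` across pages. A minimal, dependency-free sketch of that loop (the injected `send` function stands in for `s3Client.send(new ListObjectsV2Command({ ContinuationToken }))`; the page shape is a subset of the real SDK response):

```typescript
// Subset of the ListObjectsV2 response relevant to pagination.
interface ListPage {
  Contents?: { Key?: string }[];
  IsTruncated?: boolean;
  NextContinuationToken?: string;
}

// Collects every key by following NextContinuationToken until IsTruncated is false.
async function listAllKeys(
  send: (continuationToken?: string) => Promise<ListPage>,
): Promise<string[]> {
  const keys: string[] = [];
  let token: string | undefined;
  do {
    const page = await send(token);
    for (const obj of page.Contents ?? []) {
      if (obj.Key) keys.push(obj.Key);
    }
    token = page.IsTruncated ? page.NextContinuationToken : undefined;
  } while (token);
  return keys;
}
```

In `scanNewDocuments`, each iteration would pass the token as `ContinuationToken` in the next `ListObjectsV2Command`.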

4.3 Replication Processor

File: src/modules/replication/processors/replication.processor.ts

import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import {
  S3Client,
  GetObjectCommand,
  PutObjectCommand,
  HeadObjectCommand,
} from '@aws-sdk/client-s3';

import { ReplicationStatus } from '../entities/replication-status.entity';

@Processor('replication')
export class ReplicationProcessor {
  private readonly logger = new Logger(ReplicationProcessor.name);

  private ovhS3Client: S3Client;
  private awsS3Client: S3Client;

  constructor(
    @InjectRepository(ReplicationStatus)
    private replicationRepo: Repository<ReplicationStatus>,
  ) {
    // Initialize S3 clients (same config as the service)
    this.ovhS3Client = new S3Client({ /* ... */ });
    this.awsS3Client = new S3Client({ /* ... */ });
  }

  @Process('replicate-document')
  async handleReplication(job: Job) {
    const { s3_key, bucket_source, bucket_dest } = job.data;

    this.logger.log(`Processing replication for ${s3_key}`);

    try {
      // 1. Create or update the replication_status row (status = 'in_progress')
      let replicationStatus = await this.replicationRepo.findOne({
        where: { s3_key },
      });

      if (!replicationStatus) {
        replicationStatus = this.replicationRepo.create({
          s3_key,
          bucket_source,
          bucket_dest,
          status: 'in_progress',
        });
      } else {
        replicationStatus.status = 'in_progress';
        replicationStatus.retry_count += 1;
      }

      await this.replicationRepo.save(replicationStatus);

      // 2. Download from OVH S3
      const getCommand = new GetObjectCommand({
        Bucket: bucket_source,
        Key: s3_key,
      });

      const ovhResponse = await this.ovhS3Client.send(getCommand);
      const fileBuffer = await this.streamToBuffer(ovhResponse.Body);

      // 3. Parse metadata (tags, document type)
      const metadata = ovhResponse.Metadata || {};
      const documentType = metadata['document-type'] || 'general';

      // 4. Compute retention (based on document type)
      const retentionYears = this.getRetentionYears(documentType);
      const retainUntilDate = new Date();
      retainUntilDate.setFullYear(retainUntilDate.getFullYear() + retentionYears);

      // 5. Upload to AWS S3 with Object Lock
      const putCommand = new PutObjectCommand({
        Bucket: bucket_dest,
        Key: s3_key,
        Body: fileBuffer,
        Metadata: metadata,
        ObjectLockMode: 'COMPLIANCE',
        ObjectLockRetainUntilDate: retainUntilDate,
        Tagging: `DocumentType=${documentType}&RetentionYears=${retentionYears}`,
      });

      const awsResponse = await this.awsS3Client.send(putCommand);

      // 6. Verify integrity (compare ETags)
      // Note: ETags are only directly comparable for single-part, non-SSE-encrypted uploads
      const headCommand = new HeadObjectCommand({
        Bucket: bucket_dest,
        Key: s3_key,
      });

      const awsHead = await this.awsS3Client.send(headCommand);

      if (awsHead.ETag !== ovhResponse.ETag) {
        throw new Error('ETag mismatch: integrity check failed');
      }

      // 7. Update replication_status (success)
      replicationStatus.status = 'replicated';
      replicationStatus.replicated_at = new Date();
      replicationStatus.aws_version_id = awsResponse.VersionId;
      replicationStatus.aws_etag = awsHead.ETag;
      replicationStatus.aws_object_lock_until = retainUntilDate;
      replicationStatus.ovh_etag = ovhResponse.ETag;
      replicationStatus.ovh_size = fileBuffer.length;

      await this.replicationRepo.save(replicationStatus);

      this.logger.log(`Replication successful for ${s3_key}`);

      // 8. (Optional) Delete from OVH if configured
      if (process.env.DELETE_FROM_OVH_AFTER_REPLICATION === 'true') {
        // TODO: Delete from OVH (PD-XX)
      }
    } catch (error) {
      this.logger.error(`Replication failed for ${s3_key}`, error.stack);

      // Update status = 'failed'
      await this.replicationRepo.update(
        { s3_key },
        {
          status: 'failed',
          failed_at: new Date(),
          error_message: error.message,
        },
      );

      throw error; // rethrow so Bull retries the job automatically
    }
  }

  /**
   * Determines the retention duration based on document type
   */
  private getRetentionYears(documentType: string): number {
    const retentionMap: Record<string, number> = {
      payslip: 50,   // pay slips (French labour code)
      invoice: 10,   // invoices (commercial code)
      contract: 10,  // contracts
      general: 50,   // default (maximum safety)
    };

    return retentionMap[documentType] || 50;
  }

  /**
   * Convert stream to buffer
   */
  private async streamToBuffer(stream: any): Promise<Buffer> {
    const chunks = [];
    for await (const chunk of stream) {
      chunks.push(chunk);
    }
    return Buffer.concat(chunks);
  }
}
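The retention computation in steps 3-4 of the handler can be isolated as a pure function, which makes the Object Lock date easy to test. A sketch under the same mapping as `getRetentionYears` above (using UTC arithmetic to avoid timezone drift; `computeRetainUntil` is an illustrative helper, not part of the processor):

```typescript
// Retention per document type, as in getRetentionYears().
const RETENTION_YEARS: Record<string, number> = {
  payslip: 50,  // pay slips (French labour code)
  invoice: 10,  // invoices (commercial code)
  contract: 10, // contracts
  general: 50,  // default (maximum safety)
};

// Computes the Object Lock RetainUntilDate from an upload date and document type.
function computeRetainUntil(uploadedAt: Date, documentType: string): Date {
  const years = RETENTION_YEARS[documentType] ?? 50;
  const until = new Date(uploadedAt.getTime());
  until.setUTCFullYear(until.getUTCFullYear() + years);
  return until;
}
```

For a payslip uploaded on 2024-01-19 this yields 2074-01-19, matching the `RetainUntilDate` expected in the E2E test of section 6.2.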

5. Configuration

5.1 Environment Variables

File: .env

# OVH S3 (source)
OVH_S3_ENDPOINT=https://s3.gra.cloud.ovh.net
OVH_S3_REGION=GRA
OVH_S3_ACCESS_KEY_ID=<from terraform output>
OVH_S3_SECRET_ACCESS_KEY=<from terraform output>
OVH_S3_BUCKET_DOCUMENTS=probatiovault-documents-hot-dev

# AWS S3/Glacier (destination)
AWS_S3_ENDPOINT=https://s3.eu-west-3.amazonaws.com
AWS_S3_REGION=eu-west-3
AWS_S3_ACCESS_KEY_ID=<from terraform output>
AWS_S3_SECRET_ACCESS_KEY=<from terraform output>
AWS_S3_BUCKET_DOCUMENTS_COLD=probatiovault-documents-cold-dev

# Worker configuration
REPLICATION_CRON_SCHEDULE=*/5 * * * * # every 5 minutes
REPLICATION_CONCURRENCY=5 # process 5 jobs in parallel
DELETE_FROM_OVH_AFTER_REPLICATION=false # do NOT delete from OVH (keep the HOT copy)

# Redis (Bull Queue)
REDIS_HOST=localhost
REDIS_PORT=6379
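Since the worker only discovers missing credentials when the first S3 call fails, a fail-fast check at bootstrap is worth the few lines. A minimal sketch over the variable names above (`missingEnvKeys` is a hypothetical helper, not part of the existing codebase):

```typescript
// Required configuration keys, taken from the .env file above.
const REQUIRED_ENV_KEYS = [
  'OVH_S3_ENDPOINT',
  'OVH_S3_REGION',
  'OVH_S3_ACCESS_KEY_ID',
  'OVH_S3_SECRET_ACCESS_KEY',
  'OVH_S3_BUCKET_DOCUMENTS',
  'AWS_S3_REGION',
  'AWS_S3_ACCESS_KEY_ID',
  'AWS_S3_SECRET_ACCESS_KEY',
  'AWS_S3_BUCKET_DOCUMENTS_COLD',
  'REDIS_HOST',
  'REDIS_PORT',
] as const;

// Returns the keys that are absent or empty, so the worker can refuse to start.
function missingEnvKeys(env: Record<string, string | undefined>): string[] {
  return REQUIRED_ENV_KEYS.filter((key) => !env[key] || env[key]!.trim() === '');
}
```

At bootstrap, `missingEnvKeys(process.env)` would be checked and the process aborted with the list of missing keys if it is non-empty.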

5.2 Docker Deployment

File: docker-compose.replication.yml

version: '3.8'

services:
  replication-worker:
    build:
      context: ./ProbatioVault-backend
      dockerfile: Dockerfile
    container_name: probatiovault-replication-worker
    command: npm run start:worker # worker-specific start script
    environment:
      - NODE_ENV=production
      - OVH_S3_ENDPOINT=${OVH_S3_ENDPOINT}
      - OVH_S3_ACCESS_KEY_ID=${OVH_S3_ACCESS_KEY_ID}
      - OVH_S3_SECRET_ACCESS_KEY=${OVH_S3_SECRET_ACCESS_KEY}
      - AWS_S3_ACCESS_KEY_ID=${AWS_S3_ACCESS_KEY_ID}
      - AWS_S3_SECRET_ACCESS_KEY=${AWS_S3_SECRET_ACCESS_KEY}
    depends_on:
      - redis
      - postgres
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    container_name: probatiovault-redis
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    restart: unless-stopped

volumes:
  redis-data:

6. Tests

6.1 Unit Test (Processor)

File: src/modules/replication/processors/replication.processor.spec.ts

describe('ReplicationProcessor', () => {
  let processor: ReplicationProcessor;
  let replicationRepo: MockRepository<ReplicationStatus>;
  let ovhS3Client: jest.Mocked<S3Client>;
  let awsS3Client: jest.Mocked<S3Client>;

  beforeEach(() => {
    // Mock repository and S3 clients
    replicationRepo = createMockRepository();
    ovhS3Client = createMockS3Client();
    awsS3Client = createMockS3Client();

    processor = new ReplicationProcessor(replicationRepo);
    processor['ovhS3Client'] = ovhS3Client;
    processor['awsS3Client'] = awsS3Client;
  });

  it('should replicate document successfully', async () => {
    const job = {
      data: {
        s3_key: 'test-document.pdf.enc',
        bucket_source: 'documents-hot',
        bucket_dest: 'documents-cold',
      },
    };

    // Mock OVH GetObject (Body must be async-iterable, since streamToBuffer
    // iterates it chunk by chunk; Readable comes from 'node:stream')
    ovhS3Client.send.mockResolvedValueOnce({
      Body: Readable.from([Buffer.from('encrypted content')]),
      ETag: '"abc123"',
      Metadata: { 'document-type': 'payslip' },
    });

    // Mock AWS PutObject
    awsS3Client.send.mockResolvedValueOnce({
      VersionId: 'v1',
      ETag: '"abc123"',
    });

    // Mock AWS HeadObject
    awsS3Client.send.mockResolvedValueOnce({
      ETag: '"abc123"',
    });

    await processor.handleReplication(job);

    // Verify replication status saved
    expect(replicationRepo.save).toHaveBeenCalledWith(
      expect.objectContaining({
        status: 'replicated',
        aws_version_id: 'v1',
      }),
    );
  });

  it('should retry on failure', async () => {
    const job = {
      data: {
        s3_key: 'test-document.pdf.enc',
        bucket_source: 'documents-hot',
        bucket_dest: 'documents-cold',
      },
    };

    // Mock OVH GetObject failure
    ovhS3Client.send.mockRejectedValueOnce(new Error('Network error'));

    await expect(processor.handleReplication(job)).rejects.toThrow('Network error');

    // Verify status = 'failed'
    expect(replicationRepo.update).toHaveBeenCalledWith(
      { s3_key: 'test-document.pdf.enc' },
      expect.objectContaining({
        status: 'failed',
        error_message: 'Network error',
      }),
    );
  });
});

6.2 E2E Test (Full Replication)

# 1. Upload a test file to OVH
aws s3 cp test-document.pdf s3://probatiovault-documents-hot-dev/test.pdf \
  --endpoint-url https://s3.gra.cloud.ovh.net \
  --metadata document-type=payslip

# 2. Wait for the worker (up to 5 minutes)
sleep 300

# 3. Verify the replication on AWS
aws s3api get-object-retention \
  --bucket probatiovault-documents-cold-dev \
  --key test.pdf

# Expected result:
# {
#   "Retention": {
#     "Mode": "COMPLIANCE",
#     "RetainUntilDate": "2074-01-19T00:00:00Z"
#   }
# }

# 4. Check the replication_status table
psql probatiovault -c "SELECT * FROM vault_secure.replication_status WHERE s3_key = 'test.pdf';"

# Expected result: status = 'replicated'

7. Monitoring

7.1 Bull Queue Metrics

// Bull Board dashboard (UI)
import { ExpressAdapter } from '@bull-board/express';
import { BullBoardModule } from '@bull-board/nestjs';

@Module({
  imports: [
    BullBoardModule.forRoot({
      route: '/admin/queues',
      adapter: ExpressAdapter,
    }),
  ],
})

// Reachable at: http://localhost:3000/admin/queues

7.2 Logs and Alerts

// Structured logging
this.logger.log({
  event: 'REPLICATION_SUCCESS',
  s3_key: key,
  duration_ms: Date.now() - startTime,
  aws_version_id: versionId,
});

// Alert when the queue holds more than 1000 waiting jobs
@Cron(CronExpression.EVERY_10_MINUTES)
async checkQueueSize() {
  const jobCounts = await this.replicationQueue.getJobCounts();

  if (jobCounts.waiting > 1000) {
    this.logger.error('Replication queue overloaded', { jobCounts });
    // TODO: Send alert (PD-XX)
  }
}
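The alert condition in `checkQueueSize` can be kept as a pure predicate so the threshold is testable without a live queue. A sketch (the `JobCounts` shape is a subset of what Bull's `getJobCounts()` returns; `isQueueOverloaded` is an illustrative helper):

```typescript
// Subset of the object returned by Queue.getJobCounts() in Bull.
interface JobCounts {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
}

// True when the backlog of waiting jobs exceeds the alert threshold (1000 above).
function isQueueOverloaded(counts: JobCounts, threshold = 1000): boolean {
  return counts.waiting > threshold;
}
```

`checkQueueSize` would then log and alert only when `isQueueOverloaded(jobCounts)` is true.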

Document written on: 2024-01-19
Version: 1.0 (specification)
Implementation: PD-6 (to be created)
Prerequisite: PD-4 (buckets created)