2024-01-20
15 min read

Building Scalable Job Queues with NestJS and Bull Queue

Learn how to implement robust background job processing in NestJS using Bull Queue, including job scheduling, retries, progress tracking, and concurrency management.

NestJS
Bull Queue
Redis
TypeScript
Background Jobs
Performance

Background job processing is essential for handling time-consuming tasks in modern applications. In this guide, we'll explore how to implement scalable job queues using NestJS and Bull Queue, with a focus on handling complex processing requirements.

Setting Up Bull Queue

bash
npm install @nestjs/bull bull

Configuring Bull Module

typescript
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
        password: process.env.REDIS_PASSWORD,
      },
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
        removeOnComplete: true,
      },
    }),
    // Register queues
    BullModule.registerQueue(
      { name: 'video-processing' },
      { name: 'email' },
      { name: 'notifications' }
    ),
  ],
})
export class AppModule {}

Implementing Job Processors

typescript
// video/video-processor.processor.ts
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';

@Processor('video-processing')
export class VideoProcessor {
  private readonly logger = new Logger(VideoProcessor.name);

  @Process('transcode')
  async handleTranscode(job: Job) {
    this.logger.debug('Start transcoding...');
    this.logger.debug(job.data);

    let progress = 0;
    while (progress < 100) {
      await this.doTranscoding(job.data);
      progress += 10;
      await job.progress(progress);
    }

    return { success: true };
  }

  @Process('thumbnail')
  async handleThumbnail(job: Job) {
    this.logger.debug('Start generating thumbnail...');
    this.logger.debug(job.data);

    await this.generateThumbnail(job.data);
    return { success: true };
  }

  private async doTranscoding(data: any) {
    // Implement actual video transcoding logic
    await new Promise(resolve => setTimeout(resolve, 1000));
  }

  private async generateThumbnail(data: any) {
    // Implement thumbnail generation logic
    await new Promise(resolve => setTimeout(resolve, 500));
  }
}

Adding Job Events and Listeners

typescript
// video/video-processor.processor.ts
import { OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';

@Processor('video-processing')
export class VideoProcessor {
  @OnQueueActive()
  onActive(job: Job) {
    this.logger.debug(
      `Processing job ${job.id} of type ${job.name}. Data: ${JSON.stringify(job.data)}`,
    );
  }

  @OnQueueCompleted()
  onComplete(job: Job, result: any) {
    this.logger.debug(
      `Completed job ${job.id} of type ${job.name}. Result: ${JSON.stringify(result)}`,
    );
  }

  @OnQueueFailed()
  onError(job: Job<any>, error: any) {
    this.logger.error(
      `Failed job ${job.id} of type ${job.name}: ${error.message}`,
      error.stack,
    );
  }
}

Implementing the Queue Service

typescript
// video/video.service.ts
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';

@Injectable()
export class VideoService {
  constructor(
    @InjectQueue('video-processing') private videoQueue: Queue,
  ) {}

  async processVideo(videoData: any) {
    // Add transcoding job
    const transcodingJob = await this.videoQueue.add(
      'transcode',
      {
        videoId: videoData.id,
        format: videoData.format,
        resolution: videoData.resolution,
      },
      {
        priority: 1,
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
        timeout: 3600000, // 1 hour
      },
    );

    // Add thumbnail job
    const thumbnailJob = await this.videoQueue.add(
      'thumbnail',
      {
        videoId: videoData.id,
        timestamp: videoData.thumbnailTime,
      },
      {
        priority: 2,
        attempts: 2,
      },
    );

    return {
      transcodingJobId: transcodingJob.id,
      thumbnailJobId: thumbnailJob.id,
    };
  }

  async getJobStatus(jobId: string) {
    const job = await this.videoQueue.getJob(jobId);
    if (!job) {
      return { status: 'not_found' };
    }

    const state = await job.getState();
    const progress = job.progress();

    return {
      id: job.id,
      status: state,
      progress,
      data: job.data,
      returnvalue: job.returnvalue,
      failedReason: job.failedReason,
    };
  }
}

Implementing Concurrent Processing

typescript
// video/video-processor.processor.ts
@Processor('video-processing')
export class VideoProcessor {
  constructor() {
    // Configure worker threads
    const worker = new Worker('./video-worker.js', {
      workerData: {
        // Worker configuration
      },
    });

    worker.on('message', (result) => {
      // Handle worker results
    });
  }

  @Process({
    name: 'transcode',
    concurrency: 3, // Process 3 jobs simultaneously
  })
  async handleTranscode(job: Job) {
    // Implementation
  }
}

Implementing Job Scheduling

typescript
// video/video.service.ts
async scheduleVideoProcessing(videoData: any, scheduleTime: Date) {
  const delay = scheduleTime.getTime() - Date.now();
  
  return this.videoQueue.add(
    'transcode',
    {
      videoId: videoData.id,
      format: videoData.format,
    },
    {
      delay,
      repeat: {
        cron: '0 0 * * *', // Daily at midnight
      },
    },
  );
}

Implementing Progress Tracking

typescript
// video/video.gateway.ts
import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';
import { OnQueueProgress } from '@nestjs/bull';

@WebSocketGateway()
export class VideoGateway {
  @WebSocketServer()
  server: Server;

  @OnQueueProgress()
  onProgress(job: Job, progress: number) {
    this.server.to(job.data.userId).emit('processingProgress', {
      jobId: job.id,
      progress,
    });
  }
}

Best Practices and Optimization

  • Implement proper error handling and retries
  • Use job priorities for important tasks
  • Implement job cleanup strategies
  • Monitor queue health and performance
  • Implement rate limiting for job processing
  • Use job events for monitoring and logging

By following these patterns and implementing proper error handling and monitoring, you can create a robust job processing system that scales well with your application's needs.