Background job processing is essential for handling time-consuming tasks in modern applications. In this guide, we'll explore how to implement scalable job queues using NestJS and Bull Queue, with a focus on handling complex processing requirements.
Setting Up Bull Queue
bash
npm install @nestjs/bull bull
Configuring Bull Module
typescript
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
password: process.env.REDIS_PASSWORD,
},
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: true,
},
}),
// Register queues
BullModule.registerQueue(
{ name: 'video-processing' },
{ name: 'email' },
{ name: 'notifications' }
),
],
})
export class AppModule {}
Implementing Job Processors
typescript
// video/video-processor.processor.ts
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';
@Processor('video-processing')
export class VideoProcessor {
private readonly logger = new Logger(VideoProcessor.name);
@Process('transcode')
async handleTranscode(job: Job) {
this.logger.debug('Start transcoding...');
this.logger.debug(job.data);
let progress = 0;
while (progress < 100) {
await this.doTranscoding(job.data);
progress += 10;
await job.progress(progress);
}
return { success: true };
}
@Process('thumbnail')
async handleThumbnail(job: Job) {
this.logger.debug('Start generating thumbnail...');
this.logger.debug(job.data);
await this.generateThumbnail(job.data);
return { success: true };
}
private async doTranscoding(data: any) {
// Implement actual video transcoding logic
await new Promise(resolve => setTimeout(resolve, 1000));
}
private async generateThumbnail(data: any) {
// Implement thumbnail generation logic
await new Promise(resolve => setTimeout(resolve, 500));
}
}
Adding Job Events and Listeners
typescript
// video/video-processor.processor.ts
import { OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
@Processor('video-processing')
export class VideoProcessor {
@OnQueueActive()
onActive(job: Job) {
this.logger.debug(
`Processing job ${job.id} of type ${job.name}. Data: ${JSON.stringify(job.data)}`,
);
}
@OnQueueCompleted()
onComplete(job: Job, result: any) {
this.logger.debug(
`Completed job ${job.id} of type ${job.name}. Result: ${JSON.stringify(result)}`,
);
}
@OnQueueFailed()
onError(job: Job<any>, error: any) {
this.logger.error(
`Failed job ${job.id} of type ${job.name}: ${error.message}`,
error.stack,
);
}
}
Implementing the Queue Service
typescript
// video/video.service.ts
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Injectable()
export class VideoService {
constructor(
@InjectQueue('video-processing') private videoQueue: Queue,
) {}
async processVideo(videoData: any) {
// Add transcoding job
const transcodingJob = await this.videoQueue.add(
'transcode',
{
videoId: videoData.id,
format: videoData.format,
resolution: videoData.resolution,
},
{
priority: 1,
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
timeout: 3600000, // 1 hour
},
);
// Add thumbnail job
const thumbnailJob = await this.videoQueue.add(
'thumbnail',
{
videoId: videoData.id,
timestamp: videoData.thumbnailTime,
},
{
priority: 2,
attempts: 2,
},
);
return {
transcodingJobId: transcodingJob.id,
thumbnailJobId: thumbnailJob.id,
};
}
async getJobStatus(jobId: string) {
const job = await this.videoQueue.getJob(jobId);
if (!job) {
return { status: 'not_found' };
}
const state = await job.getState();
const progress = job.progress();
return {
id: job.id,
status: state,
progress,
data: job.data,
returnvalue: job.returnvalue,
failedReason: job.failedReason,
};
}
}
Implementing Concurrent Processing
typescript
// video/video-processor.processor.ts
@Processor('video-processing')
export class VideoProcessor {
constructor() {
// Configure worker threads
const worker = new Worker('./video-worker.js', {
workerData: {
// Worker configuration
},
});
worker.on('message', (result) => {
// Handle worker results
});
}
@Process({
name: 'transcode',
concurrency: 3, // Process 3 jobs simultaneously
})
async handleTranscode(job: Job) {
// Implementation
}
}
Implementing Job Scheduling
typescript
// video/video.service.ts
async scheduleVideoProcessing(videoData: any, scheduleTime: Date) {
const delay = scheduleTime.getTime() - Date.now();
return this.videoQueue.add(
'transcode',
{
videoId: videoData.id,
format: videoData.format,
},
{
delay,
repeat: {
cron: '0 0 * * *', // Daily at midnight
},
},
);
}
Implementing Progress Tracking
typescript
// video/video.gateway.ts
import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';
import { OnQueueProgress } from '@nestjs/bull';
@WebSocketGateway()
export class VideoGateway {
@WebSocketServer()
server: Server;
@OnQueueProgress()
onProgress(job: Job, progress: number) {
this.server.to(job.data.userId).emit('processingProgress', {
jobId: job.id,
progress,
});
}
}
Best Practices and Optimization
- Implement proper error handling and retries
- Use job priorities for important tasks
- Implement job cleanup strategies
- Monitor queue health and performance
- Implement rate limiting for job processing
- Use job events for monitoring and logging
By following these patterns and implementing proper error handling and monitoring, you can create a robust job processing system that scales well with your application's needs.