Sending SMS messages to a large audience requires careful planning to handle provider rate limits, ensure deliverability, and manage failures gracefully. Simply looping through recipients and calling an API will quickly lead to errors, blocked messages, and an unreliable system.
This guide demonstrates how to build a robust and scalable bulk SMS broadcasting application using NestJS, the Vonage Messages API, and BullMQ for background job processing and rate limiting. By leveraging a message queue, we decouple the initial API request from the actual sending process, allowing us to throttle message dispatch according to Vonage's limitations and retry failed attempts automatically.
Project Goals:
- Create a NestJS API endpoint to accept bulk SMS broadcast requests (recipient list and message content).
- Use BullMQ and Redis to queue individual SMS sending jobs.
- Implement a queue processor (worker) that sends messages via the Vonage Messages API.
- Throttle the worker to respect Vonage's rate limits (e.g., 1 message per second for long codes, adjustable).
- Implement basic status tracking for broadcasts and individual messages using Prisma and PostgreSQL.
- Provide robust error handling and automatic retries for failed messages.
- Ensure secure handling of API credentials.
- Containerize the application using Docker for easy setup and deployment.
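To get a feel for what the throttling goal implies, the sketch below estimates how long a rate-limited queue needs to drain a broadcast. The function name and parameters are illustrative only, and the result is an upper bound that ignores retries and network latency.

```typescript
// Upper-bound estimate of the time a throttled worker needs to send a broadcast.
// `max` jobs are allowed per `durationMs` window (hypothetical parameter names
// that mirror a "max per duration" rate limit).
function estimateDrainTimeMs(recipients: number, max: number, durationMs: number): number {
  if (recipients <= 0) return 0;
  if (max <= 0 || durationMs <= 0) {
    throw new Error('max and durationMs must be positive');
  }
  // Each window lets through at most `max` jobs.
  const windows = Math.ceil(recipients / max);
  return windows * durationMs;
}

// 10,000 recipients at 1 message per second, expressed in minutes.
const minutes = estimateDrainTimeMs(10_000, 1, 1000) / 60_000;
console.log(`Estimated drain time: ${minutes.toFixed(1)} minutes`);
```

At one message per second, a 10,000-recipient broadcast takes close to three hours, which is why the sending has to happen in the background rather than inside the HTTP request.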
Technologies Used:
- NestJS: A progressive Node.js framework for building efficient, reliable, and scalable server-side applications. Its modular architecture is perfect for this type of project.
- Vonage Messages API: A unified API for sending messages across various channels, including SMS. We use the official @vonage/server-sdk.
- BullMQ: A robust, Redis-based message queue for Node.js. Essential for background processing, throttling, and retries.
- Redis: An in-memory data structure store, used as the backend for BullMQ.
- Prisma: A modern database toolkit for TypeScript and Node.js. Simplifies database access, migrations, and type safety with PostgreSQL.
- PostgreSQL: A powerful, open-source object-relational database system.
- Docker & Docker Compose: For containerizing the application and its dependencies (Redis, Postgres), ensuring consistent development and deployment environments.
- TypeScript: Enhances JavaScript with static typing for better maintainability and fewer runtime errors.
System Architecture:
The system follows this general flow:
- A client sends a POST request to the NestJS API (/broadcasts) with a list of recipients and the message text.
- The NestJS API creates database records for the broadcast and individual messages (initially marked as PENDING).
- For each recipient, the API adds a job to the BullMQ queue (backed by Redis). This job contains the necessary details (recipient number, message text, database message ID).
- A separate NestJS Queue Worker process monitors the BullMQ queue.
- The Worker picks up jobs from the queue, respecting the configured rate limit (e.g., one job per second).
- For each job, the Worker updates the corresponding message status in the database to PROCESSING.
- The Worker calls the Vonage Messages API to send the SMS.
- Based on the Vonage API response:
  - On success, the Worker updates the message status in the database to SENT and records the Vonage message ID.
  - On failure, the Worker logs the error. If retries are configured and remaining, BullMQ will automatically requeue the job after a backoff period. If it's the final attempt, the Worker updates the message status to FAILED and records the error details.
- Optionally, Vonage can send status updates (Delivery Receipts - DLRs) via webhooks back to the NestJS API, which can further update the message status in the database (e.g., DELIVERED, FAILED).
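The status changes described in this flow can be read as a small state machine. The sketch below is one way to write it down; the type and the transition table are local illustrations rather than the Prisma enum defined later, and the DELIVERED state only applies if delivery receipts are wired up.

```typescript
// Local illustration of the message lifecycle; not the generated Prisma enum.
type MessageStatus = 'PENDING' | 'PROCESSING' | 'SENT' | 'DELIVERED' | 'FAILED';

// Which status changes the flow above allows.
const allowedTransitions: Record<MessageStatus, MessageStatus[]> = {
  PENDING: ['PROCESSING'],
  // PROCESSING -> PROCESSING covers a retry after a failed attempt.
  PROCESSING: ['PROCESSING', 'SENT', 'FAILED'],
  // A delivery receipt may later refine SENT into DELIVERED or FAILED.
  SENT: ['DELIVERED', 'FAILED'],
  DELIVERED: [],
  FAILED: [],
};

function canTransition(from: MessageStatus, to: MessageStatus): boolean {
  return allowedTransitions[from].includes(to);
}

console.log(canTransition('PENDING', 'PROCESSING')); // allowed by the table
console.log(canTransition('FAILED', 'SENT'));        // rejected by the table
```

A guard like this can help when webhooks and workers update the same row, because it prevents a late event from moving a message back out of a terminal state.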
Prerequisites:
- Node.js (LTS version recommended, e.g., v18 or v20)
- npm or yarn
- Docker and Docker Compose
- A Vonage API account.
- A Vonage phone number capable of sending SMS.
- Basic understanding of NestJS, TypeScript, APIs, and databases.
- curl or a tool like Postman for testing the API.
Final Outcome:
You will have a containerized NestJS application with an API endpoint to initiate bulk SMS broadcasts. Messages will be reliably sent in the background, respecting Vonage rate limits, with status tracking and automatic retries.
1. Setting Up the Project
Let's initialize the NestJS project, set up the directory structure, install dependencies, and configure Docker.
1.1 Initialize NestJS Project
Open your terminal and run the NestJS CLI command:
npm install -g @nestjs/cli
nest new vonage-bulk-sms --package-manager npm
cd vonage-bulk-sms
1.2 Project Structure
We'll organize our code into modules for better separation of concerns:
src/
├── app.module.ts                 # Root application module
├── app.controller.ts
├── app.service.ts
├── main.ts                       # Application entry point
│
├── broadcast/                    # Handles broadcast requests & API
│   ├── dto/
│   │   └── create-broadcast.dto.ts
│   ├── broadcast.module.ts
│   ├── broadcast.controller.ts
│   └── broadcast.service.ts
│
├── queue/                        # BullMQ configuration and processor
│   ├── queue.module.ts
│   └── broadcast.processor.ts    # Processes jobs from the queue
│
├── vonage/                       # Service for interacting with Vonage API
│   ├── vonage.module.ts
│   └── vonage.service.ts
│
├── prisma/                       # Prisma service and configuration
│   ├── prisma.module.ts
│   └── prisma.service.ts
│
└── config/                       # Configuration management (optional but good practice)
    ├── config.module.ts
    └── configuration.ts
1.3 Install Dependencies
Install the necessary packages for BullMQ, Vonage, Prisma, configuration, and validation:
# Core Dependencies
npm install @nestjs/config class-validator class-transformer
# BullMQ & Redis
npm install @nestjs/bullmq bullmq ioredis
# Vonage
npm install @vonage/server-sdk
# Prisma & PostgreSQL Driver
npm install @prisma/client pg
npm install --save-dev prisma
# API Throttling & Security
npm install @nestjs/throttler throttler-storage-redis helmet
1.4 Configure Environment Variables
Create a .env file in the project root. This file will store sensitive credentials and configuration. Never commit this file to version control.
# .env
# Application
NODE_ENV=development
PORT=3000
# Vonage Credentials (Get from Vonage Dashboard -> API Settings)
VONAGE_API_KEY=YOUR_VONAGE_API_KEY
VONAGE_API_SECRET=YOUR_VONAGE_API_SECRET
# Application ID: available after creating a Vonage Application
VONAGE_APPLICATION_ID=YOUR_VONAGE_APPLICATION_ID
# The number messages will be sent from
VONAGE_SENDER_ID=YOUR_VONAGE_PHONE_NUMBER
# Path relative to project root
VONAGE_PRIVATE_KEY_PATH=./private.key
# Database (connection string used by Prisma; read by the application inside the container)
DATABASE_URL="postgresql://user:password@postgres:5432/vonage_bulk_sms?schema=public"
# These POSTGRES_* variables are used by the Docker Compose service to initialize the container
POSTGRES_DB=vonage_bulk_sms
POSTGRES_USER=user
POSTGRES_PASSWORD=password
# Redis (Used by Docker Compose & BullMQ)
REDIS_HOST=redis
REDIS_PORT=6379
# Leave empty if no password, or set one in docker-compose
REDIS_PASSWORD=
# BullMQ Queue Config
BROADCAST_QUEUE_NAME=broadcast-sms
# Rate limit: 1 job processed every 1000ms (1/sec) - Adjust based on Vonage limits for your number type
BULLMQ_RATE_LIMIT_MAX=1
BULLMQ_RATE_LIMIT_DURATION=1000
# Total attempts per job (the first try plus 2 retries)
BULLMQ_JOB_ATTEMPTS=3
# Base backoff delay in ms (wait 5 seconds before the first retry)
BULLMQ_JOB_BACKOFF=5000
# API Rate Limiting
# Time window in seconds
THROTTLE_TTL=60
# Requests allowed per window from the same IP
THROTTLE_LIMIT=20
- Obtaining Vonage Credentials: You'll get the API Key and Secret from your Vonage Dashboard. The Application ID and private.key file are generated later when creating a Vonage Application specifically for the Messages API. The Sender ID is the Vonage virtual number you'll send messages from.
- Database/Redis: These credentials match the ones we'll define in docker-compose.yml.
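Values read from .env always arrive as strings, and a type parameter such as get<number> does not convert them. The sketch below shows one way to parse and validate the queue settings up front; the helper names and the QueueSettings shape are hypothetical, while the variable names and defaults come from the .env file above.

```typescript
// Hypothetical shape for the parsed queue settings.
interface QueueSettings {
  rateLimitMax: number;
  rateLimitDurationMs: number;
  jobAttempts: number;
  jobBackoffMs: number;
}

type Env = Record<string, string | undefined>;

// Parse one variable as a positive integer, falling back when it is unset or empty.
function readPositiveInt(env: Env, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw.trim() === '') return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    throw new Error(`${key} must be a positive integer, got "${raw}"`);
  }
  return value;
}

function loadQueueSettings(env: Env): QueueSettings {
  return {
    rateLimitMax: readPositiveInt(env, 'BULLMQ_RATE_LIMIT_MAX', 1),
    rateLimitDurationMs: readPositiveInt(env, 'BULLMQ_RATE_LIMIT_DURATION', 1000),
    jobAttempts: readPositiveInt(env, 'BULLMQ_JOB_ATTEMPTS', 3),
    jobBackoffMs: readPositiveInt(env, 'BULLMQ_JOB_BACKOFF', 5000),
  };
}

// Example with an explicit object; in the application this would be process.env.
console.log(loadQueueSettings({ BULLMQ_RATE_LIMIT_MAX: '5' }));
```

Failing fast on a malformed value at startup tends to be easier to debug than a worker that silently runs with NaN as its rate limit.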
1.5 Configure Docker
Create a Dockerfile in the project root:
# Dockerfile
FROM node:18-alpine AS development
WORKDIR /usr/src/app
COPY package*.json ./
# Install all dependencies (runtime and development) for local development
RUN npm install
COPY . .
USER node
# ---
FROM node:18-alpine AS builder
WORKDIR /usr/src/app
COPY package*.json ./
# Install all dependencies: the Nest CLI and Prisma CLI (devDependencies) are needed to build
RUN npm install
COPY . .
# Generate Prisma Client
RUN npx prisma generate
# Build the application
RUN npm run build
# Prune development dependencies
RUN npm prune --omit=dev
# ---
FROM node:18-alpine AS production
# Set the working directory before copying so relative paths resolve inside it
WORKDIR /usr/src/app
COPY --from=builder /usr/src/app/node_modules ./node_modules
COPY --from=builder /usr/src/app/dist ./dist
COPY --from=builder /usr/src/app/prisma ./prisma
# Copy the generated Prisma client from the builder stage
COPY --from=builder /usr/src/app/node_modules/.prisma ./node_modules/.prisma
# Copy private key if it exists at build time (alternative: mount at runtime)
# COPY --chown=node:node private.key ./private.key
# Run as the unprivileged node user
USER node
# Expose port and define command
EXPOSE 3000
CMD ["node", "dist/main.js"]
Create a docker-compose.yml file:
# docker-compose.yml
version: '3.8'
services:
  app:
    build:
      context: .
      target: production # Use the production stage from Dockerfile
    container_name: vonage_bulk_sms_app
    env_file:
      - .env # Load environment variables from .env file
    ports:
      - "${PORT:-3000}:3000" # Use PORT from .env, default to 3000
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    volumes:
      - ./private.key:/usr/src/app/private.key:ro # Mount private key read-only
      # For development, you might mount src:
      # - ./src:/usr/src/app/src
      # - ./prisma:/usr/src/app/prisma
    networks:
      - vonage-net
    command: node dist/main.js # Explicitly start the built app
  postgres:
    image: postgres:15-alpine
    container_name: postgres_db
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432" # Map port for external tools if needed (e.g., pgAdmin)
    networks:
      - vonage-net
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"]
      interval: 10s
      timeout: 5s
      retries: 5
  redis:
    image: redis:7-alpine
    container_name: redis_cache
    # If you set REDIS_PASSWORD in .env, uncomment the command below
    # command: redis-server --requirepass ${REDIS_PASSWORD}
    ports:
      - "6379:6379" # Map port for external tools if needed
    volumes:
      - redis_data:/data
    networks:
      - vonage-net
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
volumes:
  postgres_data:
  redis_data:
networks:
  vonage-net:
    driver: bridge
- This setup defines three services: our NestJS app, a PostgreSQL database, and a Redis instance.
- It uses the .env file to configure containers.
- It mounts the private.key file into the application container (make sure this file exists before running docker-compose up).
- It includes basic health checks and explicit depends_on conditions.
Create a .dockerignore file to optimize the build context:
# .dockerignore
node_modules
dist
.git
.env
*.log
1.6 Initialize Prisma
Run the Prisma init command:
npx prisma init --datasource-provider postgresql
This creates a prisma directory with a schema.prisma file and updates .env with a placeholder DATABASE_URL. Replace the placeholder DATABASE_URL in .env with the one we defined earlier, matching the Docker Compose setup (ensure it's quoted if it contains special characters).
Modify prisma/schema.prisma later (Section 6) when defining the data model.
1.7 Configure NestJS Modules
We need to import and configure the modules we'll be using.
Update src/app.module.ts:
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bullmq';
import { ThrottlerModule, ThrottlerGuard } from '@nestjs/throttler';
import { ThrottlerStorageRedisService } from 'throttler-storage-redis';
import { APP_GUARD } from '@nestjs/core';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { BroadcastModule } from './broadcast/broadcast.module';
import { QueueModule } from './queue/queue.module';
import { VonageModule } from './vonage/vonage.module';
import { PrismaModule } from './prisma/prisma.module';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true, // Make ConfigService available globally
envFilePath: '.env',
}),
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
connection: {
host: configService.get<string>('REDIS_HOST'),
port: configService.get<number>('REDIS_PORT'),
password: configService.get<string>('REDIS_PASSWORD') || undefined,
},
}),
inject: [ConfigService],
}),
ThrottlerModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (configService: ConfigService) => ({
storage: new ThrottlerStorageRedisService({
host: configService.get<string>('REDIS_HOST'),
port: configService.get<number>('REDIS_PORT'),
password: configService.get<string>('REDIS_PASSWORD') || undefined,
// Note: Ensure compatibility or check for alternative storage options if needed
}),
ttl: configService.get<number>('THROTTLE_TTL', 60),
limit: configService.get<number>('THROTTLE_LIMIT', 20),
}),
}),
PrismaModule, // Add PrismaModule
VonageModule, // Add VonageModule
QueueModule, // Add QueueModule
BroadcastModule, // Add BroadcastModule
],
controllers: [AppController],
providers: [
AppService,
{
provide: APP_GUARD, // Apply throttling globally
useClass: ThrottlerGuard,
},
],
})
export class AppModule {}
This root module now:
- Loads environment variables globally using ConfigModule.
- Configures the base BullMQ connection using Redis details from .env.
- Configures global API rate limiting (ThrottlerModule) using Redis for distributed storage.
- Imports our feature modules (which we'll create next).
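As a rough mental model of what a limit of THROTTLE_LIMIT requests per THROTTLE_TTL window means, here is a minimal in-memory fixed-window counter keyed by client IP. This is an illustration only: the class name is hypothetical, it is not how ThrottlerGuard is implemented, and a real deployment keeps the counters in Redis so that all instances share them.

```typescript
// Illustrative fixed-window rate limiter (in-memory, single process).
class FixedWindowLimiter {
  private readonly limit: number;
  private readonly ttlMs: number;
  private readonly hits = new Map<string, { count: number; windowStart: number }>();

  constructor(limit: number, ttlMs: number) {
    this.limit = limit;
    this.ttlMs = ttlMs;
  }

  // Returns true if the request is allowed, false if it should be rejected (HTTP 429).
  allow(key: string, now: number = Date.now()): boolean {
    const entry = this.hits.get(key);
    // Start a new window if none exists or the previous one has expired.
    if (!entry || now - entry.windowStart >= this.ttlMs) {
      this.hits.set(key, { count: 1, windowStart: now });
      return true;
    }
    if (entry.count >= this.limit) return false;
    entry.count += 1;
    return true;
  }
}

// 20 requests per 60 seconds, matching the defaults in .env.
const limiter = new FixedWindowLimiter(20, 60_000);
console.log(limiter.allow('203.0.113.7'));
```

Note that this API-level limit protects the HTTP endpoint from abuse; it is separate from the worker-level limit that paces outgoing SMS.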
2. Implementing Core Functionality (Queue & Worker)
Now, let's set up BullMQ to handle the SMS jobs and create the worker process that consumes these jobs.
2.1 Configure Queue Module
Create src/queue/queue.module.ts:
// src/queue/queue.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BroadcastProcessor } from './broadcast.processor';
import { VonageModule } from '../vonage/vonage.module'; // Import VonageModule
import { PrismaModule } from '../prisma/prisma.module'; // Import PrismaModule
@Module({
imports: [
BullModule.registerQueueAsync({
name: 'broadcast-sms', // Use queue name from config or define directly
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
name: configService.get<string>('BROADCAST_QUEUE_NAME', 'broadcast-sms'),
defaultJobOptions: {
attempts: configService.get<number>('BULLMQ_JOB_ATTEMPTS', 3),
backoff: {
type: 'exponential',
delay: configService.get<number>('BULLMQ_JOB_BACKOFF', 5000),
},
removeOnComplete: true, // Keep queue clean
removeOnFail: false, // Keep failed jobs for inspection (optional)
},
// Note: BullMQ enforces rate limits in the worker, not in the queue, so the
// limiter is passed as a worker option on the @Processor decorator (Section 2.3).
}),
inject: [ConfigService],
}),
VonageModule, // Make VonageService available for injection
PrismaModule, // Make PrismaService available
ConfigModule, // Import ConfigModule to make ConfigService injectable here
],
providers: [BroadcastProcessor], // Register the processor
exports: [BullModule], // Export BullModule if needed in other modules (like BroadcastService)
})
export class QueueModule {}
- BullModule.registerQueueAsync: Registers a specific queue named broadcast-sms.
- defaultJobOptions: Sets default behavior for jobs added to this queue (attempts, backoff strategy).
- limiter: BullMQ's built-in rate limiter is a worker option rather than a queue option, so it is not set in this module. Pass limiter: { max, duration } as the second argument to @Processor so that the BroadcastProcessor processes at most BULLMQ_RATE_LIMIT_MAX jobs within BULLMQ_RATE_LIMIT_DURATION milliseconds, which throttles calls to the Vonage API.
- We import VonageModule, PrismaModule, and ConfigModule to make their services injectable into the BroadcastProcessor.
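To see what the attempts and backoff settings mean in practice, the sketch below lists the wait before each retry. It assumes exponential backoff doubles the base delay on every retry with no jitter (base delay multiplied by 2 to the power of the retry index), which is worth checking against the BullMQ version you install; the function name is illustrative.

```typescript
// Wait before each retry, assuming the delay doubles each time and no jitter is applied.
// `attempts` is the total number of tries, including the first one.
function retryDelaysMs(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // A job with N attempts can be retried at most N - 1 times.
  for (let failedAttempts = 1; failedAttempts < attempts; failedAttempts++) {
    delays.push(baseDelayMs * 2 ** (failedAttempts - 1));
  }
  return delays;
}

// Defaults from .env: 3 attempts with a 5000 ms base delay.
console.log(retryDelaysMs(3, 5000)); // [ 5000, 10000 ]
```

With the defaults, a message that keeps failing is given up on roughly 15 seconds after its first attempt, plus whatever time the rate limiter adds.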
2.2 Create Vonage Service
This service encapsulates interaction with the Vonage SDK.
Create src/vonage/vonage.service.ts:
// src/vonage/vonage.service.ts
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Vonage } from '@vonage/server-sdk';
import { MessageSendRequest } from '@vonage/messages';
import * as path from 'path';
import * as fs from 'fs';
@Injectable()
export class VonageService implements OnModuleInit {
private readonly logger = new Logger(VonageService.name);
private vonageClient: Vonage;
constructor(private configService: ConfigService) {}
onModuleInit() {
try {
const apiKey = this.configService.getOrThrow<string>('VONAGE_API_KEY');
const apiSecret = this.configService.getOrThrow<string>('VONAGE_API_SECRET');
const applicationId = this.configService.getOrThrow<string>('VONAGE_APPLICATION_ID');
const privateKeyPath = this.configService.getOrThrow<string>('VONAGE_PRIVATE_KEY_PATH');
// Ensure the private key path is handled correctly, might need adjustment
// depending on how/where it's mounted or located in production.
// This assumes it's relative to the project root where the app starts.
const resolvedPrivateKeyPath = path.resolve(process.cwd(), privateKeyPath);
// Check if file exists before initializing - helps debugging
if (!fs.existsSync(resolvedPrivateKeyPath)) {
this.logger.error(`Vonage private key not found at path: ${resolvedPrivateKeyPath}`);
throw new Error(`Vonage private key not found at path: ${resolvedPrivateKeyPath}`);
}
this.vonageClient = new Vonage({
apiKey: apiKey,
apiSecret: apiSecret,
applicationId: applicationId,
privateKey: resolvedPrivateKeyPath, // Use resolved path
});
this.logger.log('Vonage SDK initialized successfully.');
} catch (error) {
this.logger.error(`Failed to initialize Vonage SDK: ${error.message}`, error.stack);
// Depending on requirements, you might want the app to fail startup
// or handle this state gracefully (e.g., disable sending).
throw new Error(`Failed to initialize Vonage SDK: ${error.message}`);
}
}
async sendSms(to: string, text: string): Promise<{ messageUuid: string | null; error?: any }> {
const from = this.configService.getOrThrow<string>('VONAGE_SENDER_ID');
const message: MessageSendRequest = {
message_type: 'text',
channel: 'sms',
to: to,
from: from,
text: text,
};
try {
this.logger.log(`Attempting to send SMS to ${to}`);
const response = await this.vonageClient.messages.send(message);
this.logger.log(`SMS sent successfully to ${to}, message_uuid: ${response.message_uuid}`);
return { messageUuid: response.message_uuid };
} catch (error) {
// Attempt to extract meaningful error info from Vonage SDK error structure
const errorDetails = error?.response?.data || error?.message || error;
this.logger.error(`Failed to send SMS to ${to}: ${JSON.stringify(errorDetails)}`, error?.stack);
// Return error details for the processor to handle
return { messageUuid: null, error: errorDetails };
}
}
}
Create src/vonage/vonage.module.ts:
// src/vonage/vonage.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { VonageService } from './vonage.service';
@Module({
imports: [ConfigModule], // Ensure ConfigService is available
providers: [VonageService],
exports: [VonageService], // Export for use in other modules
})
export class VonageModule {}
- Initializes the Vonage SDK using credentials from .env. Includes path resolution and existence check for the private key.
- Provides a sendSms method that wraps the SDK's messages.send call.
- Includes logging for success and failure.
- Returns the message_uuid on success and the error details on failure.
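The error handling in sendSms tries several places where an error description might live. A standalone version of that lookup, written as a sketch with a hypothetical helper name, makes the fallback order explicit and guarantees a string result for storage in the database. The response.data shape is an assumption about HTTP-style errors, not a documented Vonage SDK contract.

```typescript
// Turn an unknown thrown value into a string suitable for logging or storage.
// Lookup order mirrors the service: response.data, then message, then the value itself.
function extractErrorDetails(error: unknown): string {
  if (typeof error === 'string') return error;
  if (error !== null && typeof error === 'object') {
    const candidate = error as { response?: { data?: unknown }; message?: unknown };
    if (candidate.response?.data !== undefined) {
      return JSON.stringify(candidate.response.data);
    }
    if (typeof candidate.message === 'string') return candidate.message;
  }
  // JSON.stringify returns undefined for values such as undefined or functions.
  const json: string | undefined = JSON.stringify(error);
  return json ?? String(error);
}

console.log(extractErrorDetails(new Error('Request timed out')));
console.log(extractErrorDetails({ response: { data: { title: 'Unauthorized' } } }));
```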
2.3 Create the Queue Processor
This class defines how jobs from the broadcast-sms queue are handled.
Create src/queue/broadcast.processor.ts:
// src/queue/broadcast.processor.ts
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { VonageService } from '../vonage/vonage.service';
import { PrismaService } from '../prisma/prisma.service'; // Import PrismaService
import { MessageStatus } from '@prisma/client'; // Import enum (define in schema.prisma later)
import { ConfigService } from '@nestjs/config'; // Import ConfigService
// Define the structure of the job data we expect
interface BroadcastJobData {
messageId: number; // Link back to our database record
recipient: string;
messageText: string;
}
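// Connects to the queue named 'broadcast-sms'. The rate limiter is a BullMQ worker option,
// so it is configured here. Decorators are evaluated at import time, before ConfigService
// exists, so the values are read from process.env (this assumes the variables are already
// in the process environment, as they are when Docker Compose loads .env via env_file).
@Processor('broadcast-sms', {
limiter: {
max: Number(process.env.BULLMQ_RATE_LIMIT_MAX) || 1,
duration: Number(process.env.BULLMQ_RATE_LIMIT_DURATION) || 1000,
},
})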
export class BroadcastProcessor extends WorkerHost {
private readonly logger = new Logger(BroadcastProcessor.name);
constructor(
private readonly vonageService: VonageService,
private readonly prisma: PrismaService, // Inject PrismaService
private readonly configService: ConfigService, // Inject ConfigService
) {
super();
}
async process(job: Job<BroadcastJobData, any, string>): Promise<void> {
// job.attemptsMade counts attempts that have already finished, so it is 0 on the
// first run. Add 1 to get the current attempt number (verify against your BullMQ version).
const attempt = job.attemptsMade + 1;
this.logger.log(`Processing job ${job.id} for recipient ${job.data.recipient} (attempt ${attempt})`);
const { messageId, recipient, messageText } = job.data;
try {
// Update status to PROCESSING in DB and record the current attempt number
await this.prisma.message.update({
where: { id: messageId },
data: { status: MessageStatus.PROCESSING, attempts: attempt },
});
const result = await this.vonageService.sendSms(recipient, messageText);
if (result.messageUuid) {
// Update status to SENT in DB
await this.prisma.message.update({
where: { id: messageId },
data: {
status: MessageStatus.SENT,
providerMessageId: result.messageUuid,
processedAt: new Date(),
errorDetails: null, // Clear previous errors on success
},
});
this.logger.log(`Job ${job.id} completed successfully. Vonage message_uuid: ${result.messageUuid}`);
} else {
// If sendSms indicates failure but doesn't throw (e.g., API error response)
const errorMessage = `Vonage API failed to send SMS: ${JSON.stringify(result.error)}`;
this.logger.warn(`Job ${job.id} failed sending via Vonage: ${errorMessage}`);
// Throw error to trigger BullMQ retry mechanism
throw new Error(errorMessage);
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error during processing';
this.logger.error(`Job ${job.id} (attempt ${attempt}) failed with error: ${errorMessage}`, error instanceof Error ? error.stack : undefined);
// Check if it's the final attempt
const maxAttempts = job.opts.attempts ?? this.configService.get<number>('BULLMQ_JOB_ATTEMPTS', 3);
if (attempt >= maxAttempts) {
// Final attempt failed, mark as FAILED in DB
this.logger.warn(`Job ${job.id} reached max attempts (${maxAttempts}) and failed permanently.`);
await this.prisma.message.update({
where: { id: messageId },
data: {
status: MessageStatus.FAILED,
processedAt: new Date(),
errorDetails: errorMessage,
},
});
} else {
// Not the final attempt, keep status as PROCESSING
// BullMQ will handle the retry state and delay based on backoff strategy.
// Update the error message on each attempt.
this.logger.log(`Job ${job.id} failed, will retry (attempt ${attempt}/${maxAttempts}).`);
await this.prisma.message.update({
where: { id: messageId },
data: {
status: MessageStatus.PROCESSING, // Keep as PROCESSING during retry phase
errorDetails: `Attempt ${attempt} failed: ${errorMessage}`
},
});
}
// Re-throw the error so BullMQ knows the job failed and should be retried (if attempts remain)
throw error;
}
}
// Optional: Listen to worker events for more detailed logging/monitoring
@OnWorkerEvent('active')
onActive(job: Job) {
this.logger.log(`Job ${job.id} has started processing (attempt ${job.attemptsMade}).`);
}
@OnWorkerEvent('completed')
onCompleted(job: Job, result: any) {
// Note: This logs successful completion *after* the 'process' method returns successfully.
this.logger.log(`Job ${job.id} has completed successfully.`);
}
@OnWorkerEvent('failed')
onFailed(job: Job<BroadcastJobData> | undefined, err: Error) {
// This event fires after the 'process' method throws an error, potentially before the final retry attempt.
// More detailed final failure logging is handled within the 'process' method's catch block.
this.logger.error(`Job ${job?.id ?? 'unknown'} failed on attempt ${job?.attemptsMade ?? 'N/A'} with error: ${err.message}`);
}
@OnWorkerEvent('error')
onError(err: Error) {
// General worker errors not tied to a specific job
this.logger.error('Worker error:', err.stack);
}
@OnWorkerEvent('stalled')
onStalled(jobId: string) {
// Indicates a job was marked as active but did not complete within a timeout
this.logger.warn(`Job ${jobId} has stalled.`);
}
}
- @Processor('broadcast-sms'): Decorator linking this class to the specified queue.
- process(job: Job): The core method called by BullMQ for each job. It receives the job object containing our data (recipient, messageText, messageId).
- Injects VonageService, PrismaService, and ConfigService.
- Updates the message status in the database (PENDING -> PROCESSING -> SENT/FAILED) and records the attempt count.
- Handles errors from VonageService and general processing errors.
- Throws errors to let BullMQ handle retries based on the queue's defaultJobOptions. Updates DB status accordingly: FAILED on final failure, keeps PROCESSING during intermediate retry attempts, storing the latest error.
- Includes optional event listeners (@OnWorkerEvent) for detailed logging.
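The choice between "retry" and "mark as FAILED" hinges on how attempts are counted, which is easy to get wrong by one. The sketch below isolates that decision as a pure function. It assumes that attemptsMade holds the number of attempts already finished when the job starts running (so it is 0 on the first run), which should be confirmed for the BullMQ version in use; the function and type names are hypothetical.

```typescript
type FailureOutcome = 'RETRY' | 'FAILED';

// Decide what happens after the current attempt fails.
// attemptsMade: attempts finished before the current one (assumed 0 on the first run).
// maxAttempts: total attempts allowed, including the first.
function outcomeAfterFailure(attemptsMade: number, maxAttempts: number): FailureOutcome {
  const currentAttempt = attemptsMade + 1;
  return currentAttempt >= maxAttempts ? 'FAILED' : 'RETRY';
}

// With 3 attempts allowed, only the third failure is final.
for (const attemptsMade of [0, 1, 2]) {
  console.log(`attemptsMade=${attemptsMade} -> ${outcomeAfterFailure(attemptsMade, 3)}`);
}
```

Keeping this logic in a small function also makes it straightforward to unit test without Redis or a running worker.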
3. Building the API Layer
This layer exposes an endpoint to receive broadcast requests and add jobs to the queue.
3.1 Create DTO (Data Transfer Object)
DTOs define the expected request body structure and enable automatic validation using class-validator.
Create src/broadcast/dto/create-broadcast.dto.ts:
// src/broadcast/dto/create-broadcast.dto.ts
import { IsNotEmpty, IsString, IsArray, ArrayNotEmpty, IsPhoneNumber } from 'class-validator';
// Optional: for Swagger documentation (install @nestjs/swagger if using)
// import { ApiProperty } from '@nestjs/swagger';
export class CreateBroadcastDto {
// @ApiProperty({
// description: 'List of recipient phone numbers in E.164 format (e.g., +14155552671)',
// example: ['+14155552671', '+447700900000'],
// type: [String],
// })
@IsArray()
@ArrayNotEmpty()
// Apply validation to each element in the array
// Note: IsPhoneNumber might require a specific region or allow generic E.164
// Consider a more robust validation library if needed (e.g., google-libphonenumber)
@IsPhoneNumber(undefined, { each: true, message: 'Each recipient must be a valid phone number in E.164 format' })
recipients: string[];
// @ApiProperty({
// description: 'The text message content to send.',
// example: 'Hello from our awesome service!',
// })
@IsString()
@IsNotEmpty()
message: string;
}
- Uses decorators from class-validator to enforce rules (must be an array, not empty, must contain valid phone numbers, message must be a non-empty string).
- @ApiProperty is commented out but can be used if Swagger is integrated.
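Beyond per-item validation, a broadcast usually benefits from normalizing and de-duplicating the recipient list so nobody receives the same message twice. The sketch below is a minimal take on that idea using a structural E.164 check (a plus sign followed by up to 15 digits, not starting with 0). It does not verify that a number actually exists, and the helper name is hypothetical.

```typescript
// Structural E.164 check: "+" followed by 2 to 15 digits, first digit non-zero.
const E164_PATTERN = /^\+[1-9]\d{1,14}$/;

// Strip common formatting characters, validate, and drop duplicates.
function normalizeRecipients(input: string[]): { valid: string[]; invalid: string[] } {
  const valid = new Set<string>();
  const invalid: string[] = [];
  for (const raw of input) {
    const candidate = raw.replace(/[\s().-]/g, '');
    if (E164_PATTERN.test(candidate)) {
      valid.add(candidate); // Set membership removes duplicates
    } else {
      invalid.push(raw); // keep the original text for error reporting
    }
  }
  return { valid: Array.from(valid), invalid };
}

console.log(
  normalizeRecipients(['+14155552671', '+1 (415) 555-2671', '4155552671']),
);
```

For production use, a dedicated library such as the one mentioned in the DTO comments gives region-aware validation that a regular expression cannot.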
3.2 Create Broadcast Service
This service handles the business logic: creating the broadcast record and adding individual message jobs to the queue.
Create src/broadcast/broadcast.service.ts:
// src/broadcast/broadcast.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { PrismaService } from '../prisma/prisma.service';
import { CreateBroadcastDto } from './dto/create-broadcast.dto';
import { Broadcast, MessageStatus } from '@prisma/client'; // Import generated types
@Injectable()
export class BroadcastService {
private readonly logger = new Logger(BroadcastService.name);
constructor(
@InjectQueue('broadcast-sms') private readonly broadcastQueue: Queue,
private readonly prisma: PrismaService,
) {}
async createBroadcast(createBroadcastDto: CreateBroadcastDto): Promise<Broadcast> {
const { recipients, message } = createBroadcastDto;
this.logger.log(`Received broadcast request for ${recipients.length} recipients.`);
// 1. Create the main Broadcast record
let broadcast: Broadcast;
try {
broadcast = await this.prisma.broadcast.create({
data: {
messageContent: message,
totalRecipients: recipients.length,
status: 'PENDING', // Initial status
},
});
this.logger.log(`Created broadcast record with ID: ${broadcast.id}`);
} catch (error) {
this.logger.error(`Failed to create broadcast record: ${error.message}`, error.stack);
throw new Error('Failed to initiate broadcast in database.');
}
// 2. Create individual Message records and add jobs to the queue
const messageJobsPromises = recipients.map(async (recipient) => {
try {
// Create placeholder message record in DB
const dbMessage = await this.prisma.message.create({
data: {
broadcastId: broadcast.id,
recipient: recipient,
status: MessageStatus.PENDING, // Initial status for message
},
});
// Add job to BullMQ
// The job data links back to the DB message record
await this.broadcastQueue.add(
'send-sms-job', // Job name (can be descriptive)
{
messageId: dbMessage.id, // Pass the DB ID
recipient: recipient,
messageText: message,
},
// Optional: Job-specific options like delay or priority can go here
);
return { success: true, recipient };
} catch (error) {
this.logger.error(`Failed to create message record or queue job for recipient ${recipient}: ${error.message}`, error.stack);
// Decide how to handle partial failures:
// - Continue processing others?
// - Rollback/cancel the broadcast? (More complex)
// Here, we log the error and continue with others.
return { success: false, recipient, error: error.message };
}
});
// Wait for all DB creations and queue additions to be initiated
const results = await Promise.all(messageJobsPromises);
const failedCount = results.filter(r => !r.success).length;
if (failedCount > 0) {
this.logger.warn(`Failed to queue ${failedCount} out of ${recipients.length} messages for broadcast ${broadcast.id}.`);
// Optionally update broadcast status or record partial failure details
}
// Optionally update broadcast status to 'QUEUED' or similar after adding all jobs
// try {
// await this.prisma.broadcast.update({
// where: { id: broadcast.id },
// data: { status: 'QUEUED' }, // Consider a more nuanced status if partial failures occurred
// });
// } catch (error) {
// this.logger.error(`Failed to update broadcast ${broadcast.id} status to QUEUED: ${error.message}`);
// }
this.logger.log(`Attempted to add ${recipients.length} message jobs to the queue for broadcast ${broadcast.id}. Failures: ${failedCount}.`);
return broadcast; // Return the created broadcast record
}
// Optional: Add methods to get broadcast status
async getBroadcastStatus(id: number): Promise<Broadcast | null> {
return this.prisma.broadcast.findUnique({
where: { id: id },
include: {
// Optionally include counts of message statuses
_count: {
select: { messages: true }
},
// Example: Include counts per status (more detailed)
// messages: {
// where: { status: MessageStatus.SENT },
// select: { _count: true } // This syntax might need adjustment based on Prisma version
// }
// A more practical approach might be separate queries for counts per status.
}
});
}
async getMessageStatus(id: number): Promise<any | null> { // Using 'any' for simplicity
const message = await this.prisma.message.findUnique({ where: { id: id }});
if (!message) return null;
// Optionally check job status in BullMQ if needed (more complex)
// Requires storing BullMQ job ID in the Message schema
// const job = await this.broadcastQueue.getJob(message.jobId);
return {
id: message.id,
recipient: message.recipient,
status: message.status,
attempts: message.attempts,
providerMessageId: message.providerMessageId,
error: message.errorDetails,
createdAt: message.createdAt,
processedAt: message.processedAt
};
}
}
- @InjectQueue('broadcast-sms'): Injects the BullMQ queue instance.
- Injects PrismaService.
- createBroadcast:
  - Takes the validated DTO.
  - Creates a main Broadcast record in the database.
  - Iterates through recipients:
    - Creates an individual Message record (status PENDING) in the database for tracking.
    - Adds a job to the broadcastQueue with the messageId (linking back to the DB record), recipient, and message text.
  - Includes basic error handling for database operations and queue additions.
  - Returns the created Broadcast object.
- Includes optional methods (getBroadcastStatus, getMessageStatus) for querying the state of broadcasts and individual messages from the database.
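For very large recipient lists, firing one database insert and one queue call per recipient all at once can put pressure on the connection pool and on Redis. One possible refinement, sketched here under assumptions, is to work in fixed-size batches and hand each batch to the queue in a single call (BullMQ's Queue offers addBulk for this). The helpers below only build the batches and job payloads; the names chunk and buildBulkJobs and the batch size are illustrative, not part of the service above.

```typescript
// Same job payload shape as the processor expects.
interface BroadcastJobData {
  messageId: number;
  recipient: string;
  messageText: string;
}

interface BulkJob {
  name: string;
  data: BroadcastJobData;
}

// Split an array into consecutive batches of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (size <= 0) throw new Error('size must be positive');
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Build queue payloads for message rows that were already created in the database.
function buildBulkJobs(
  messages: { id: number; recipient: string }[],
  messageText: string,
): BulkJob[] {
  return messages.map((m) => ({
    name: 'send-sms-job',
    data: { messageId: m.id, recipient: m.recipient, messageText },
  }));
}

const rows = [
  { id: 1, recipient: '+14155552671' },
  { id: 2, recipient: '+447700900000' },
  { id: 3, recipient: '+14155550100' },
];
for (const batch of chunk(rows, 2)) {
  console.log(buildBulkJobs(batch, 'Hello from our service!'));
}
```

Each batch could then be passed to the queue in one round trip, which keeps memory use and concurrency bounded regardless of how many recipients a request contains.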