Build Bulk SMS Broadcasting with NestJS, Vonage Messages API & BullMQ (2025 Guide)
Complete guide to building scalable bulk SMS broadcasting using NestJS, Vonage Messages API, and BullMQ. Includes rate limiting, automatic retries, Docker containerization, Prisma database tracking, and production-ready error handling.
Build a robust bulk SMS broadcasting system that handles provider rate limits, ensures deliverability, and manages failures gracefully. Looping through recipients and calling an API directly leads to errors, blocked messages, and unreliable delivery.
This guide shows you how to build a scalable bulk SMS application using NestJS, the Vonage Messages API, and BullMQ for background job processing and rate limiting. You'll decouple the initial API request from the sending process, throttle message dispatch according to Vonage's rate limits (30 API requests per second default, carrier-restricted to 1 request/second for certain number types), and automatically retry failed attempts.
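To get a feel for what these rate limits mean in practice, the following back-of-the-envelope sketch estimates how long a broadcast takes to drain from the queue. The function name `estimateBroadcastSeconds` is hypothetical, and the estimate assumes no retries, no network latency, and a single worker.

```typescript
// Rough arithmetic only: ignores retries, latency, and queue overhead.
// `estimateBroadcastSeconds` is an illustrative helper, not part of the guide's code.
function estimateBroadcastSeconds(
  recipientCount: number,
  maxPerWindow: number, // e.g. 1 for a rate-restricted long code, 30 for the API default
  windowMs: number,     // length of the rate-limit window in milliseconds
): number {
  if (recipientCount <= 0) return 0;
  if (maxPerWindow <= 0 || windowMs <= 0) {
    throw new RangeError('maxPerWindow and windowMs must be positive');
  }
  // Each window lets at most `maxPerWindow` messages through.
  const windows = Math.ceil(recipientCount / maxPerWindow);
  return (windows * windowMs) / 1000;
}

console.log(estimateBroadcastSeconds(10000, 1, 1000));  // 10000 (about 2.8 hours)
console.log(estimateBroadcastSeconds(10000, 30, 1000)); // 334 (under 6 minutes)
```

At 1 message per second, a 10,000-recipient broadcast takes hours, which is why the sending work has to happen in a background queue rather than inside the HTTP request.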
What You'll Learn:
- Set up NestJS with BullMQ, Redis, Prisma, and PostgreSQL for production-grade SMS broadcasting
- Integrate Vonage Messages API with proper authentication and error handling
- Implement rate limiting to respect Vonage's API constraints (1-30 requests/second)
- Build a background job processor with automatic retries and exponential backoff
- Track message status in PostgreSQL using Prisma ORM (PENDING → PROCESSING → SENT/FAILED)
- Containerize your application with Docker and Docker Compose
- Handle 10DLC registration requirements for US SMS delivery
Prerequisites: Basic understanding of NestJS, TypeScript, APIs, databases, Docker, and message queues.
Project Goals:
- Create a NestJS API endpoint to accept bulk SMS broadcast requests (recipient list and message content).
- Use BullMQ and Redis to queue individual SMS sending jobs.
- Implement a queue processor (worker) that sends messages via the Vonage Messages API.
- Throttle the worker to respect Vonage's rate limits (e.g., 1 message per second for long codes, adjustable).
- Implement basic status tracking for broadcasts and individual messages using Prisma and PostgreSQL.
- Provide robust error handling and automatic retries for failed messages.
- Ensure secure handling of API credentials.
- Containerize the application using Docker for easy setup and deployment.
Technologies Used:
- NestJS: A progressive Node.js framework for building efficient, reliable, and scalable server-side applications. Its modular architecture is perfect for this type of project.
- Vonage Messages API: A unified API for sending messages across various channels, including SMS. We use the official @vonage/server-sdk.
- BullMQ: A robust, Redis-based message queue for Node.js. Essential for background processing, throttling, and retries.
- Redis: An in-memory data structure store, used as the backend for BullMQ.
- Prisma: A modern database toolkit for TypeScript and Node.js. Simplifies database access, migrations, and type safety with PostgreSQL.
- PostgreSQL: A powerful, open-source object-relational database system.
- Docker & Docker Compose: For containerizing the application and its dependencies (Redis, Postgres), ensuring consistent development and deployment environments.
- TypeScript: Enhances JavaScript with static typing for better maintainability and fewer runtime errors.
System Architecture:
The system follows this general flow:
- A client sends a POST request to the NestJS API (/broadcasts) with a list of recipients and the message text.
- The NestJS API creates database records for the broadcast and individual messages (initially marked as PENDING).
- For each recipient, the API adds a job to the BullMQ queue (backed by Redis). This job contains the necessary details (recipient number, message text, database message ID).
- A separate NestJS Queue Worker process monitors the BullMQ queue.
- The Worker picks up jobs from the queue, respecting the configured rate limit (e.g., one job per second).
- For each job, the Worker updates the corresponding message status in the database to PROCESSING.
- The Worker calls the Vonage Messages API to send the SMS.
- Based on the Vonage API response:
- On success, the Worker updates the message status in the database to SENT and records the Vonage message ID.
- On failure, the Worker logs the error. If retries are configured and remaining, BullMQ will automatically requeue the job after a backoff period. If it's the final attempt, the Worker updates the message status to FAILED and records the error details.
- Optionally, Vonage can send status updates (Delivery Receipts - DLRs) via webhooks back to the NestJS API, which can further update the message status in the database (e.g., DELIVERED, FAILED).
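The status flow described above can be summarized as a small transition table. This is a minimal sketch, not the guide's implementation: the `ALLOWED_TRANSITIONS` map and `canTransition` helper are hypothetical names, and the `DELIVERED` state only applies if you wire up the optional delivery-receipt webhook.

```typescript
// Illustrative only: the guide stores these statuses via a Prisma enum.
type MessageStatus = 'PENDING' | 'PROCESSING' | 'SENT' | 'FAILED' | 'DELIVERED';

// Which status changes the flow above permits.
const ALLOWED_TRANSITIONS: { [from in MessageStatus]: MessageStatus[] } = {
  PENDING: ['PROCESSING'],                     // worker picks up the job
  PROCESSING: ['PROCESSING', 'SENT', 'FAILED'], // retry, success, or final failure
  SENT: ['DELIVERED', 'FAILED'],               // optional DLR webhook updates
  FAILED: [],
  DELIVERED: [],
};

function canTransition(from: MessageStatus, to: MessageStatus): boolean {
  return ALLOWED_TRANSITIONS[from].indexOf(to) !== -1;
}

console.log(canTransition('PENDING', 'PROCESSING')); // true
console.log(canTransition('FAILED', 'SENT'));        // false
```

A guard like this could be useful in a webhook handler, where out-of-order delivery receipts might otherwise overwrite a final status.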
Prerequisites:
- Node.js (LTS version recommended, e.g., v20 or v22). Node.js 22.x is the latest LTS version (Active LTS until October 2025), while Node.js 20.x is in maintenance mode. Node.js 18.x reaches end-of-life on April 30, 2025.
- npm or yarn
- Docker and Docker Compose
- A Vonage API account (Sign up here)
- A Vonage phone number capable of sending SMS. Note: All API keys have a default rate limit of 30 API requests per second, though carrier restrictions can limit this to as low as 1 request per second for certain number types. For US 10DLC numbers, Brand and Campaign registration is mandatory.
- Basic understanding of NestJS, TypeScript, APIs, and databases.
- curl or a tool like Postman for testing the API.
Final Outcome:
You will have a containerized NestJS application with an API endpoint to initiate bulk SMS broadcasts. Messages will be reliably sent in the background, respecting Vonage rate limits, with status tracking and automatic retries.
1. Setting Up the Project
Initialize the NestJS project, set up the directory structure, install dependencies, and configure Docker.
1.1 Initialize NestJS Project
Open your terminal and run the NestJS CLI command:
npm install -g @nestjs/cli
nest new vonage-bulk-sms --package-manager npm
cd vonage-bulk-sms
1.2 Project Structure
Organize your code into modules for better separation of concerns:
src/
├── app.module.ts # Root application module
├── app.controller.ts
├── app.service.ts
├── main.ts # Application entry point
│
├── broadcast/ # Handles broadcast requests & API
│ ├── dto/
│ │ └── create-broadcast.dto.ts
│ ├── broadcast.module.ts
│ ├── broadcast.controller.ts
│ └── broadcast.service.ts
│
├── queue/ # BullMQ configuration and processor
│ ├── queue.module.ts
│ └── broadcast.processor.ts # Processes jobs from the queue
│
├── vonage/ # Service for interacting with Vonage API
│ ├── vonage.module.ts
│ └── vonage.service.ts
│
├── prisma/ # Prisma service and configuration
│ ├── prisma.module.ts
│ └── prisma.service.ts
│
└── config/ # Configuration management (optional but good practice)
    ├── config.module.ts
    └── configuration.ts
1.3 Install Dependencies
Install the necessary packages for BullMQ, Vonage, Prisma, configuration, and validation:
# Core Dependencies
npm install @nestjs/config class-validator class-transformer
# BullMQ & Redis
npm install @nestjs/bullmq bullmq ioredis
# Vonage
npm install @vonage/server-sdk
# Prisma & PostgreSQL Driver
npm install @prisma/client pg
npm install --save-dev prisma
# API Throttling & Security
npm install @nestjs/throttler throttler-storage-redis helmet
1.4 Configure Environment Variables
Create a .env file in the project root. This file stores sensitive credentials and configuration. Never commit this file to version control.
# .env
# Application
NODE_ENV=development
PORT=3000
# Vonage Credentials (Get from Vonage Dashboard -> API Settings)
VONAGE_API_KEY=YOUR_VONAGE_API_KEY
VONAGE_API_SECRET=YOUR_VONAGE_API_SECRET
VONAGE_APPLICATION_ID=YOUR_VONAGE_APPLICATION_ID # Get after creating an app
VONAGE_SENDER_ID=YOUR_VONAGE_PHONE_NUMBER # The number messages will be sent from
VONAGE_PRIVATE_KEY_PATH=./private.key # Path relative to project root
# Database (Connection string used by Prisma (read by the application inside the container))
DATABASE_URL="postgresql://user:password@postgres:5432/vonage_bulk_sms?schema=public"
# These POSTGRES_* variables are used by the Docker Compose service to initialize the container
POSTGRES_DB=vonage_bulk_sms
POSTGRES_USER=user
POSTGRES_PASSWORD=password
# Redis (Used by Docker Compose & BullMQ)
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_PASSWORD= # Leave empty if no password, or set one in docker-compose
# BullMQ Queue Config
BROADCAST_QUEUE_NAME=broadcast-sms
# Rate limit: 1 job processed every 1000ms (1/sec) - Adjust based on Vonage limits for your number type
BULLMQ_RATE_LIMIT_MAX=1
BULLMQ_RATE_LIMIT_DURATION=1000
BULLMQ_JOB_ATTEMPTS=3 # Retry failed jobs 3 times
BULLMQ_JOB_BACKOFF=5000 # Wait 5 seconds before the first retry
# API Rate Limiting
THROTTLE_TTL=60 # seconds
THROTTLE_LIMIT=20 # requests per TTL from the same IP
- Obtaining Vonage Credentials: Get your API Key and Secret from the Vonage Dashboard. Generate the Application ID and private.key file when you create a Vonage Application for the Messages API. The Sender ID is your Vonage virtual number.
- Database/Redis: These credentials match the ones you'll define in docker-compose.yml.
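Environment variables always arrive as strings, so numeric settings such as the rate limit need explicit parsing before use. The sketch below shows one way to do that with plain functions. `loadQueueSettings` and `readPositiveInt` are hypothetical helpers (not part of NestJS or the guide's code); the keys and defaults mirror the .env file above.

```typescript
// Hypothetical helpers for illustration; key names and defaults mirror the .env file above.
interface QueueSettings {
  queueName: string;
  rateLimitMax: number;
  rateLimitDurationMs: number;
  jobAttempts: number;
  jobBackoffMs: number;
}

type Env = { [key: string]: string | undefined };

// Parse a positive integer from the environment, falling back when unset or blank.
function readPositiveInt(env: Env, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw.trim() === '') return fallback;
  const value = Number(raw);
  if (!isFinite(value) || Math.floor(value) !== value || value <= 0) {
    throw new Error(key + ' must be a positive integer, got "' + raw + '"');
  }
  return value;
}

function loadQueueSettings(env: Env): QueueSettings {
  return {
    queueName: env['BROADCAST_QUEUE_NAME'] || 'broadcast-sms',
    rateLimitMax: readPositiveInt(env, 'BULLMQ_RATE_LIMIT_MAX', 1),
    rateLimitDurationMs: readPositiveInt(env, 'BULLMQ_RATE_LIMIT_DURATION', 1000),
    jobAttempts: readPositiveInt(env, 'BULLMQ_JOB_ATTEMPTS', 3),
    jobBackoffMs: readPositiveInt(env, 'BULLMQ_JOB_BACKOFF', 5000),
  };
}

const settings = loadQueueSettings({ BULLMQ_RATE_LIMIT_MAX: '30' });
console.log(settings.rateLimitMax, settings.jobAttempts); // 30 3
```

In the application you would pass `process.env` instead of a literal object. Failing fast on a malformed value (for example, one that still carries an inline comment because the loader did not strip it) is usually easier to debug than a silently misconfigured rate limit.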
1.5 Configure Docker
Create a Dockerfile in the project root:
# Dockerfile
FROM node:20-alpine AS development
WORKDIR /usr/src/app
COPY package*.json ./
# Install all dependencies (runtime and dev) for local development
RUN npm install
COPY . .
USER node
# ---
FROM node:20-alpine AS builder
WORKDIR /usr/src/app
COPY package*.json ./
# Install all dependencies: the build needs devDependencies (Nest CLI, TypeScript, Prisma CLI)
RUN npm ci
COPY . .
# Generate Prisma Client
RUN npx prisma generate
# Build the application
RUN npm run build
# Prune development dependencies
RUN npm prune --omit=dev
USER node
# ---
FROM node:20-alpine AS production
# Set the working directory first so the relative COPY targets land in /usr/src/app
WORKDIR /usr/src/app
COPY --from=builder /usr/src/app/node_modules ./node_modules
COPY --from=builder /usr/src/app/dist ./dist
COPY --from=builder /usr/src/app/prisma ./prisma
# Copy the generated Prisma client from the builder stage
COPY --from=builder /usr/src/app/node_modules/.prisma ./node_modules/.prisma
# Copy private key if it exists at build time (alternative: mount at runtime)
# COPY --chown=node:node private.key ./private.key
# Expose port and define command
EXPOSE 3000
CMD ["node", "dist/main.js"]
Create a docker-compose.yml file:
# docker-compose.yml
version: '3.8'
services:
  app:
    build:
      context: .
      target: production # Use the production stage from Dockerfile
    container_name: vonage_bulk_sms_app
    env_file:
      - .env # Load environment variables from .env file
    ports:
      - "${PORT:-3000}:3000" # Use PORT from .env, default to 3000
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    volumes:
      - ./private.key:/usr/src/app/private.key:ro # Mount private key read-only
      # For development, you might mount src:
      # - ./src:/usr/src/app/src
      # - ./prisma:/usr/src/app/prisma
    networks:
      - vonage-net
    command: node dist/main.js # Explicitly start the built app
  postgres:
    image: postgres:15-alpine
    container_name: postgres_db
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432" # Map port for external tools if needed (e.g., pgAdmin)
    networks:
      - vonage-net
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"]
      interval: 10s
      timeout: 5s
      retries: 5
  redis:
    image: redis:7-alpine
    container_name: redis_cache
    # If you set REDIS_PASSWORD in .env, uncomment the command below
    # command: redis-server --requirepass ${REDIS_PASSWORD}
    ports:
      - "6379:6379" # Map port for external tools if needed
    volumes:
      - redis_data:/data
    networks:
      - vonage-net
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
volumes:
  postgres_data:
  redis_data:
networks:
  vonage-net:
    driver: bridge
- This setup defines three services: your NestJS app, a PostgreSQL database, and a Redis instance.
- It uses the .env file to configure containers.
- It mounts the private.key file into the application container (ensure this file exists before running docker-compose up).
- It includes basic health checks and explicit depends_on conditions.
Create a .dockerignore file to optimize build context:
# .dockerignore
node_modules
dist
.git
.env
*.log
1.6 Initialize Prisma
Run the Prisma init command:
npx prisma init --datasource-provider postgresql
This creates a prisma directory with a schema.prisma file and updates .env with a placeholder DATABASE_URL. Replace the placeholder DATABASE_URL in .env with the one you defined earlier, matching the Docker Compose setup (quote it if it contains special characters).
You'll modify prisma/schema.prisma in Section 6 when defining the data model.
1.7 Configure NestJS Modules
Import and configure the modules you'll use.
Update src/app.module.ts:
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bullmq';
import { ThrottlerModule, ThrottlerGuard } from '@nestjs/throttler';
import { ThrottlerStorageRedisService } from 'throttler-storage-redis';
import { APP_GUARD } from '@nestjs/core';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { BroadcastModule } from './broadcast/broadcast.module';
import { QueueModule } from './queue/queue.module';
import { VonageModule } from './vonage/vonage.module';
import { PrismaModule } from './prisma/prisma.module';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true, // Make ConfigService available globally
envFilePath: '.env',
}),
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
connection: {
host: configService.get<string>('REDIS_HOST'),
port: configService.get<number>('REDIS_PORT'),
password: configService.get<string>('REDIS_PASSWORD') || undefined,
},
}),
inject: [ConfigService],
}),
ThrottlerModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (configService: ConfigService) => ({
storage: new ThrottlerStorageRedisService({
host: configService.get<string>('REDIS_HOST'),
port: configService.get<number>('REDIS_PORT'),
password: configService.get<string>('REDIS_PASSWORD') || undefined,
// Note: Ensure compatibility or check for alternative storage options if needed
}),
ttl: configService.get<number>('THROTTLE_TTL', 60),
limit: configService.get<number>('THROTTLE_LIMIT', 20),
}),
}),
PrismaModule, // Add PrismaModule
VonageModule, // Add VonageModule
QueueModule, // Add QueueModule
BroadcastModule, // Add BroadcastModule
],
controllers: [AppController],
providers: [
AppService,
{
provide: APP_GUARD, // Apply throttling globally
useClass: ThrottlerGuard,
},
],
})
export class AppModule {}
This root module now:
- Loads environment variables globally using ConfigModule.
- Configures the base BullMQ connection using Redis details from .env.
- Configures global API rate limiting (ThrottlerModule) using Redis for distributed storage.
- Imports your feature modules (which you'll create next).
2. Implementing Core Functionality (Queue & Worker)
Set up BullMQ to handle the SMS jobs and create the worker process that consumes these jobs.
2.1 Configure Queue Module
Create src/queue/queue.module.ts:
// src/queue/queue.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BroadcastProcessor } from './broadcast.processor';
import { VonageModule } from '../vonage/vonage.module'; // Import VonageModule
import { PrismaModule } from '../prisma/prisma.module'; // Import PrismaModule
@Module({
imports: [
BullModule.registerQueueAsync({
name: 'broadcast-sms', // Use queue name from config or define directly
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
name: configService.get<string>('BROADCAST_QUEUE_NAME', 'broadcast-sms'),
defaultJobOptions: {
attempts: configService.get<number>('BULLMQ_JOB_ATTEMPTS', 3),
backoff: {
type: 'exponential',
delay: configService.get<number>('BULLMQ_JOB_BACKOFF', 5000),
},
removeOnComplete: true, // Keep queue clean
removeOnFail: false, // Keep failed jobs for inspection (optional)
},
// Note: BullMQ applies rate limits in the Worker, not the Queue, so a
// `limiter` set here has no effect. Pass `limiter: { max, duration }` as
// worker options through the second argument of @Processor() instead.
}),
inject: [ConfigService],
}),
VonageModule, // Make VonageService available for injection
PrismaModule, // Make PrismaService available
ConfigModule, // Import ConfigModule to make ConfigService injectable here
],
providers: [BroadcastProcessor], // Register the processor
exports: [BullModule], // Export BullModule if needed in other modules (like BroadcastService)
})
export class QueueModule {}
- BullModule.registerQueueAsync: Registers a specific queue named broadcast-sms.
- defaultJobOptions: Sets default behavior for jobs added to this queue (attempts, backoff strategy).
- limiter: BullMQ's built-in rate limiter lets the worker process only BULLMQ_RATE_LIMIT_MAX jobs within BULLMQ_RATE_LIMIT_DURATION milliseconds, which throttles calls to the Vonage API. In BullMQ this is a worker option, so configure it on the BroadcastProcessor through the options argument of @Processor() rather than on the queue.
- We import VonageModule, PrismaModule, and ConfigModule to make their services injectable into the BroadcastProcessor.
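To make the retry settings concrete, the sketch below computes the wait before each retry. It assumes BullMQ's built-in exponential strategy multiplies the base delay by 2^(n-1) for the n-th retry; check this against the BullMQ version you install. `exponentialBackoffDelays` is a hypothetical helper used only for illustration.

```typescript
// Assumption: BullMQ's built-in 'exponential' backoff waits 2^(n-1) * delay before retry n.
// `exponentialBackoffDelays` is an illustrative helper, not a BullMQ API.
function exponentialBackoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // With `attempts` total tries there are `attempts - 1` retries.
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(Math.round(Math.pow(2, retry - 1) * baseDelayMs));
  }
  return delays;
}

// With the defaults from .env (BULLMQ_JOB_ATTEMPTS=3, BULLMQ_JOB_BACKOFF=5000):
console.log(exponentialBackoffDelays(3, 5000)); // [ 5000, 10000 ]
```

Under that assumption, a message that fails every attempt is marked as failed roughly 15 seconds after its first try, plus any time spent waiting on the rate limiter.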
2.2 Create Vonage Service
This service encapsulates interaction with the Vonage SDK.
Create src/vonage/vonage.service.ts:
// src/vonage/vonage.service.ts
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Vonage } from '@vonage/server-sdk';
import { MessageSendRequest } from '@vonage/messages';
import * as path from 'path';
import * as fs from 'fs';
@Injectable()
export class VonageService implements OnModuleInit {
private readonly logger = new Logger(VonageService.name);
private vonageClient: Vonage;
constructor(private configService: ConfigService) {}
onModuleInit() {
try {
const apiKey = this.configService.getOrThrow<string>('VONAGE_API_KEY');
const apiSecret = this.configService.getOrThrow<string>('VONAGE_API_SECRET');
const applicationId = this.configService.getOrThrow<string>('VONAGE_APPLICATION_ID');
const privateKeyPath = this.configService.getOrThrow<string>('VONAGE_PRIVATE_KEY_PATH');
// Ensure the private key path is handled correctly, might need adjustment
// depending on how/where it's mounted or located in production.
// This assumes it's relative to the project root where the app starts.
const resolvedPrivateKeyPath = path.resolve(process.cwd(), privateKeyPath);
// Check if file exists before initializing - helps debugging
if (!fs.existsSync(resolvedPrivateKeyPath)) {
this.logger.error(`Vonage private key not found at path: ${resolvedPrivateKeyPath}`);
throw new Error(`Vonage private key not found at path: ${resolvedPrivateKeyPath}`);
}
this.vonageClient = new Vonage({
apiKey: apiKey,
apiSecret: apiSecret,
applicationId: applicationId,
privateKey: resolvedPrivateKeyPath, // Use resolved path
});
this.logger.log('Vonage SDK initialized successfully.');
} catch (error) {
this.logger.error(`Failed to initialize Vonage SDK: ${error.message}`, error.stack);
// Depending on requirements, you might want the app to fail startup
// or handle this state gracefully (e.g., disable sending).
throw new Error(`Failed to initialize Vonage SDK: ${error.message}`);
}
}
async sendSms(to: string, text: string): Promise<{ messageUuid: string | null; error?: any }> {
const from = this.configService.getOrThrow<string>('VONAGE_SENDER_ID');
const message: MessageSendRequest = {
message_type: 'text',
channel: 'sms',
to: to,
from: from,
text: text,
};
try {
this.logger.log(`Attempting to send SMS to ${to}`);
const response = await this.vonageClient.messages.send(message);
this.logger.log(`SMS sent successfully to ${to}, message_uuid: ${response.message_uuid}`);
return { messageUuid: response.message_uuid };
} catch (error) {
// Attempt to extract meaningful error info from Vonage SDK error structure
const errorDetails = error?.response?.data || error?.message || error;
this.logger.error(`Failed to send SMS to ${to}: ${JSON.stringify(errorDetails)}`, error?.stack);
// Return error details for the processor to handle
return { messageUuid: null, error: errorDetails };
}
}
}
Create src/vonage/vonage.module.ts:
// src/vonage/vonage.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { VonageService } from './vonage.service';
@Module({
imports: [ConfigModule], // Ensure ConfigService is available
providers: [VonageService],
exports: [VonageService], // Export for use in other modules
})
export class VonageModule {}
- Initializes the Vonage SDK using credentials from .env. Includes path resolution and an existence check for the private key.
- Provides a sendSms method that wraps the SDK's messages.send call.
- Includes logging for success and failure.
- Returns the message_uuid on success and the error details on failure.
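The service returns a result object instead of throwing, which leaves the retry decision to the caller. One way to make that contract harder to misuse is a discriminated union, sketched below. The `SendResult` type and `unwrapMessageUuid` helper are hypothetical and differ from the `{ messageUuid, error }` shape used in the service above.

```typescript
// Hypothetical alternative to the `{ messageUuid: string | null; error?: any }` return shape.
type SendResult =
  | { kind: 'sent'; messageUuid: string }
  | { kind: 'failed'; error: string };

// Mirrors what a queue processor needs: a UUID on success, a thrown Error on
// failure so that the queue's retry mechanism is triggered.
function unwrapMessageUuid(result: SendResult): string {
  if (result.kind === 'sent') {
    return result.messageUuid;
  }
  throw new Error('Vonage API failed to send SMS: ' + result.error);
}

const ok: SendResult = { kind: 'sent', messageUuid: 'example-uuid' };
console.log(unwrapMessageUuid(ok)); // example-uuid
```

With this shape the compiler forces callers to check `kind` before reading `messageUuid`, so a failed send cannot be mistaken for a success with a null identifier.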
2.3 Create the Queue Processor
This class defines how the broadcast-sms queue handles jobs.
Create src/queue/broadcast.processor.ts:
// src/queue/broadcast.processor.ts
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { VonageService } from '../vonage/vonage.service';
import { PrismaService } from '../prisma/prisma.service'; // Import PrismaService
import { MessageStatus } from '@prisma/client'; // Import enum (define in schema.prisma later)
import { ConfigService } from '@nestjs/config'; // Import ConfigService
// Define the structure of the job data we expect
interface BroadcastJobData {
messageId: number; // Link back to our database record
recipient: string;
messageText: string;
}
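// Connects to the queue named 'broadcast-sms'.
// BullMQ rate limiting is a worker option, so the limiter is set here rather than on the queue.
// Decorator arguments are evaluated at import time, before ConfigService exists, so these values
// are read from process.env (populated by Docker Compose's env_file); defaults apply if unset.
@Processor('broadcast-sms', {
  limiter: {
    max: Number(process.env.BULLMQ_RATE_LIMIT_MAX) || 1,
    duration: Number(process.env.BULLMQ_RATE_LIMIT_DURATION) || 1000,
  },
})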
export class BroadcastProcessor extends WorkerHost {
private readonly logger = new Logger(BroadcastProcessor.name);
constructor(
private readonly vonageService: VonageService,
private readonly prisma: PrismaService, // Inject PrismaService
private readonly configService: ConfigService, // Inject ConfigService
) {
super();
}
  async process(job: Job<BroadcastJobData, any, string>): Promise<void> {
    // job.attemptsMade counts attempts that have already finished, so it is 0
    // while the first attempt runs. Derive a 1-based number for the current attempt.
    const attempt = job.attemptsMade + 1;
    this.logger.log(`Processing job ${job.id} for recipient ${job.data.recipient} (Attempt ${attempt})`);
    const { messageId, recipient, messageText } = job.data;
    try {
      // Update status to PROCESSING in DB and record the current attempt number
      await this.prisma.message.update({
        where: { id: messageId },
        data: { status: MessageStatus.PROCESSING, attempts: attempt },
      });
      const result = await this.vonageService.sendSms(recipient, messageText);
      if (result.messageUuid) {
        // Update status to SENT in DB
        await this.prisma.message.update({
          where: { id: messageId },
          data: {
            status: MessageStatus.SENT,
            providerMessageId: result.messageUuid,
            processedAt: new Date(),
            errorDetails: null, // Clear previous errors on success
          },
        });
        this.logger.log(`Job ${job.id} completed successfully. Vonage message_uuid: ${result.messageUuid}`);
      } else {
        // sendSms reports failures in its return value instead of throwing
        const errorMessage = `Vonage API failed to send SMS: ${JSON.stringify(result.error)}`;
        this.logger.warn(`Job ${job.id} failed sending via Vonage: ${errorMessage}`);
        // Throw error to trigger BullMQ retry mechanism
        throw new Error(errorMessage);
      }
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'Unknown error during processing';
      this.logger.error(`Job ${job.id} (attempt ${attempt}) failed with error: ${errorMessage}`, error instanceof Error ? error.stack : undefined);
      // Check if it's the final attempt
      const maxAttempts = job.opts.attempts ?? this.configService.get<number>('BULLMQ_JOB_ATTEMPTS', 3);
      if (attempt >= maxAttempts) {
        // Final attempt failed, mark as FAILED in DB
        this.logger.warn(`Job ${job.id} reached max attempts (${maxAttempts}) and failed permanently.`);
        await this.prisma.message.update({
          where: { id: messageId },
          data: {
            status: MessageStatus.FAILED,
            processedAt: new Date(),
            errorDetails: errorMessage,
          },
        });
      } else {
        // Not the final attempt: keep status as PROCESSING and store the latest error.
        // BullMQ handles the retry state and delay based on the backoff strategy.
        this.logger.log(`Job ${job.id} failed, will retry (attempt ${attempt}/${maxAttempts}).`);
        await this.prisma.message.update({
          where: { id: messageId },
          data: {
            status: MessageStatus.PROCESSING, // Keep as PROCESSING during retry phase
            errorDetails: `Attempt ${attempt} failed: ${errorMessage}`,
          },
        });
      }
      // Re-throw the error so BullMQ knows the job failed and should be retried (if attempts remain)
      throw error;
    }
  }
// Optional: Listen to worker events for more detailed logging/monitoring
@OnWorkerEvent('active')
onActive(job: Job) {
this.logger.log(`Job ${job.id} has started processing (attempt ${job.attemptsMade}).`);
}
@OnWorkerEvent('completed')
onCompleted(job: Job, result: any) {
// Note: This logs successful completion *after* the 'process' method returns successfully.
this.logger.log(`Job ${job.id} has completed successfully.`);
}
@OnWorkerEvent('failed')
onFailed(job: Job<BroadcastJobData> | undefined, err: Error) {
// This event fires after the 'process' method throws an error, potentially before the final retry attempt.
// More detailed final failure logging is handled within the 'process' method's catch block.
this.logger.error(`Job ${job?.id ?? 'unknown'} failed on attempt ${job?.attemptsMade ?? 'N/A'} with error: ${err.message}`);
}
@OnWorkerEvent('error')
onError(err: Error) {
// General worker errors not tied to a specific job
this.logger.error('Worker error:', err.stack);
}
@OnWorkerEvent('stalled')
onStalled(jobId: string) {
// Indicates a job was marked as active but did not complete within a timeout
this.logger.warn(`Job ${jobId} has stalled.`);
}
}
- @Processor('broadcast-sms'): Decorator linking this class to the specified queue.
- process(job: Job): The core method BullMQ calls for each job. It receives the job object containing your data (recipient, messageText, messageId).
- Injects VonageService, PrismaService, and ConfigService.
- Updates the message status in the database (PENDING → PROCESSING → SENT/FAILED). Increments attempt count.
- Handles errors from VonageService and general processing errors.
- Throws errors to let BullMQ handle retries based on the queue's defaultJobOptions. Updates DB status accordingly: FAILED on final failure, keeps PROCESSING during intermediate retry attempts, storing the latest error.
- Includes optional event listeners (@OnWorkerEvent) for detailed logging.
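The status bookkeeping across retries can be traced with a small in-memory simulation. This is a sketch under one assumption worth verifying against your BullMQ version: `attemptsMade` counts attempts that have already finished, so it is 0 while the first attempt runs. `simulateJob` is a hypothetical function that stands in for the worker and the database writes.

```typescript
// Illustrative simulation only: no queue or database involved.
type Status = 'PROCESSING' | 'SENT' | 'FAILED';

// Returns the sequence of status writes for a job that fails
// `failuresBeforeSuccess` times before the send finally succeeds.
function simulateJob(maxAttempts: number, failuresBeforeSuccess: number): Status[] {
  const history: Status[] = [];
  // Assumption: attemptsMade is 0-based while an attempt is running.
  for (let attemptsMade = 0; attemptsMade < maxAttempts; attemptsMade++) {
    history.push('PROCESSING');
    if (attemptsMade >= failuresBeforeSuccess) {
      history.push('SENT');
      return history;
    }
    const isFinalAttempt = attemptsMade + 1 >= maxAttempts;
    if (isFinalAttempt) {
      history.push('FAILED'); // only the last failed attempt marks the message FAILED
    }
  }
  return history;
}

console.log(simulateJob(3, 1)); // [ 'PROCESSING', 'PROCESSING', 'SENT' ]
console.log(simulateJob(3, 5)); // [ 'PROCESSING', 'PROCESSING', 'PROCESSING', 'FAILED' ]
```

The off-by-one matters: comparing the raw 0-based counter against the maximum inside the processor would never detect the final attempt, and the message would stay in PROCESSING.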
3. Building the API Layer
This layer exposes an endpoint to receive broadcast requests and add jobs to the queue.
3.1 Create DTO (Data Transfer Object)
DTOs define the expected request body structure and enable automatic validation using class-validator.
Create src/broadcast/dto/create-broadcast.dto.ts:
// src/broadcast/dto/create-broadcast.dto.ts
import { IsNotEmpty, IsString, IsArray, ArrayNotEmpty, IsPhoneNumber } from 'class-validator';
// Optional: for Swagger documentation (install @nestjs/swagger if using)
// import { ApiProperty } from '@nestjs/swagger';
export class CreateBroadcastDto {
// @ApiProperty({
// description: 'List of recipient phone numbers in E.164 format (e.g., +14155552671)',
// example: ['+14155552671', '+447700900000'],
// type: [String],
// })
@IsArray()
@ArrayNotEmpty()
// Apply validation to each element in the array
// Note: IsPhoneNumber might require a specific region or allow generic E.164
// Consider a more robust validation library if needed (e.g., google-libphonenumber)
@IsPhoneNumber(undefined, { each: true, message: 'Each recipient must be a valid phone number in E.164 format' })
recipients: string[];
// @ApiProperty({
// description: 'The text message content to send.',
// example: 'Hello from our awesome service!',
// })
@IsString()
@IsNotEmpty()
message: string;
}
- Uses decorators from class-validator to enforce rules (must be an array, not empty, must contain valid phone numbers, message must be a non-empty string).
- @ApiProperty is commented out but can be used if you integrate Swagger.
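Beyond decorator-based validation, it can help to normalize and de-duplicate recipients before queuing so that nobody receives the same message twice. The sketch below uses a simple E.164 shape check (a plus sign followed by up to 15 digits, no leading zero). `normalizeRecipients` is a hypothetical helper, and the regular expression checks format only, not whether a number actually exists.

```typescript
// Format check only: '+' followed by 2-15 digits, first digit non-zero.
const E164_PATTERN = /^\+[1-9]\d{1,14}$/;

// Hypothetical helper: strips common separators, validates, and removes duplicates.
function normalizeRecipients(input: string[]): { valid: string[]; invalid: string[] } {
  const seen: { [phone: string]: boolean } = {};
  const valid: string[] = [];
  const invalid: string[] = [];
  for (const raw of input) {
    // Remove spaces, dashes, and parentheses that users commonly type.
    const candidate = raw.replace(/[\s\-()]/g, '');
    if (!E164_PATTERN.test(candidate)) {
      invalid.push(raw);
      continue;
    }
    if (seen[candidate]) continue; // skip duplicates
    seen[candidate] = true;
    valid.push(candidate);
  }
  return { valid, invalid };
}

const result = normalizeRecipients(['+14155552671', '+1 415-555-2671', '4155552671', '+447700900000']);
console.log(result.valid);   // [ '+14155552671', '+447700900000' ]
console.log(result.invalid); // [ '4155552671' ]
```

A step like this could run in the service before any database records are created, so that invalid entries are reported back to the client instead of consuming queue attempts.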
3.2 Create Broadcast Service
This service handles the business logic: creating the broadcast record and adding individual message jobs to the queue.
Create src/broadcast/broadcast.service.ts:
// src/broadcast/broadcast.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { PrismaService } from '../prisma/prisma.service';
import { CreateBroadcastDto } from './dto/create-broadcast.dto';
import { Broadcast, MessageStatus } from '@prisma/client'; // Import generated types
@Injectable()
export class BroadcastService {
private readonly logger = new Logger(BroadcastService.name);
constructor(
@InjectQueue('broadcast-sms') private readonly broadcastQueue: Queue,
private readonly prisma: PrismaService,
) {}
async createBroadcast(createBroadcastDto: CreateBroadcastDto): Promise<Broadcast> {
const { recipients, message } = createBroadcastDto;
this.logger.log(`Received broadcast request for ${recipients.length} recipients.`);
// 1. Create the main Broadcast record
let broadcast: Broadcast;
try {
broadcast = await this.prisma.broadcast.create({
data: {
messageContent: message,
totalRecipients: recipients.length,
status: 'PENDING', // Initial status
},
});
this.logger.log(`Created broadcast record with ID: ${broadcast.id}`);
} catch (error) {
this.logger.error(`Failed to create broadcast record: ${error.message}`, error.stack);
throw new Error('Failed to initiate broadcast in database.');
}
// 2. Create individual Message records and add jobs to the queue
const messageJobsPromises = recipients.map(async (recipient) => {
try {
// Create placeholder message record in DB
const dbMessage = await this.prisma.message.create({
data: {
broadcastId: broadcast.id,
recipient: recipient,
status: MessageStatus.PENDING, // Initial status for message
},
});
// Add job to BullMQ
// The job data links back to the DB message record
await this.broadcastQueue.add(
'send-sms-job', // Job name (can be descriptive)
{
messageId: dbMessage.id, // Pass the DB ID
recipient: recipient,
messageText: message,
},
// Optional: Job-specific options like delay or priority can go here
);
return { success: true, recipient };
} catch (error) {
this.logger.error(`Failed to create message record or queue job for recipient ${recipient}: ${error.message}`, error.stack);
// Decide how to handle partial failures:
// - Continue processing others?
// - Rollback/cancel the broadcast? (More complex)
// Here, we log the error and continue with others.
return { success: false, recipient, error: error.message };
}
});
// Wait for all DB creations and queue additions to be initiated
const results = await Promise.all(messageJobsPromises);
const failedCount = results.filter(r => !r.success).length;
if (failedCount > 0) {
this.logger.warn(`Failed to queue ${failedCount} out of ${recipients.length} messages for broadcast ${broadcast.id}.`);
// Optionally update broadcast status or record partial failure details
}
// Optionally update broadcast status to 'QUEUED' or similar after adding all jobs
// try {
// await this.prisma.broadcast.update({
// where: { id: broadcast.id },
// data: { status: 'QUEUED' }, // Consider a more nuanced status if partial failures occurred
// });
// } catch (error) {
// this.logger.error(`Failed to update broadcast ${broadcast.id} status to QUEUED: ${error.message}`);
// }
this.logger.log(`Attempted to add ${recipients.length} message jobs to the queue for broadcast ${broadcast.id}. Failures: ${failedCount}.`);
return broadcast; // Return the created broadcast record
}
// Optional: Add methods to get broadcast status
async getBroadcastStatus(id: number): Promise<Broadcast | null> {
return this.prisma.broadcast.findUnique({
where: { id: id },
include: {
// Optionally include counts of message statuses
_count: {
select: { messages: true }
},
// Example: Include counts per status (more detailed)
// messages: {
// where: { status: MessageStatus.SENT },
// select: { _count: true } // This syntax might need adjustment based on Prisma version
// }
// A more practical approach might be separate queries for counts per status.
}
});
}
async getMessageStatus(id: number): Promise<any | null> { // Using 'any' for simplicity
const message = await this.prisma.message.findUnique({ where: { id: id }});
if (!message) return null;
// Optionally check job status in BullMQ if needed (more complex)
// Requires storing BullMQ job ID in the Message schema
// const job = await this.broadcastQueue.getJob(message.jobId);
return {
id: message.id,
recipient: message.recipient,
status: message.status,
attempts: message.attempts,
providerMessageId: message.providerMessageId,
error: message.errorDetails,
createdAt: message.createdAt,
processedAt: message.processedAt
};
}
}
- @InjectQueue('broadcast-sms'): Injects the BullMQ queue instance.
- Injects PrismaService.
- createBroadcast:
  - Takes the validated DTO.
  - Creates a main Broadcast record in the database.
  - Iterates through recipients:
    - Creates an individual Message record (status PENDING) in the database for tracking.
    - Adds a job to the broadcastQueue with the messageId (linking back to the DB record), recipient, and message text.
  - Includes basic error handling for database operations and queue additions.
  - Returns the created Broadcast object.
- Includes optional methods (getBroadcastStatus, getMessageStatus) for querying the state of broadcasts and individual messages from the database.
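The comments in getBroadcastStatus suggest separate queries for per-status counts. One way to do that is a Prisma groupBy on Message.status, with the resulting rows folded into a summary. The sketch below covers only the folding step, so it runs without a database. The names summarizeBroadcast, StatusCountRow, and BroadcastSummary are hypothetical, and the row shape assumes a query along the lines of prisma.message.groupBy({ by: ['status'], where: { broadcastId }, _count: { _all: true } }).

```typescript
// Statuses used by this guide's Message model.
type MessageStatus = 'PENDING' | 'PROCESSING' | 'SENT' | 'FAILED';

// Assumed shape of one row from a groupBy on Message.status with _count: { _all: true }.
interface StatusCountRow {
  status: MessageStatus;
  _count: { _all: number };
}

// Hypothetical summary returned to API clients.
interface BroadcastSummary {
  total: number;
  counts: Record<MessageStatus, number>;
  finished: boolean; // true once nothing is PENDING or PROCESSING
}

function summarizeBroadcast(rows: StatusCountRow[]): BroadcastSummary {
  const counts: Record<MessageStatus, number> = {
    PENDING: 0,
    PROCESSING: 0,
    SENT: 0,
    FAILED: 0,
  };
  // Statuses with no messages do not appear in groupBy output, so they stay at 0.
  for (const row of rows) {
    counts[row.status] += row._count._all;
  }
  const total = counts.PENDING + counts.PROCESSING + counts.SENT + counts.FAILED;
  const inFlight = counts.PENDING + counts.PROCESSING;
  return { total, counts, finished: total > 0 && inFlight === 0 };
}
```

A status endpoint could return this summary alongside the Broadcast record, which avoids loading every Message row just to count them.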
Frequently Asked Questions About Bulk SMS Broadcasting with NestJS
How do I handle Vonage API rate limits in NestJS?
Use BullMQ's built-in rate limiter by setting the limiter option on the worker (with @nestjs/bullmq, in the worker options of your processor). Set max: 1 and duration: 1000 to process 1 job per second, which respects the carrier restriction of 1 request/second that applies to certain number types. The default API limit is 30 requests/second, but carrier constraints often reduce this. The limit applies across all workers attached to the queue, so jobs are throttled before they can trigger API rate-limit errors.
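As a minimal sketch of how the max and duration values relate to a target send rate, the helper below converts messages per second into a limiter object. limiterForRate and LimiterOptions are hypothetical names for illustration. The only BullMQ detail assumed is that the limiter takes max (jobs) and duration (milliseconds).

```typescript
// Shape of the limiter setting: at most `max` jobs per `duration` milliseconds.
interface LimiterOptions {
  max: number;
  duration: number;
}

// Hypothetical helper: derive limiter settings from a target rate.
function limiterForRate(messagesPerSecond: number): LimiterOptions {
  if (!Number.isFinite(messagesPerSecond) || messagesPerSecond <= 0) {
    throw new RangeError('messagesPerSecond must be a positive number');
  }
  if (messagesPerSecond >= 1) {
    // Whole messages per one-second window; round down to stay under the limit.
    return { max: Math.floor(messagesPerSecond), duration: 1000 };
  }
  // Slower than one per second: allow a single job in a longer window.
  return { max: 1, duration: Math.ceil(1000 / messagesPerSecond) };
}
```

With the figures quoted in this guide, limiterForRate(1) gives { max: 1, duration: 1000 } for carrier-restricted numbers and limiterForRate(30) gives { max: 30, duration: 1000 } for the default API limit. The result would be passed as the limiter value in the worker options.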
What Node.js version should I use for this NestJS SMS application?
Use Node.js 22.x (Active LTS until October 2025, then maintenance until April 2027) or Node.js 20.x (maintenance mode) for production. This guide's Dockerfile uses Node.js 18, but Node.js 18.x reaches end-of-life on April 30, 2025, so update the base image to Node.js 22 for extended support.
How does BullMQ handle failed SMS messages?
BullMQ automatically retries failed jobs based on your defaultJobOptions configuration. Set attempts: 3 to allow up to 3 attempts in total (the first try plus 2 retries) and backoff: { type: 'exponential', delay: 5000 } so each retry waits twice as long as the previous one (about 5s, then 10s; a fourth attempt would wait 20s). The BroadcastProcessor tracks attempt counts in the database and marks messages as FAILED after exhausting all attempts, storing error details for debugging.
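To check what waiting times a given attempts and backoff pair produces, the sketch below reproduces the exponential schedule. It assumes that BullMQ's built-in exponential strategy waits 2^(attemptsMade - 1) × delay before each retry, that attempts counts the first try as well, and that no jitter is configured. exponentialBackoffDelays is a hypothetical helper, not part of BullMQ.

```typescript
// Returns the wait (in ms) before each retry for a job configured with
// `attempts` total tries and an exponential backoff of `baseDelayMs`.
function exponentialBackoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // After the Nth failed attempt (attemptsMade = N) there is a retry only
  // while N < attempts, so `attempts` tries yield `attempts - 1` delays.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(Math.pow(2, attemptsMade - 1) * baseDelayMs));
  }
  return delays;
}
```

Under these assumptions, exponentialBackoffDelays(3, 5000) yields two waits (5000 ms and 10000 ms), and raising attempts to 4 adds a third wait of 20000 ms.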
What is the E.164 phone number format required by Vonage?
E.164 format is the international standard for phone numbers: starts with + followed by country code, then subscriber number, maximum 15 digits total. Example: US number (212) 123-1234 becomes +12121231234. The format removes spaces, hyphens, and parentheses. Use @IsPhoneNumber(undefined, { each: true }) in your DTO to validate E.164 format, or implement google-libphonenumber for robust validation and formatting.
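The structural rule above (a leading +, a country code that does not start with 0, at most 15 digits) can be sketched as a small check. This is only a shape test under stated assumptions: it does not confirm that a number exists or is valid for its country, which is where a library such as google-libphonenumber helps. isE164 and normalizeToE164 are hypothetical helper names.

```typescript
// "+" followed by 2 to 15 digits, the first of which is not 0.
const E164_PATTERN = /^\+[1-9]\d{1,14}$/;

function isE164(value: string): boolean {
  return E164_PATTERN.test(value);
}

// Strips spaces, hyphens, parentheses and other non-digits. If the input has
// no leading "+", a default country code (digits only, e.g. "1") is required.
function normalizeToE164(raw: string, defaultCountryCode?: string): string | null {
  const trimmed = raw.trim();
  const digits = trimmed.replace(/\D/g, '');
  let candidate: string | null = null;
  if (trimmed.startsWith('+')) {
    candidate = `+${digits}`;
  } else if (defaultCountryCode) {
    candidate = `+${defaultCountryCode}${digits}`;
  }
  return candidate !== null && isE164(candidate) ? candidate : null;
}
```

For the example in the answer, normalizeToE164('(212) 123-1234', '1') returns '+12121231234', while the same input without a default country code returns null because the country cannot be inferred.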
Do I need 10DLC registration for US SMS?
Yes, 10DLC Brand and Campaign registration is mandatory for sending SMS to US recipients using standard long code numbers. Without registration, carriers will block your messages. 10DLC registration improves deliverability and throughput (up to 60 messages/second for verified campaigns vs. 1 message/second unverified). Register through your Vonage Dashboard.
How do I monitor BullMQ queue performance?
Install Bull Board for visual queue monitoring: npm install @bull-board/api @bull-board/nestjs @bull-board/express. Bull Board provides a web UI showing active jobs, completed jobs, failed jobs, job delays, and queue throughput. Mount it at /admin/queues in your NestJS app. Additionally, implement Prometheus metrics with @willsoto/nestjs-prometheus to track job processing rates, failure rates, and queue depth.
What database indexes should I create for Prisma SMS tracking?
Create indexes on frequently queried columns: broadcastId (for fetching all messages in a broadcast), status (for counting SENT/FAILED/PENDING messages), and createdAt (for time-based queries). Add @@index([broadcastId, status]) composite index in your Prisma schema for efficient broadcast status queries. Index recipient if you need to search by phone number or detect duplicates.
How do I handle webhook callbacks from Vonage?
Create a dedicated webhook endpoint (e.g., /webhooks/vonage/status) to receive Delivery Receipts (DLRs). Parse the webhook payload to extract message_uuid and delivery status (delivered, failed, rejected). Update your Message record status in the database using providerMessageId (stores the message_uuid). Validate webhook signatures using Vonage's signature verification to prevent spoofed requests.
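The parsing step can be kept separate from the controller and the database. The sketch below maps a webhook body to a status update, under several assumptions: it reads only the message_uuid and status fields named above, it handles only the delivered, failed, and rejected values mentioned in this answer, and it treats DELIVERED as a status you have added to your Message model. The names toStatusUpdate, StatusWebhookBody, and StatusUpdate are hypothetical. Check Vonage's webhook reference for the full payload and status list before relying on it.

```typescript
// Only the fields this sketch reads; real payloads carry more.
interface StatusWebhookBody {
  message_uuid?: unknown;
  status?: unknown;
}

type FinalStatus = 'DELIVERED' | 'FAILED';

interface StatusUpdate {
  providerMessageId: string; // the message_uuid stored when the SMS was sent
  status: FinalStatus;
}

// Returns the update to apply, or null when the payload is malformed or the
// status is one this sketch does not act on (the record is left unchanged).
function toStatusUpdate(body: StatusWebhookBody): StatusUpdate | null {
  if (typeof body.message_uuid !== 'string' || typeof body.status !== 'string') {
    return null;
  }
  switch (body.status.toLowerCase()) {
    case 'delivered':
      return { providerMessageId: body.message_uuid, status: 'DELIVERED' };
    case 'failed':
    case 'rejected':
      return { providerMessageId: body.message_uuid, status: 'FAILED' };
    default:
      return null;
  }
}
```

A webhook controller would verify the signature first, call toStatusUpdate on the request body, and then update the Message row whose providerMessageId matches.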
Can I send MMS or WhatsApp messages with this architecture?
Yes, the Vonage Messages API supports SMS, MMS, WhatsApp, Viber, and Facebook Messenger through a unified interface. Modify the MessageSendRequest in VonageService.sendSms() to set channel: 'mms' or channel: 'whatsapp'. For MMS, add image: { url: 'https://...' } to the message object. For WhatsApp, ensure you have a verified WhatsApp Business Account and use template messages for initial contact.
How do I scale this application for millions of messages?
Horizontal scaling: Run multiple worker instances (separate containers) connected to the same Redis queue. Each worker processes jobs concurrently while BullMQ's distributed locking prevents duplicate processing. Vertical scaling: Increase BullMQ's concurrency setting in WorkerOptions to process multiple jobs per worker simultaneously. Database scaling: Use PostgreSQL read replicas for status queries, primary for writes. Consider sharding broadcasts by region or time period for very high volumes (100M+ messages/month).
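At this scale the enqueue step matters too: issuing one database write and one queue call per recipient, all at once, can exhaust connection pools. One option, sketched below, is to add jobs in fixed-size batches through BullMQ's addBulk. BulkQueue is a hypothetical minimal interface that stands in for the queue so the sketch runs without Redis. The batch size of 500 is an arbitrary assumption to tune, and enqueueInBatches and chunk are illustrative helper names.

```typescript
interface SmsJobData {
  messageId: number;
  recipient: string;
  messageText: string;
}

interface BulkJob {
  name: string;
  data: SmsJobData;
}

// Minimal stand-in for the part of the queue this sketch uses.
interface BulkQueue {
  addBulk(jobs: BulkJob[]): Promise<unknown>;
}

// Splits an array into consecutive slices of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Adds jobs batch by batch, awaiting each batch before starting the next,
// and returns how many jobs were handed to the queue.
async function enqueueInBatches(
  queue: BulkQueue,
  jobs: SmsJobData[],
  batchSize = 500,
): Promise<number> {
  let queued = 0;
  for (const batch of chunk(jobs, batchSize)) {
    await queue.addBulk(batch.map((data) => ({ name: 'send-sms-job', data })));
    queued += batch.length;
  }
  return queued;
}
```

Processing batches sequentially keeps memory and connection use bounded. The matching Message rows could be inserted per batch in the same loop.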
Next Steps for Production-Ready Bulk SMS Broadcasting
Now that you've built your bulk SMS system, enhance it with these production features:
- Implement Webhook Handler – Create a /webhooks/vonage/dlr endpoint to receive Delivery Receipts and update message status to DELIVERED or FAILED based on carrier feedback. Verify webhook signatures with Vonage's JWT validation.
- Add Bull Board Monitoring – Install @bull-board/nestjs to visualize queue performance, inspect failed jobs, manually retry jobs, and monitor throughput. Essential for debugging production issues.
- Configure Prometheus Metrics – Integrate @willsoto/nestjs-prometheus to track custom metrics: messages sent per minute, failure rate by error type, average delivery time, and queue depth. Export to Grafana for real-time dashboards.
- Implement Message Deduplication – Add Redis-based deduplication keyed on a hash of recipient + message content, with a 24-hour TTL, to prevent accidental duplicate sends during retries or API errors.
- Optimize Database Indexes – Add the Prisma indexes @@index([broadcastId, status]), @@index([status, createdAt]), and @@index([providerMessageId]) for fast status queries and webhook lookups.
- Add Rate Limit Overrides – Implement per-broadcast rate limit configuration to handle different Vonage number types (short codes: 100 msg/sec, long codes: 1 msg/sec, 10DLC: varies by campaign tier).
- Implement Circuit Breaker – Use a circuit-breaker library such as opossum or cockatiel to detect Vonage API outages and pause job processing automatically, preventing queue buildup and unnecessary retries during downtime.
- Configure Job Prioritization – Use BullMQ's priority option to send urgent messages (OTPs, alerts) before marketing broadcasts. Higher priority (lower number) jobs process first.
- Add Message Template System – Create a Prisma MessageTemplate model with variable substitution (Hello {{name}}, your code is {{code}}) to enable personalized bulk messages without storing full content per recipient.
- Implement Webhook Retry Logic – Configure @nestjs/axios with exponential backoff to retry failed webhook delivery for DLRs, ensuring your status tracking stays accurate even during temporary network issues.
Additional Resources:
- Vonage Messages API Documentation – Official API reference and authentication guides
- BullMQ Documentation – Advanced queue patterns, flows, and rate limiting strategies
- NestJS Queues Guide – Official NestJS integration with BullMQ
- @nestjs/bullmq npm Package – Latest version (11.0.3) and changelog
- Vonage 10DLC Registration Guide – US compliance requirements
- Prisma Best Practices – Query optimization and connection pooling
Frequently Asked Questions
How to send bulk SMS with NestJS?
Build a scalable bulk SMS application using NestJS, Vonage Messages API, and BullMQ. This setup allows you to create a NestJS API endpoint to handle requests, queue messages with BullMQ and Redis, and process them in the background with a worker that interacts with the Vonage API.
What is BullMQ used for in bulk SMS?
BullMQ, a Redis-based message queue, is crucial for handling background job processing, rate limiting, and retries in bulk SMS applications. It decouples the API request from the actual sending, enabling throttling and reliable message delivery even with temporary failures.
Why use a message queue for bulk SMS?
A message queue like BullMQ helps manage provider rate limits, ensuring deliverability and graceful failure handling. Without a queue, simply looping through recipients can lead to errors, blocked messages, and an unreliable system. Queues enable asynchronous processing and retries.
When should I use Vonage Messages API?
Use the Vonage Messages API when you need to send messages across various channels, including SMS, to a large audience. The provided example uses the official `@vonage/server-sdk` to interact with the API, sending text messages within the configured rate limits.
Can I track SMS message status?
Yes, the example demonstrates basic status tracking using Prisma and PostgreSQL. Individual messages are initially marked as PENDING, then PROCESSING, and finally SENT or FAILED. Vonage can optionally send Delivery Receipts (DLRs) via webhooks to update the message status further.
How to handle Vonage API rate limits?
The example leverages BullMQ's rate limiting feature. The worker processes a configurable number of jobs within a specific timeframe (e.g., one per second), ensuring compliance with Vonage's limits for different number types like long codes.
What is Prisma used for with PostgreSQL?
Prisma simplifies database access, migrations, and type safety with PostgreSQL. It's a modern database toolkit for TypeScript and Node.js that makes it easier to interact with your database and manage its schema.
How to containerize a NestJS SMS application?
The example provides a Dockerfile and docker-compose.yml to containerize the NestJS application, PostgreSQL database, and Redis instance. This ensures a consistent development and deployment environment and simplifies setup.
What are the prerequisites for this project?
Prerequisites include Node.js (LTS recommended), npm or yarn, Docker and Docker Compose, a Vonage API account, a Vonage phone number capable of sending SMS, and basic understanding of NestJS, TypeScript, APIs, and databases. You'll also need `curl` or Postman for testing.
How to handle failed SMS messages?
The application demonstrates error handling and automatic retries. BullMQ automatically requeues failed jobs after a backoff period. If the final attempt fails, the message status is updated to FAILED, and error details are logged.
How does the NestJS API handle broadcast requests?
The API receives recipient lists and message content via POST requests to /broadcasts. It then creates database records and adds individual jobs to the BullMQ queue for each recipient, allowing for asynchronous processing.
What is the role of Redis in the architecture?
Redis serves as the backend for BullMQ, storing the queued SMS sending jobs. It's an in-memory data structure store, providing speed and efficiency for the message queue operations.
What is the purpose of the Vonage private key?
The Vonage private key is used to authenticate the application with the Vonage Messages API. The file should be kept secure, mounted into the container read-only, and never committed to version control.
How to configure environment variables for Vonage?
Vonage credentials like API key, secret, application ID, sender ID, and private key path are stored in a .env file. This file should never be committed to version control to protect sensitive information.