Sent Team, May 3, 2025

Build Bulk SMS Broadcasting with NestJS, Vonage Messages API & BullMQ (2025 Guide)

Complete guide to building scalable bulk SMS broadcasting using NestJS, Vonage Messages API, and BullMQ. Includes rate limiting, automatic retries, Docker containerization, Prisma database tracking, and production-ready error handling.

Build a robust bulk SMS broadcasting system that handles provider rate limits, ensures deliverability, and manages failures gracefully. Looping through recipients and calling an API directly leads to errors, blocked messages, and unreliable delivery.

This guide shows you how to build a scalable bulk SMS application using NestJS, the Vonage Messages API, and BullMQ for background job processing and rate limiting. You'll decouple the initial API request from the sending process, throttle message dispatch according to Vonage's rate limits (30 API requests per second default, carrier-restricted to 1 request/second for certain number types), and automatically retry failed attempts.
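
To make the impact of those limits concrete, here is a minimal sketch that estimates how long a queue takes to drain at a given rate. The `RateLimit` shape and the `estimateDrainSeconds` helper are hypothetical names used only for illustration. They are not part of NestJS, BullMQ, or the Vonage SDK, and the estimate ignores retries and network latency.

```typescript
// Illustrative only: a rough drain-time estimate for a rate-limited queue.
interface RateLimit {
  max: number;        // jobs released per window
  durationMs: number; // window length in milliseconds
}

function estimateDrainSeconds(recipients: number, limit: RateLimit): number {
  if (recipients <= 0) return 0;
  // Each window releases at most `max` jobs, so round up to whole windows.
  const windows = Math.ceil(recipients / limit.max);
  return (windows * limit.durationMs) / 1000;
}

const longCode: RateLimit = { max: 1, durationMs: 1000 };    // 1 request/second
const apiDefault: RateLimit = { max: 30, durationMs: 1000 }; // 30 requests/second

console.log(estimateDrainSeconds(10_000, longCode));   // 10000 (about 2 h 47 min)
console.log(estimateDrainSeconds(10_000, apiDefault)); // 334 (under 6 minutes)
```

At 1 request per second a 10,000-recipient broadcast runs for hours, which is why the send has to happen in a background queue rather than inside the HTTP request.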

What You'll Learn:

  • Set up NestJS with BullMQ, Redis, Prisma, and PostgreSQL for production-grade SMS broadcasting
  • Integrate Vonage Messages API with proper authentication and error handling
  • Implement rate limiting to respect Vonage's API constraints (1-30 requests/second)
  • Build a background job processor with automatic retries and exponential backoff
  • Track message status in PostgreSQL using Prisma ORM (PENDING → PROCESSING → SENT/FAILED)
  • Containerize your application with Docker and Docker Compose
  • Handle 10DLC registration requirements for US SMS delivery

Prerequisites: Basic understanding of NestJS, TypeScript, APIs, databases, Docker, and message queues.

Project Goals:

  • Create a NestJS API endpoint to accept bulk SMS broadcast requests (recipient list and message content).
  • Use BullMQ and Redis to queue individual SMS sending jobs.
  • Implement a queue processor (worker) that sends messages via the Vonage Messages API.
  • Throttle the worker to respect Vonage's rate limits (e.g., 1 message per second for long codes, adjustable).
  • Implement basic status tracking for broadcasts and individual messages using Prisma and PostgreSQL.
  • Provide robust error handling and automatic retries for failed messages.
  • Ensure secure handling of API credentials.
  • Containerize the application using Docker for easy setup and deployment.

Technologies Used:

  • NestJS: A progressive Node.js framework for building efficient, reliable, and scalable server-side applications. Its modular architecture keeps the API layer, the queue worker, and the Vonage integration in separate modules.
  • Vonage Messages API: A unified API for sending messages across various channels, including SMS. This guide uses the official @vonage/server-sdk.
  • BullMQ: A robust, Redis-based message queue for Node.js. Essential for background processing, throttling, and retries.
  • Redis: An in-memory data structure store, used as the backend for BullMQ.
  • Prisma: A modern database toolkit for TypeScript and Node.js. Simplifies database access, migrations, and type safety with PostgreSQL.
  • PostgreSQL: A powerful, open-source object-relational database system.
  • Docker & Docker Compose: For containerizing the application and its dependencies (Redis, Postgres), ensuring consistent development and deployment environments.
  • TypeScript: Enhances JavaScript with static typing for better maintainability and fewer runtime errors.

System Architecture:

The system follows this general flow:

  1. A client sends a POST request to the NestJS API (/broadcasts) with a list of recipients and the message text.
  2. The NestJS API creates database records for the broadcast and individual messages (initially marked as PENDING).
  3. For each recipient, the API adds a job to the BullMQ queue (backed by Redis). This job contains the necessary details (recipient number, message text, database message ID).
  4. A separate NestJS Queue Worker process monitors the BullMQ queue.
  5. The Worker picks up jobs from the queue, respecting the configured rate limit (e.g., one job per second).
  6. For each job, the Worker updates the corresponding message status in the database to PROCESSING.
  7. The Worker calls the Vonage Messages API to send the SMS.
  8. Based on the Vonage API response:
    • On success, the Worker updates the message status in the database to SENT and records the Vonage message ID.
    • On failure, the Worker logs the error. If retries are configured and remaining, BullMQ will automatically requeue the job after a backoff period. If it's the final attempt, the Worker updates the message status to FAILED and records the error details.
  9. Optionally, Vonage can send status updates (Delivery Receipts - DLRs) via webhooks back to the NestJS API, which can further update the message status in the database (e.g., DELIVERED, FAILED).
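
The status lifecycle in the flow above can be sketched as a small transition table. This is an illustration under assumptions, not the schema used later in the guide: the `MessageStatus` union below is declared locally, `DELIVERED` is included only because step 9 mentions it, and `canTransition` is a hypothetical helper.

```typescript
// Illustrative status lifecycle for a single message.
type MessageStatus = 'PENDING' | 'PROCESSING' | 'SENT' | 'FAILED' | 'DELIVERED';

const allowedTransitions: Record<MessageStatus, readonly MessageStatus[]> = {
  PENDING: ['PROCESSING'],                      // worker picked up the job
  PROCESSING: ['PROCESSING', 'SENT', 'FAILED'], // PROCESSING -> PROCESSING is a retry
  SENT: ['DELIVERED', 'FAILED'],                // later update from a delivery receipt
  FAILED: [],                                   // terminal
  DELIVERED: [],                                // terminal
};

function canTransition(from: MessageStatus, to: MessageStatus): boolean {
  return allowedTransitions[from].indexOf(to) !== -1;
}

console.log(canTransition('PENDING', 'PROCESSING')); // true
console.log(canTransition('PENDING', 'SENT'));       // false
```

A guard like this is optional, but it can catch out-of-order updates, for example a late delivery receipt arriving for a message that already failed.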

Final Outcome:

You will have a containerized NestJS application with an API endpoint to initiate bulk SMS broadcasts. Messages will be reliably sent in the background, respecting Vonage rate limits, with status tracking and automatic retries.


1. Setting Up the Project

Initialize the NestJS project, set up the directory structure, install dependencies, and configure Docker.

1.1 Initialize NestJS Project

Open your terminal and run the NestJS CLI command:

bash
npm install -g @nestjs/cli
nest new vonage-bulk-sms --package-manager npm
cd vonage-bulk-sms

1.2 Project Structure

Organize your code into modules for better separation of concerns:

src/
├── app.module.ts                # Root application module
├── app.controller.ts
├── app.service.ts
├── main.ts                      # Application entry point
│
├── broadcast/                   # Handles broadcast requests & API
│   ├── dto/
│   │   └── create-broadcast.dto.ts
│   ├── broadcast.module.ts
│   ├── broadcast.controller.ts
│   └── broadcast.service.ts
│
├── queue/                       # BullMQ configuration and processor
│   ├── queue.module.ts
│   └── broadcast.processor.ts   # Processes jobs from the queue
│
├── vonage/                      # Service for interacting with Vonage API
│   ├── vonage.module.ts
│   └── vonage.service.ts
│
├── prisma/                      # Prisma service and configuration
│   ├── prisma.module.ts
│   └── prisma.service.ts
│
└── config/                      # Configuration management (optional but good practice)
    ├── config.module.ts
    └── configuration.ts

1.3 Install Dependencies

Install the necessary packages for BullMQ, Vonage, Prisma, configuration, and validation:

bash
# Core Dependencies
npm install @nestjs/config class-validator class-transformer

# BullMQ & Redis
npm install @nestjs/bullmq bullmq ioredis

# Vonage
npm install @vonage/server-sdk

# Prisma & PostgreSQL Driver
npm install @prisma/client pg
npm install --save-dev prisma

# API Throttling & Security
npm install @nestjs/throttler throttler-storage-redis helmet

1.4 Configure Environment Variables

Create a .env file in the project root. This file stores sensitive credentials and configuration. Never commit this file to version control.

ini
# .env

# Application
NODE_ENV=development
PORT=3000

# Vonage Credentials (Get from Vonage Dashboard -> API Settings)
VONAGE_API_KEY=YOUR_VONAGE_API_KEY
VONAGE_API_SECRET=YOUR_VONAGE_API_SECRET
VONAGE_APPLICATION_ID=YOUR_VONAGE_APPLICATION_ID # Get after creating an app
VONAGE_SENDER_ID=YOUR_VONAGE_PHONE_NUMBER # The number messages will be sent from
VONAGE_PRIVATE_KEY_PATH=./private.key # Path relative to project root

# Database (Connection string used by Prisma (read by the application inside the container))
DATABASE_URL="postgresql://user:password@postgres:5432/vonage_bulk_sms?schema=public"
# These POSTGRES_* variables are used by the Docker Compose service to initialize the container
POSTGRES_DB=vonage_bulk_sms
POSTGRES_USER=user
POSTGRES_PASSWORD=password

# Redis (Used by Docker Compose & BullMQ)
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_PASSWORD= # Leave empty if no password, or set one in docker-compose

# BullMQ Queue Config
BROADCAST_QUEUE_NAME=broadcast-sms
# Rate limit: 1 job processed every 1000ms (1/sec) - Adjust based on Vonage limits for your number type
BULLMQ_RATE_LIMIT_MAX=1
BULLMQ_RATE_LIMIT_DURATION=1000
BULLMQ_JOB_ATTEMPTS=3 # Retry failed jobs 3 times
BULLMQ_JOB_BACKOFF=5000 # Wait 5 seconds before the first retry

# API Rate Limiting
THROTTLE_TTL=60 # seconds
THROTTLE_LIMIT=20 # requests per TTL from the same IP
  • Obtaining Vonage Credentials: Get your API Key and Secret from the Vonage Dashboard. Generate the Application ID and private.key file when you create a Vonage Application for the Messages API. The Sender ID is your Vonage virtual number.
  • Database/Redis: These credentials match the ones you'll define in docker-compose.yml.
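
One detail worth knowing about these values: environment variables are always strings, and the type parameter in `configService.get<number>(...)` is a compile-time hint only, so no conversion happens at runtime. The sketch below shows one way to coerce the numeric settings in a file such as src/config/configuration.ts. The helper names `intFromEnv` and `loadQueueConfig` are hypothetical, and the defaults mirror the .env file above.

```typescript
// Illustrative loader that turns string env values into validated integers.
type Env = Record<string, string | undefined>;

function intFromEnv(env: Env, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw.trim() === '') return fallback;
  const parsed = Number.parseInt(raw, 10);
  if (Number.isNaN(parsed)) {
    throw new Error(`${key} must be an integer, got "${raw}"`);
  }
  return parsed;
}

function loadQueueConfig(env: Env) {
  return {
    redisPort: intFromEnv(env, 'REDIS_PORT', 6379),
    rateLimitMax: intFromEnv(env, 'BULLMQ_RATE_LIMIT_MAX', 1),
    rateLimitDurationMs: intFromEnv(env, 'BULLMQ_RATE_LIMIT_DURATION', 1000),
    jobAttempts: intFromEnv(env, 'BULLMQ_JOB_ATTEMPTS', 3),
    jobBackoffMs: intFromEnv(env, 'BULLMQ_JOB_BACKOFF', 5000),
  };
}

// Example with a literal object; in the application you would pass process.env.
const config = loadQueueConfig({ REDIS_PORT: '6380', BULLMQ_RATE_LIMIT_MAX: '30' });
console.log(config.redisPort, config.rateLimitMax, config.jobAttempts); // 6380 30 3
```

Failing fast on a malformed value at startup is usually easier to debug than a worker that silently runs with the wrong rate limit.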

1.5 Configure Docker

Create a Dockerfile in the project root:

dockerfile
# Dockerfile
FROM node:18-alpine AS development

WORKDIR /usr/src/app

COPY package*.json ./
# Install all dependencies (runtime and dev) so the app can run in watch mode
RUN npm install

COPY --chown=node:node . .

USER node

# ---

FROM node:18-alpine AS builder

WORKDIR /usr/src/app

COPY package*.json ./
# Install all dependencies: the build needs devDependencies such as the Nest CLI and the Prisma CLI
RUN npm ci
COPY --chown=node:node . .

# Generate Prisma Client
RUN npx prisma generate

# Build the application
RUN npm run build

# Prune development dependencies
RUN npm prune --omit=dev

USER node

# ---

FROM node:18-alpine AS production

# Set the working directory before copying so the relative destinations below resolve to /usr/src/app
WORKDIR /usr/src/app

COPY --chown=node:node --from=builder /usr/src/app/node_modules ./node_modules
COPY --chown=node:node --from=builder /usr/src/app/dist ./dist
COPY --chown=node:node --from=builder /usr/src/app/prisma ./prisma
# Copy the generated Prisma client from the builder stage
COPY --chown=node:node --from=builder /usr/src/app/node_modules/.prisma ./node_modules/.prisma

# Copy private key if it exists at build time (alternative: mount at runtime)
# COPY --chown=node:node private.key ./private.key

# Expose port and define command
EXPOSE 3000
CMD ["node", "dist/main.js"]

Create a docker-compose.yml file:

yaml
# docker-compose.yml
version: '3.8'

services:
  app:
    build:
      context: .
      target: production # Use the production stage from Dockerfile
    container_name: vonage_bulk_sms_app
    env_file:
      - .env # Load environment variables from .env file
    ports:
      - "${PORT:-3000}:3000" # Use PORT from .env, default to 3000
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    volumes:
      - ./private.key:/usr/src/app/private.key:ro # Mount private key read-only
      # For development, you might mount src:
      # - ./src:/usr/src/app/src
      # - ./prisma:/usr/src/app/prisma
    networks:
      - vonage-net
    command: node dist/main.js # Explicitly start the built app

  postgres:
    image: postgres:15-alpine
    container_name: postgres_db
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432" # Map port for external tools if needed (e.g., pgAdmin)
    networks:
      - vonage-net
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    container_name: redis_cache
    # If you set REDIS_PASSWORD in .env, uncomment the command below
    # command: redis-server --requirepass ${REDIS_PASSWORD}
    ports:
      - "6379:6379" # Map port for external tools if needed
    volumes:
      - redis_data:/data
    networks:
      - vonage-net
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  postgres_data:
  redis_data:

networks:
  vonage-net:
    driver: bridge
  • This setup defines three services: your NestJS app, a PostgreSQL database, and a Redis instance.
  • It uses the .env file to configure containers.
  • It mounts the private.key file into the application container (ensure this file exists before running docker-compose up).
  • Includes basic health checks and explicit depends_on conditions.

Create a .dockerignore file to optimize build context:

# .dockerignore
node_modules
dist
.git
.env
*.log

1.6 Initialize Prisma

Run the Prisma init command:

bash
npx prisma init --datasource-provider postgresql

This creates a prisma directory with a schema.prisma file and updates .env with a placeholder DATABASE_URL. Replace the placeholder DATABASE_URL in .env with the one you defined earlier, matching the Docker Compose setup (quote it if it contains special characters).

You'll modify prisma/schema.prisma in Section 6, when defining the data model.

1.7 Configure NestJS Modules

Import and configure the modules you'll use.

Update src/app.module.ts:

typescript
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bullmq';
import { ThrottlerModule, ThrottlerGuard } from '@nestjs/throttler';
import { ThrottlerStorageRedisService } from 'throttler-storage-redis';
import { APP_GUARD } from '@nestjs/core';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { BroadcastModule } from './broadcast/broadcast.module';
import { QueueModule } from './queue/queue.module';
import { VonageModule } from './vonage/vonage.module';
import { PrismaModule } from './prisma/prisma.module';

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true, // Make ConfigService available globally
      envFilePath: '.env',
    }),
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        connection: {
          host: configService.get<string>('REDIS_HOST'),
          port: configService.get<number>('REDIS_PORT'),
          password: configService.get<string>('REDIS_PASSWORD') || undefined,
        },
      }),
      inject: [ConfigService],
    }),
    ThrottlerModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (configService: ConfigService) => ({
        storage: new ThrottlerStorageRedisService({
           host: configService.get<string>('REDIS_HOST'),
           port: configService.get<number>('REDIS_PORT'),
           password: configService.get<string>('REDIS_PASSWORD') || undefined,
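           // Note: Ensure compatibility or check for alternative storage options if needed.
           // The top-level ttl/limit options below match @nestjs/throttler v4 (ttl in seconds).
           // v5 and later expect `throttlers: [{ ttl, limit }]` with ttl in milliseconds.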
        }),
        ttl: configService.get<number>('THROTTLE_TTL', 60),
        limit: configService.get<number>('THROTTLE_LIMIT', 20),
      }),
    }),
    PrismaModule, // Add PrismaModule
    VonageModule,  // Add VonageModule
    QueueModule,   // Add QueueModule
    BroadcastModule, // Add BroadcastModule
  ],
  controllers: [AppController],
  providers: [
    AppService,
    {
      provide: APP_GUARD, // Apply throttling globally
      useClass: ThrottlerGuard,
    },
  ],
})
export class AppModule {}

This root module now:

  • Loads environment variables globally using ConfigModule.
  • Configures the base BullMQ connection using Redis details from .env.
  • Configures global API rate limiting (ThrottlerModule) using Redis for distributed storage.
  • Imports your feature modules (which you'll create next).

2. Implementing Core Functionality (Queue & Worker)

Set up BullMQ to handle the SMS jobs and create the worker process that consumes these jobs.

2.1 Configure Queue Module

Create src/queue/queue.module.ts:

typescript
// src/queue/queue.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BroadcastProcessor } from './broadcast.processor';
import { VonageModule } from '../vonage/vonage.module'; // Import VonageModule
import { PrismaModule } from '../prisma/prisma.module'; // Import PrismaModule

@Module({
  imports: [
    BullModule.registerQueueAsync({
      name: 'broadcast-sms', // Use queue name from config or define directly
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        name: configService.get<string>('BROADCAST_QUEUE_NAME', 'broadcast-sms'),
        defaultJobOptions: {
          attempts: configService.get<number>('BULLMQ_JOB_ATTEMPTS', 3),
          backoff: {
            type: 'exponential',
            delay: configService.get<number>('BULLMQ_JOB_BACKOFF', 5000),
          },
          removeOnComplete: true, // Keep queue clean
          removeOnFail: false, // Keep failed jobs for inspection (optional)
        },
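        // Caution: current BullMQ releases read `limiter` from the Worker options, not the Queue options.
        // Supply the same values as the second argument of @Processor() so the limit is actually enforced.
        limiter: { // Rate limiting configuration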
          max: configService.get<number>('BULLMQ_RATE_LIMIT_MAX', 1),
          duration: configService.get<number>('BULLMQ_RATE_LIMIT_DURATION', 1000),
        },
      }),
      inject: [ConfigService],
    }),
    VonageModule, // Make VonageService available for injection
    PrismaModule, // Make PrismaService available
    ConfigModule, // Import ConfigModule to make ConfigService injectable here
  ],
  providers: [BroadcastProcessor], // Register the processor
  exports: [BullModule], // Export BullModule if needed in other modules (like BroadcastService)
})
export class QueueModule {}
  • BullModule.registerQueueAsync: Registers a specific queue named broadcast-sms.
  • defaultJobOptions: Sets default behavior for jobs added to this queue (attempts, backoff strategy).
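  • limiter: Configures BullMQ's built-in rate limiter so that at most BULLMQ_RATE_LIMIT_MAX jobs are processed within BULLMQ_RATE_LIMIT_DURATION milliseconds, which throttles calls to the Vonage API. In current BullMQ releases the limiter is enforced by the Worker, so the same values also need to be supplied as worker options (the second argument of @Processor).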
  • We import VonageModule, PrismaModule, and ConfigModule to make their services injectable into the BroadcastProcessor.
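
To see what the exponential backoff setting means in practice, the sketch below computes the wait before each retry. It assumes BullMQ's built-in exponential strategy waits 2^(n - 1) × delay milliseconds after the n-th failed attempt, as described in the BullMQ retry documentation. The `exponentialBackoffDelays` helper is hypothetical and only reproduces that arithmetic.

```typescript
// Illustrative arithmetic for exponential backoff; not a BullMQ API.
function exponentialBackoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // A job with N attempts can be retried at most N - 1 times.
  for (let failedAttempts = 1; failedAttempts < attempts; failedAttempts++) {
    delays.push(Math.round(Math.pow(2, failedAttempts - 1) * baseDelayMs));
  }
  return delays;
}

// Defaults from .env: BULLMQ_JOB_ATTEMPTS=3, BULLMQ_JOB_BACKOFF=5000
console.log(exponentialBackoffDelays(3, 5000)); // [ 5000, 10000 ]
console.log(exponentialBackoffDelays(5, 5000)); // [ 5000, 10000, 20000, 40000 ]
```

With the defaults, a message that keeps failing is retried after 5 seconds and again after 10 seconds before the job is marked as failed.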

2.2 Create Vonage Service

This service encapsulates interaction with the Vonage SDK.

Create src/vonage/vonage.service.ts:

typescript
// src/vonage/vonage.service.ts
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Vonage } from '@vonage/server-sdk';
import { MessageSendRequest } from '@vonage/messages';
import * as path from 'path';
import * as fs from 'fs';

@Injectable()
export class VonageService implements OnModuleInit {
  private readonly logger = new Logger(VonageService.name);
  private vonageClient: Vonage;

  constructor(private configService: ConfigService) {}

  onModuleInit() {
    try {
      const apiKey = this.configService.getOrThrow<string>('VONAGE_API_KEY');
      const apiSecret = this.configService.getOrThrow<string>('VONAGE_API_SECRET');
      const applicationId = this.configService.getOrThrow<string>('VONAGE_APPLICATION_ID');
      const privateKeyPath = this.configService.getOrThrow<string>('VONAGE_PRIVATE_KEY_PATH');

      // Ensure the private key path is handled correctly, might need adjustment
      // depending on how/where it's mounted or located in production.
      // This assumes it's relative to the project root where the app starts.
      const resolvedPrivateKeyPath = path.resolve(process.cwd(), privateKeyPath);

      // Check if file exists before initializing - helps debugging
      if (!fs.existsSync(resolvedPrivateKeyPath)) {
         this.logger.error(`Vonage private key not found at path: ${resolvedPrivateKeyPath}`);
         throw new Error(`Vonage private key not found at path: ${resolvedPrivateKeyPath}`);
      }

      this.vonageClient = new Vonage({
        apiKey: apiKey,
        apiSecret: apiSecret,
        applicationId: applicationId,
        privateKey: resolvedPrivateKeyPath, // Use resolved path
      });
      this.logger.log('Vonage SDK initialized successfully.');

    } catch (error) {
        this.logger.error(`Failed to initialize Vonage SDK: ${error.message}`, error.stack);
       // Depending on requirements, you might want the app to fail startup
       // or handle this state gracefully (e.g., disable sending).
       throw new Error(`Failed to initialize Vonage SDK: ${error.message}`);
    }
  }

  async sendSms(to: string, text: string): Promise<{ messageUuid: string | null; error?: any }> {
    const from = this.configService.getOrThrow<string>('VONAGE_SENDER_ID');
    const message: MessageSendRequest = {
      message_type: 'text',
      channel: 'sms',
      to: to,
      from: from,
      text: text,
    };

    try {
      this.logger.log(`Attempting to send SMS to ${to}`);
      const response = await this.vonageClient.messages.send(message);
      this.logger.log(`SMS sent successfully to ${to}, message_uuid: ${response.message_uuid}`);
      return { messageUuid: response.message_uuid };
    } catch (error) {
      // Attempt to extract meaningful error info from Vonage SDK error structure
      const errorDetails = error?.response?.data || error?.message || error;
      this.logger.error(`Failed to send SMS to ${to}: ${JSON.stringify(errorDetails)}`, error?.stack);
      // Return error details for the processor to handle
      return { messageUuid: null, error: errorDetails };
    }
  }
}

Create src/vonage/vonage.module.ts:

typescript
// src/vonage/vonage.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { VonageService } from './vonage.service';

@Module({
  imports: [ConfigModule], // Ensure ConfigService is available
  providers: [VonageService],
  exports: [VonageService], // Export for use in other modules
})
export class VonageModule {}
  • Initializes the Vonage SDK using credentials from .env. Includes path resolution and existence check for the private key.
  • Provides a sendSms method that wraps the SDK's messages.send call.
  • Includes logging for success and failure.
  • Returns both the message_uuid on success and potential error details on failure.
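
The `{ messageUuid, error }` return shape works, but callers must remember to check `messageUuid` for null. As an optional alternative, the sketch below models the same information as a discriminated union so the compiler enforces the check. `SendResult`, `toSendResult`, and `unwrapOrThrow` are hypothetical names and are not part of the Vonage SDK or of the service above.

```typescript
// Illustrative result type: either a message UUID or error details, never both.
type SendResult =
  | { ok: true; messageUuid: string }
  | { ok: false; error: unknown };

// Converts the shape returned by sendSms() into the union.
function toSendResult(result: { messageUuid: string | null; error?: unknown }): SendResult {
  return result.messageUuid !== null
    ? { ok: true, messageUuid: result.messageUuid }
    : { ok: false, error: result.error ?? 'unknown error' };
}

// What a queue processor typically wants: the UUID, or an exception that triggers a retry.
function unwrapOrThrow(result: SendResult): string {
  if (result.ok) return result.messageUuid;
  throw new Error(`Vonage API failed to send SMS: ${JSON.stringify(result.error)}`);
}

console.log(unwrapOrThrow(toSendResult({ messageUuid: 'abc-123' }))); // abc-123
```

Throwing at the boundary keeps the retry decision with the queue, which is the same approach the processor in the next section takes.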

2.3 Create the Queue Processor

This class defines how the broadcast-sms queue handles jobs.

Create src/queue/broadcast.processor.ts:

typescript
// src/queue/broadcast.processor.ts
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { VonageService } from '../vonage/vonage.service';
import { PrismaService } from '../prisma/prisma.service'; // Import PrismaService
import { MessageStatus } from '@prisma/client'; // Import enum (define in schema.prisma later)
import { ConfigService } from '@nestjs/config'; // Import ConfigService

// Define the structure of the job data we expect
interface BroadcastJobData {
  messageId: number; // Link back to our database record
  recipient: string;
  messageText: string;
}

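// Connects to the queue named 'broadcast-sms'. The second argument holds worker options,
// which is where BullMQ reads the rate limiter from. Decorators are evaluated at import time,
// so the values come from process.env (populated by Docker's env_file) rather than ConfigService.
@Processor('broadcast-sms', {
  limiter: {
    max: parseInt(process.env.BULLMQ_RATE_LIMIT_MAX ?? '', 10) || 1,
    duration: parseInt(process.env.BULLMQ_RATE_LIMIT_DURATION ?? '', 10) || 1000,
  },
})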
export class BroadcastProcessor extends WorkerHost {
  private readonly logger = new Logger(BroadcastProcessor.name);

  constructor(
    private readonly vonageService: VonageService,
    private readonly prisma: PrismaService, // Inject PrismaService
    private readonly configService: ConfigService, // Inject ConfigService
  ) {
    super();
  }

  async process(job: Job<BroadcastJobData, any, string>): Promise<void> {
    // In BullMQ v5+, job.attemptsMade counts attempts that have already finished (0 on the first run),
    // so add 1 to get the 1-based number of the current attempt. Older releases may count differently.
    this.logger.log(`Processing job ${job.id} for recipient ${job.data.recipient} (Attempt ${job.attemptsMade + 1})`);

    const { messageId, recipient, messageText } = job.data;

    try {
      // Update status to PROCESSING in DB and record the current attempt number
      await this.prisma.message.update({
        where: { id: messageId },
        data: { status: MessageStatus.PROCESSING, attempts: job.attemptsMade + 1 },
      });

      const result = await this.vonageService.sendSms(recipient, messageText);

      if (result.messageUuid) {
        // Update status to SENT in DB
        await this.prisma.message.update({
          where: { id: messageId },
          data: {
            status: MessageStatus.SENT,
            providerMessageId: result.messageUuid,
            processedAt: new Date(),
            errorDetails: null, // Clear previous errors on success
          },
        });
        this.logger.log(`Job ${job.id} completed successfully. Vonage message_uuid: ${result.messageUuid}`);
      } else {
        // If sendSms indicates failure but doesn't throw (e.g., API error response)
        const errorMessage = `Vonage API failed to send SMS: ${JSON.stringify(result.error)}`;
        this.logger.warn(`Job ${job.id} failed sending via Vonage: ${errorMessage}`);
        // Throw error to trigger BullMQ retry mechanism
        throw new Error(errorMessage);
      }
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'Unknown error during processing';
      // In BullMQ v5+, job.attemptsMade counts finished attempts, so the attempt that just failed
      // is attemptsMade + 1. Comparing attemptsMade directly would never detect the final attempt.
      const currentAttempt = job.attemptsMade + 1;
      this.logger.error(`Job ${job.id} (attempt ${currentAttempt}) failed with error: ${errorMessage}`, error instanceof Error ? error.stack : undefined);

      // Check if it's the final attempt
      const maxAttempts = job.opts.attempts ?? this.configService.get<number>('BULLMQ_JOB_ATTEMPTS', 3);

      if (currentAttempt >= maxAttempts) {
        // Final attempt failed, mark as FAILED in DB
        this.logger.warn(`Job ${job.id} reached max attempts (${maxAttempts}) and failed permanently.`);
        await this.prisma.message.update({
          where: { id: messageId },
          data: {
            status: MessageStatus.FAILED,
            processedAt: new Date(),
            errorDetails: errorMessage,
          },
        });
      } else {
        // Not the final attempt, keep status as PROCESSING.
        // BullMQ handles the retry state and delay based on the backoff strategy.
        // Store the latest error message on each attempt.
        this.logger.log(`Job ${job.id} failed, will retry (attempt ${currentAttempt}/${maxAttempts}).`);
        await this.prisma.message.update({
          where: { id: messageId },
          data: {
            status: MessageStatus.PROCESSING, // Keep as PROCESSING during retry phase
            errorDetails: `Attempt ${currentAttempt} failed: ${errorMessage}`,
          },
        });
      }
      // Re-throw the error so BullMQ knows the job failed and should be retried (if attempts remain)
      throw error;
    }
  }

  // Optional: Listen to worker events for more detailed logging/monitoring
  @OnWorkerEvent('active')
  onActive(job: Job) {
    this.logger.log(`Job ${job.id} has started processing (attempt ${job.attemptsMade}).`);
  }

  @OnWorkerEvent('completed')
  onCompleted(job: Job, result: any) {
    // Note: This logs successful completion *after* the 'process' method returns successfully.
    this.logger.log(`Job ${job.id} has completed successfully.`);
  }

  @OnWorkerEvent('failed')
  onFailed(job: Job<BroadcastJobData> | undefined, err: Error) {
    // This event fires after the 'process' method throws an error, potentially before the final retry attempt.
    // More detailed final failure logging is handled within the 'process' method's catch block.
    this.logger.error(`Job ${job?.id ?? 'unknown'} failed on attempt ${job?.attemptsMade ?? 'N/A'} with error: ${err.message}`);
  }

  @OnWorkerEvent('error')
  onError(err: Error) {
    // General worker errors not tied to a specific job
    this.logger.error('Worker error:', err.stack);
  }

   @OnWorkerEvent('stalled')
    onStalled(jobId: string) {
        // Indicates a job was marked as active but did not complete within a timeout
        this.logger.warn(`Job ${jobId} has stalled.`);
    }
}
  • @Processor('broadcast-sms'): Decorator linking this class to the specified queue.
  • process(job: Job): The core method BullMQ calls for each job. It receives the job object containing your data (recipient, messageText, messageId).
  • Injects VonageService, PrismaService, and ConfigService.
  • Updates the message status in the database (PENDING → PROCESSING → SENT / FAILED). Increments attempt count.
  • Handles errors from VonageService and general processing errors.
  • Throws errors to let BullMQ handle retries based on the queue's defaultJobOptions. Updates DB status accordingly: FAILED on final failure, keeps PROCESSING during intermediate retry attempts, storing the latest error.
  • Includes optional event listeners (@OnWorkerEvent) for detailed logging.
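
The retry decision described above comes down to one comparison, sketched here as a pure function that is easy to unit test. It assumes `job.attemptsMade` counts attempts that have already finished (0 while the first attempt runs), which is how BullMQ v5 behaves as far as I know; verify this against the version you have installed. `outcomeAfterFailure` is a hypothetical helper, not a BullMQ API.

```typescript
// Illustrative decision: what should the database record after a failed attempt?
interface FailureOutcome {
  attempt: number;                 // 1-based number of the attempt that just failed
  willRetry: boolean;              // true if BullMQ still has attempts left
  status: 'PROCESSING' | 'FAILED'; // status to store for the message
}

function outcomeAfterFailure(attemptsMade: number, maxAttempts: number): FailureOutcome {
  const attempt = attemptsMade + 1; // convert finished-attempt count to the current attempt
  const willRetry = attempt < maxAttempts;
  return { attempt, willRetry, status: willRetry ? 'PROCESSING' : 'FAILED' };
}

console.log(outcomeAfterFailure(0, 3)); // first attempt failed: status PROCESSING, willRetry true
console.log(outcomeAfterFailure(2, 3)); // third attempt failed: status FAILED, willRetry false
```

If the counter were compared without the offset, the third failure would still look like an intermediate one and the message would stay in PROCESSING indefinitely.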

3. Building the API Layer

This layer exposes an endpoint to receive broadcast requests and add jobs to the queue.

3.1 Create DTO (Data Transfer Object)

DTOs define the expected request body structure and enable automatic validation using class-validator.

Create src/broadcast/dto/create-broadcast.dto.ts:

typescript
// src/broadcast/dto/create-broadcast.dto.ts
import { IsNotEmpty, IsString, IsArray, ArrayNotEmpty, IsPhoneNumber } from 'class-validator';
// Optional: for Swagger documentation (install @nestjs/swagger if using)
// import { ApiProperty } from '@nestjs/swagger';

export class CreateBroadcastDto {
  // @ApiProperty({
  //   description: 'List of recipient phone numbers in E.164 format (e.g., +14155552671)',
  //   example: ['+14155552671', '+447700900000'],
  //   type: [String],
  // })
  @IsArray()
  @ArrayNotEmpty()
  // Apply validation to each element in the array
  // Note: IsPhoneNumber might require a specific region or allow generic E.164
  // Consider a more robust validation library if needed (e.g., google-libphonenumber)
  @IsPhoneNumber(undefined, { each: true, message: 'Each recipient must be a valid phone number in E.164 format' })
  recipients: string[];

  // @ApiProperty({
  //   description: 'The text message content to send.',
  //   example: 'Hello from our awesome service!',
  // })
  @IsString()
  @IsNotEmpty()
  message: string;
}
  • Uses decorators from class-validator to enforce rules (must be an array, not empty, must contain valid phone numbers, message must be a non-empty string).
  • @ApiProperty is commented out but can be used if you integrate Swagger.
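
To make the rules concrete, here is a dependency-free sketch of roughly what the decorators enforce. It is not how class-validator works internally: `validateBroadcastPayload` and `E164_PATTERN` are hypothetical names, and the regular expression only checks the E.164 shape, whereas @IsPhoneNumber also checks that the number is plausible for its country.

```typescript
// Shape-only E.164 check: "+", a non-zero first digit, at most 15 digits in total.
const E164_PATTERN = /^\+[1-9]\d{1,14}$/;

// Returns a list of human-readable validation errors (empty when the payload is valid).
function validateBroadcastPayload(body: unknown): string[] {
  const errors: string[] = [];
  const { recipients, message } = (body ?? {}) as { recipients?: unknown; message?: unknown };

  if (!Array.isArray(recipients) || recipients.length === 0) {
    errors.push('recipients must be a non-empty array');
  } else {
    recipients.forEach((recipient, index) => {
      if (typeof recipient !== 'string' || !E164_PATTERN.test(recipient)) {
        errors.push(`recipients[${index}] must be a phone number in E.164 format`);
      }
    });
  }

  if (typeof message !== 'string' || message.length === 0) {
    errors.push('message must be a non-empty string');
  }
  return errors;
}

console.log(validateBroadcastPayload({ recipients: ['+14155552671'], message: 'Hello' }));
console.log(validateBroadcastPayload({ recipients: ['4155552671'], message: '' }));
```

The first call returns no errors; the second reports both the malformed recipient and the empty message.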

3.2 Create Broadcast Service

This service handles the business logic: creating the broadcast record and adding individual message jobs to the queue.

Create src/broadcast/broadcast.service.ts:

typescript
// src/broadcast/broadcast.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { PrismaService } from '../prisma/prisma.service';
import { CreateBroadcastDto } from './dto/create-broadcast.dto';
import { Broadcast, MessageStatus } from '@prisma/client'; // Import generated types

@Injectable()
export class BroadcastService {
  private readonly logger = new Logger(BroadcastService.name);

  constructor(
    @InjectQueue('broadcast-sms') private readonly broadcastQueue: Queue,
    private readonly prisma: PrismaService,
  ) {}

  async createBroadcast(createBroadcastDto: CreateBroadcastDto): Promise<Broadcast> {
    const { recipients, message } = createBroadcastDto;
    this.logger.log(`Received broadcast request for ${recipients.length} recipients.`);

    // 1. Create the main Broadcast record
    let broadcast: Broadcast;
    try {
      broadcast = await this.prisma.broadcast.create({
        data: {
          messageContent: message,
          totalRecipients: recipients.length,
          status: 'PENDING', // Initial status
        },
      });
      this.logger.log(`Created broadcast record with ID: ${broadcast.id}`);
    } catch (error) {
      this.logger.error(`Failed to create broadcast record: ${error.message}`, error.stack);
      throw new Error('Failed to initiate broadcast in database.');
    }

    // 2. Create individual Message records and add jobs to the queue
    const messageJobsPromises = recipients.map(async (recipient) => {
      try {
        // Create placeholder message record in DB
        const dbMessage = await this.prisma.message.create({
          data: {
            broadcastId: broadcast.id,
            recipient: recipient,
            status: MessageStatus.PENDING, // Initial status for message
          },
        });

        // Add job to BullMQ
        // The job data links back to the DB message record
        await this.broadcastQueue.add(
          'send-sms-job', // Job name (can be descriptive)
          {
            messageId: dbMessage.id, // Pass the DB ID
            recipient: recipient,
            messageText: message,
          },
          // Optional: Job-specific options like delay or priority can go here
        );
        return { success: true, recipient };
      } catch (error) {
        this.logger.error(`Failed to create message record or queue job for recipient ${recipient}: ${error.message}`, error.stack);
        // Decide how to handle partial failures:
        // - Continue processing others?
        // - Rollback/cancel the broadcast? (More complex)
        // Here, we log the error and continue with others.
        return { success: false, recipient, error: error.message };
      }
    });

    // Wait for every DB insert and queue addition to settle.
    // Each promise catches its own error, so Promise.all never rejects here.
    const results = await Promise.all(messageJobsPromises);
    const failedCount = results.filter(r => !r.success).length;

    if (failedCount > 0) {
      this.logger.warn(`Failed to queue ${failedCount} out of ${recipients.length} messages for broadcast ${broadcast.id}.`);
      // Optionally update broadcast status or record partial failure details
    }

    // Optionally update broadcast status to 'QUEUED' or similar after adding all jobs
    // try {
    //    await this.prisma.broadcast.update({
    //       where: { id: broadcast.id },
    //       data: { status: 'QUEUED' }, // Consider a more nuanced status if partial failures occurred
    //    });
    // } catch (error) {
    //     this.logger.error(`Failed to update broadcast ${broadcast.id} status to QUEUED: ${error.message}`);
    // }

    this.logger.log(`Attempted to add ${recipients.length} message jobs to the queue for broadcast ${broadcast.id}. Failures: ${failedCount}.`);

    return broadcast; // Return the created broadcast record
  }

  // Optional: Add methods to get broadcast status
  async getBroadcastStatus(id: number): Promise<Broadcast | null> {
    return this.prisma.broadcast.findUnique({
      where: { id: id },
      include: {
        // Optionally include counts of message statuses
        _count: {
          select: { messages: true },
        },
        // Example: Include counts per status (more detailed)
        // messages: {
        //   where: { status: MessageStatus.SENT },
        //   select: { _count: true } // This syntax might need adjustment based on Prisma version
        // }
        // A more practical approach might be separate queries for counts per status.
      },
    });
  }

  async getMessageStatus(id: number): Promise<any | null> { // Using 'any' for simplicity
    const message = await this.prisma.message.findUnique({ where: { id: id } });
    if (!message) return null;

    // Optionally check job status in BullMQ if needed (more complex)
    // Requires storing BullMQ job ID in the Message schema
    // const job = await this.broadcastQueue.getJob(message.jobId);

    return {
      id: message.id,
      recipient: message.recipient,
      status: message.status,
      attempts: message.attempts,
      providerMessageId: message.providerMessageId,
      error: message.errorDetails,
      createdAt: message.createdAt,
      processedAt: message.processedAt,
    };
  }
}
  • @InjectQueue('broadcast-sms'): Injects the BullMQ queue instance.
  • Injects PrismaService.
  • createBroadcast:
    • Takes the validated DTO.
    • Creates a main Broadcast record in the database.
    • Iterates through recipients:
      • Creates an individual Message record (status PENDING) in the database for tracking.
      • Adds a job to the broadcastQueue with the messageId (linking back to the DB record), recipient, and message text.
    • Includes basic error handling for database operations and queue additions.
    • Returns the created Broadcast object.
  • Includes optional methods (getBroadcastStatus, getMessageStatus) for querying the state of broadcasts and individual messages from the database.
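
For large recipient lists, firing one database insert and one queue add per recipient all at once can exhaust the connection pool. One possible mitigation, sketched here under assumptions, is to work through the list in fixed-size chunks. `chunk` and `processInChunks` are hypothetical helpers that are not part of the service above; in a real implementation each chunk could instead be handed to a bulk API such as Prisma's `createMany` or BullMQ's `addBulk`.

```typescript
// Split a list into fixed-size chunks; the last chunk may be shorter.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Process chunks one after another; items inside a chunk run concurrently.
async function processInChunks<T, R>(
  items: readonly T[],
  size: number,
  handler: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = [];
  for (const batch of chunk(items, size)) {
    results.push(...(await Promise.all(batch.map(handler))));
  }
  return results;
}

// Example: at most 2 recipients are handled at the same time.
processInChunks(['+14155552671', '+447700900000', '+4915112345678'], 2, async (recipient) => {
  return { success: true, recipient };
}).then((results) => console.log(results.length));
```

The chunk size is a tuning knob: larger chunks finish sooner but put more simultaneous load on PostgreSQL and Redis.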

Frequently Asked Questions About Bulk SMS Broadcasting with NestJS

How do I handle Vonage API rate limits in NestJS?

Use BullMQ's built-in rate limiter through the limiter option. Set max: 1 and duration: 1000 to process 1 job per second, respecting Vonage's carrier restrictions (1 request/second for certain number types). The default API limit is 30 requests/second, but carrier constraints often reduce this. In current BullMQ releases limiter is a worker option, so with @nestjs/bullmq it is usually supplied through the worker options of the @Processor decorator. The limit applies across all workers on the queue, which keeps the combined send rate under the provider's threshold.
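
As a rough planning aid, the arithmetic behind these settings can be sketched as follows. `limiterFor` and `estimateDurationSeconds` are hypothetical helpers; only the `{ max, duration }` shape matches BullMQ's limiter option, and the estimate ignores retries and per-job latency.

```typescript
// Same shape as BullMQ's limiter option: at most `max` jobs per `duration` milliseconds.
interface LimiterConfig {
  max: number;
  duration: number;
}

// Build a limiter for a whole-number target rate in messages per second.
function limiterFor(messagesPerSecond: number): LimiterConfig {
  if (!Number.isInteger(messagesPerSecond) || messagesPerSecond < 1) {
    throw new RangeError('messagesPerSecond must be an integer >= 1');
  }
  return { max: messagesPerSecond, duration: 1000 };
}

// Lower bound on how long a broadcast takes when the limiter is the bottleneck.
function estimateDurationSeconds(recipientCount: number, limiter: LimiterConfig): number {
  const perSecond = limiter.max / (limiter.duration / 1000);
  return Math.ceil(recipientCount / perSecond);
}

console.log(estimateDurationSeconds(10_000, limiterFor(1)));  // long-code style limit
console.log(estimateDurationSeconds(10_000, limiterFor(30))); // default API limit
```

At 1 message per second, 10,000 recipients need about 10,000 seconds (roughly 2 hours 47 minutes); at 30 per second the same broadcast takes under 6 minutes.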

What Node.js version should I use for this NestJS SMS application?

Use Node.js 22.x (Active LTS until October 2025, then maintenance until April 2027) or Node.js 20.x (maintenance mode) for production. This guide's Dockerfile uses Node.js 18, which reached end-of-life on April 30, 2025, so upgrade the base image to Node.js 22.

How does BullMQ handle failed SMS messages?

BullMQ automatically retries failed jobs based on your defaultJobOptions configuration. Set attempts: 3 to allow up to 3 attempts in total (the first try plus 2 retries) and backoff: { type: 'exponential', delay: 5000 } for exponential backoff (a 5s wait before the second attempt and 10s before the third; a fourth attempt would wait 20s). The BroadcastProcessor tracks attempt counts in the database and marks messages as FAILED after exhausting all attempts, storing error details for debugging.
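
The retry schedule can be worked out with a few lines of arithmetic. This sketch assumes BullMQ's built-in exponential strategy without jitter, where the wait after the n-th failed attempt is delay × 2^(n − 1); `exponentialBackoffDelays` is a hypothetical helper, not a BullMQ function.

```typescript
// Waits (in ms) between attempts for a job with `attempts` total tries.
// There is one wait after each failed attempt except the last one.
function exponentialBackoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let failedAttempt = 1; failedAttempt < attempts; failedAttempt++) {
    delays.push(baseDelayMs * 2 ** (failedAttempt - 1));
  }
  return delays;
}

console.log(exponentialBackoffDelays(3, 5000)); // [ 5000, 10000 ]
console.log(exponentialBackoffDelays(4, 5000)); // [ 5000, 10000, 20000 ]
```

With attempts: 3 a message is given up on about 15 seconds after its first failure, plus processing time, which is worth keeping in mind when choosing the base delay for transient carrier errors.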

What is the E.164 phone number format required by Vonage?

E.164 format is the international standard for phone numbers: starts with + followed by country code, then subscriber number, maximum 15 digits total. Example: US number (212) 123-1234 becomes +12121231234. The format removes spaces, hyphens, and parentheses. Use @IsPhoneNumber(undefined, { each: true }) in your DTO to validate E.164 format, or implement google-libphonenumber for robust validation and formatting.
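
As an illustration of the transformation described above, the following sketch strips formatting characters and applies a default country code. `toE164` is a hypothetical helper that checks only the shape of the result; it assumes numbers without a leading + are national numbers for the default country, and it does not replace a library such as google-libphonenumber.

```typescript
// Convert loosely formatted input to E.164, or return null if the shape is invalid.
// Numbers without a leading "+" are assumed to belong to `defaultCountryCode`.
function toE164(input: string, defaultCountryCode = '1'): string | null {
  const trimmed = input.trim();
  const hasPlus = trimmed.startsWith('+');
  const digits = trimmed.replace(/\D/g, ''); // drop spaces, hyphens, parentheses
  const candidate = hasPlus ? `+${digits}` : `+${defaultCountryCode}${digits}`;
  // "+", non-zero first digit, at most 15 digits in total
  return /^\+[1-9]\d{1,14}$/.test(candidate) ? candidate : null;
}

console.log(toE164('(212) 123-1234'));   // +12121231234
console.log(toE164('+44 7700 900000'));  // +447700900000
console.log(toE164('not a number'));     // null
```

Normalizing once at the API boundary means the queue and the database only ever see one representation of each recipient.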

Do I need 10DLC registration for US SMS?

Yes, 10DLC Brand and Campaign registration is mandatory for sending SMS to US recipients using standard long code numbers. Without registration, carriers will block your messages. 10DLC registration improves deliverability and throughput (up to 60 messages/second for verified campaigns vs. 1 message/second unverified). Register through your Vonage Dashboard.

How do I monitor BullMQ queue performance?

Install Bull Board for visual queue monitoring: npm install @bull-board/api @bull-board/nestjs @bull-board/express. Bull Board provides a web UI showing active jobs, completed jobs, failed jobs, job delays, and queue throughput. Mount it at /admin/queues in your NestJS app. Additionally, implement Prometheus metrics with @willsoto/nestjs-prometheus to track job processing rates, failure rates, and queue depth.

What database indexes should I create for Prisma SMS tracking?

Create indexes on frequently queried columns: broadcastId (for fetching all messages in a broadcast), status (for counting SENT/FAILED/PENDING messages), and createdAt (for time-based queries). Add @@index([broadcastId, status]) composite index in your Prisma schema for efficient broadcast status queries. Index recipient if you need to search by phone number or detect duplicates.

How do I handle webhook callbacks from Vonage?

Create a dedicated webhook endpoint (e.g., /webhooks/vonage/status) to receive Delivery Receipts (DLRs). Parse the webhook payload to extract message_uuid and delivery status (delivered, failed, rejected). Update your Message record status in the database using providerMessageId (stores the message_uuid). Validate webhook signatures using Vonage's signature verification to prevent spoofed requests.
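
The mapping step can be sketched independently of the HTTP layer. Everything here is an assumption made for illustration: the payload is reduced to the two fields named above, `DELIVERED` is assumed to exist as an extra status in your schema, and `undeliverable` is included as a failure value alongside the ones listed. Check the status values your account actually receives before relying on it.

```typescript
// Statuses this sketch can write back to the Message record (DELIVERED is an assumed addition).
type TrackedStatus = 'DELIVERED' | 'FAILED';

// Minimal view of a status webhook body; real payloads carry more fields.
interface StatusWebhookPayload {
  message_uuid: string;
  status: string;
}

// Translate a provider status into an internal one; null means "no change".
function mapProviderStatus(status: string): TrackedStatus | null {
  switch (status.toLowerCase()) {
    case 'delivered':
      return 'DELIVERED';
    case 'failed':
    case 'rejected':
    case 'undeliverable':
      return 'FAILED';
    default:
      return null; // e.g. an intermediate status: keep the current value
  }
}

// Build the update to apply to the row whose providerMessageId matches message_uuid.
function toStatusUpdate(
  payload: StatusWebhookPayload,
): { providerMessageId: string; status: TrackedStatus } | null {
  const status = mapProviderStatus(payload.status);
  return status ? { providerMessageId: payload.message_uuid, status } : null;
}

console.log(toStatusUpdate({ message_uuid: 'example-uuid', status: 'delivered' }));
```

Returning null for unknown statuses keeps the handler safe when the provider introduces values the application does not yet understand.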

Can I send MMS or WhatsApp messages with this architecture?

Yes, the Vonage Messages API supports SMS, MMS, WhatsApp, Viber, and Facebook Messenger through a unified interface. Modify the MessageSendRequest in VonageService.sendSms() to set channel: 'mms' or channel: 'whatsapp'. For MMS, add image: { url: 'https://...' } to the message object. For WhatsApp, ensure you have a verified WhatsApp Business Account and use template messages for initial contact.

How do I scale this application for millions of messages?

  • Horizontal scaling: Run multiple worker instances (separate containers) connected to the same Redis queue. Each worker processes jobs concurrently while BullMQ's distributed locking prevents duplicate processing.
  • Vertical scaling: Increase BullMQ's concurrency setting in WorkerOptions to process multiple jobs per worker simultaneously.
  • Database scaling: Use PostgreSQL read replicas for status queries, primary for writes. Consider sharding broadcasts by region or time period for very high volumes (100M+ messages/month).
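
When sizing a worker fleet, keep in mind that BullMQ's rate limiter is shared by all workers on a queue, so extra workers only help until that limit is reached. A back-of-the-envelope model, with the hypothetical names `WorkerFleet` and `effectiveThroughput` and an assumed average job time, looks like this:

```typescript
interface WorkerFleet {
  workers: number;              // number of worker processes/containers
  concurrencyPerWorker: number; // BullMQ concurrency setting per worker
  avgJobSeconds: number;        // assumed average time to send one message
}

// Messages per second the system can sustain: the smaller of worker capacity and the limiter.
function effectiveThroughput(fleet: WorkerFleet, limiterPerSecond: number): number {
  const capacity = (fleet.workers * fleet.concurrencyPerWorker) / fleet.avgJobSeconds;
  return Math.min(capacity, limiterPerSecond);
}

// One worker, one job at a time, 0.5 s per send: worker-bound at 2 msg/s.
console.log(effectiveThroughput({ workers: 1, concurrencyPerWorker: 1, avgJobSeconds: 0.5 }, 30));
// Four workers with concurrency 5: capacity is 40 msg/s, but the limiter caps it at 30.
console.log(effectiveThroughput({ workers: 4, concurrencyPerWorker: 5, avgJobSeconds: 0.5 }, 30));
```

If the second number equals the limiter value, adding workers will not speed up a broadcast; raising the provider-side limit is the only lever left.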

Next Steps for Production-Ready Bulk SMS Broadcasting

Now that you've built your bulk SMS system, enhance it with these production features:

  1. Implement Webhook Handler – Create a /webhooks/vonage/status endpoint to receive Delivery Receipts and update message status to DELIVERED or FAILED based on carrier feedback. Verify the signed JWT that Vonage attaches to each webhook request before trusting the payload.

  2. Add Bull Board Monitoring – Install @bull-board/nestjs to visualize queue performance, inspect failed jobs, manually retry jobs, and monitor throughput. Essential for debugging production issues.

  3. Configure Prometheus Metrics – Integrate @willsoto/nestjs-prometheus to track custom metrics: messages sent per minute, failure rate by error type, average delivery time, queue depth. Export to Grafana for real-time dashboards.

  4. Implement Message Deduplication – Add Redis-based deduplication using recipient + message content hash with 24-hour TTL to prevent accidental duplicate sends during retries or API errors.

  5. Optimize Database Indexes – Add @@index([broadcastId, status]), @@index([status, createdAt]), and @@index([providerMessageId]) to the Message model in your Prisma schema for fast status queries and webhook lookups.

  6. Add Rate Limit Overrides – Implement per-broadcast rate limit configuration to handle different Vonage number types (short codes: 100 msg/sec, long codes: 1 msg/sec, 10DLC: varies by campaign tier).

  7. Implement Circuit Breaker – Use a circuit breaker library such as opossum or cockatiel to detect Vonage API outages and pause job processing automatically, preventing queue buildup and unnecessary retries during downtime.

  8. Configure Job Prioritization – Use BullMQ's priority option to send urgent messages (OTPs, alerts) before marketing broadcasts. Higher priority (lower number) jobs process first.

  9. Add Message Template System – Create a Prisma MessageTemplate model with variable substitution (Hello {{name}}, your code is {{code}}) to enable personalized bulk messages without storing full content per recipient.

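  10. Make Webhook Handling Resilient – DLR webhooks are sent by Vonage, not by your application, so respond with 200 OK quickly, hand the payload to a queue for processing, and make status updates idempotent by keying on message_uuid. Your status tracking then stays accurate even when the same receipt arrives more than once after temporary network issues.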
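
As an example of the template idea from item 9, variable substitution can be done with a single regular expression. This is a minimal sketch: `renderTemplate` is a hypothetical helper, placeholders are assumed to be word characters inside double braces, and a missing variable is treated as an error so that a half-filled message is never sent.

```typescript
// Replace {{name}}-style placeholders with values from `variables`.
function renderTemplate(template: string, variables: Record<string, string>): string {
  return template.replace(/\{\{\s*(\w+)\s*\}\}/g, (_match, key: string) => {
    // Own-property check avoids picking up inherited keys such as "toString".
    if (!Object.prototype.hasOwnProperty.call(variables, key)) {
      throw new Error(`Missing template variable: ${key}`);
    }
    return variables[key];
  });
}

console.log(renderTemplate('Hello {{name}}, your code is {{code}}', { name: 'Ada', code: '1234' }));
// Hello Ada, your code is 1234
```

Rendering inside the worker, rather than when the broadcast is created, lets each job carry only the template ID and the per-recipient variables.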

Frequently Asked Questions

How to send bulk SMS with NestJS?

Build a scalable bulk SMS application using NestJS, Vonage Messages API, and BullMQ. This setup allows you to create a NestJS API endpoint to handle requests, queue messages with BullMQ and Redis, and process them in the background with a worker that interacts with the Vonage API.

What is BullMQ used for in bulk SMS?

BullMQ, a Redis-based message queue, is crucial for handling background job processing, rate limiting, and retries in bulk SMS applications. It decouples the API request from the actual sending, enabling throttling and reliable message delivery even with temporary failures.

Why use a message queue for bulk SMS?

A message queue like BullMQ helps manage provider rate limits, ensuring deliverability and graceful failure handling. Without a queue, simply looping through recipients can lead to errors, blocked messages, and an unreliable system. Queues enable asynchronous processing and retries.

When should I use Vonage Messages API?

Use the Vonage Messages API when you need to send messages across various channels, including SMS, to a large audience. The provided example uses the official `@vonage/server-sdk` to interact with the API, sending text messages within the configured rate limits.

Can I track SMS message status?

Yes, the example demonstrates basic status tracking using Prisma and PostgreSQL. Individual messages are initially marked as PENDING, then PROCESSING, and finally SENT or FAILED. Vonage can optionally send Delivery Receipts (DLRs) via webhooks to update the message status further.

How to handle Vonage API rate limits?

The example leverages BullMQ's rate limiting feature. The worker processes a configurable number of jobs within a specific timeframe (e.g., one per second), ensuring compliance with Vonage's limits for different number types like long codes.

What is Prisma used for with PostgreSQL?

Prisma simplifies database access, migrations, and type safety with PostgreSQL. It's a modern database toolkit for TypeScript and Node.js that makes it easier to interact with your database and manage its schema.

How to containerize a NestJS SMS application?

The example provides a Dockerfile and docker-compose.yml to containerize the NestJS application, PostgreSQL database, and Redis instance. This ensures a consistent development and deployment environment and simplifies setup.

What are the prerequisites for this project?

Prerequisites include Node.js (LTS recommended), npm or yarn, Docker and Docker Compose, a Vonage API account, a Vonage phone number capable of sending SMS, and basic understanding of NestJS, TypeScript, APIs, and databases. You'll also need `curl` or Postman for testing.

How to handle failed SMS messages?

The application demonstrates error handling and automatic retries. BullMQ automatically requeues failed jobs after a backoff period. If the final attempt fails, the message status is updated to FAILED, and error details are logged.

How does the NestJS API handle broadcast requests?

The API receives recipient lists and message content via POST requests to /broadcasts. It then creates database records and adds individual jobs to the BullMQ queue for each recipient, allowing for asynchronous processing.

What is the role of Redis in the architecture?

Redis serves as the backend for BullMQ, storing the queued SMS sending jobs. It's an in-memory data structure store, providing speed and efficiency for the message queue operations.

What is the purpose of the Vonage private key?

The Vonage private key is used to authenticate the application with the Vonage Messages API. The file should be kept secure, mounted into the container read-only, and never committed to version control.

How to configure environment variables for Vonage?

Vonage credentials like API key, secret, application ID, sender ID, and private key path are stored in a .env file. This file should never be committed to version control to protect sensitive information.