code examples
code examples
MessageBird Bulk SMS Tutorial: Build Broadcasting System with Node.js & Express
Build a scalable bulk SMS system with MessageBird API, Express.js, and PostgreSQL. Handle 50-recipient batches, retries, and job tracking. Production-ready tutorial.
Build a production-ready bulk SMS broadcasting system using the MessageBird API with Node.js and Express.js. This tutorial shows you how to create a scalable SMS API that handles thousands of recipients, implements batch processing (50 recipients per batch), includes retry logic, and tracks delivery status with PostgreSQL.
You'll implement a complete bulk SMS solution with job tracking, webhook integration, error handling, and status monitoring – solving the challenge of reliably notifying large user groups without overwhelming your infrastructure or the MessageBird API.
Technologies Used:
- Node.js: JavaScript runtime for server-side applications
- Express.js: Minimal web application framework for Node.js
- MessageBird: Communication platform API for SMS, Voice, and WhatsApp (using SMS API and Node.js SDK)
- PostgreSQL: Open-source relational database for tracking jobs and recipient status
- Prisma: Modern database toolkit simplifying database access and migrations
- dotenv: Environment variable loader
- winston: Structured logging library
- async-retry: Retry logic implementation
- express-rate-limit: API rate limiting middleware
- helmet: Security headers middleware
- express-validator: Input validation middleware
System Architecture:
graph LR
A[Client/User] -- API Request (POST /api/broadcast/sms) --> B(Express API Server);
B -- Validate Request & Create Job --> C{Database (PostgreSQL)};
B -- Batch Recipients & Send (Async) --> D(Messaging Service);
D -- Uses MessageBird SDK --> E[MessageBird API];
E -- Sends SMS --> F((Recipients));
E -- Sends Status Webhook --> B;
B -- Updates Job/Recipient Status --> C;
G[Monitoring/Logging] -- Observes --> B;
G -- Observes --> C;(Note: Mermaid diagram rendering depends on the platform displaying this article.)
How It Works:
- Your client sends a POST request to
/api/broadcast/smswith recipients and message content - The Express API validates the request and creates a job in PostgreSQL
- The Messaging Service splits recipients into 50-person batches
- Each batch is sent to MessageBird API with exponential backoff retry logic
- MessageBird sends the SMS to recipients and returns delivery status
- Webhooks notify your server of delivery events
- The database updates job and recipient statuses
- Winston logs all operations for monitoring and debugging
Prerequisites:
Before starting, ensure you have:
- Node.js 14+ and npm (or yarn) installed on your system
- A MessageBird account with an API Access Key (get your key here)
- PostgreSQL 12+ database access
- Basic familiarity with Node.js, Express, and REST APIs
- Docker installed (optional, for easier database setup)
What You'll Build:
- A functional Express.js application with proper structure
- An API endpoint (
/api/broadcast/sms) for triggering bulk SMS campaigns - MessageBird integration sending SMS efficiently in 50-recipient batches
- Database persistence tracking broadcast jobs and recipient statuses
- Security measures including API key authentication, rate limiting, and input validation
- Robust error handling with exponential backoff retry logic
- Structured logging with Winston for monitoring and debugging
- Deployment-ready configuration with graceful shutdown
Time Estimate: 2–3 hours | Skill Level: Intermediate
Set Up Your Bulk SMS Broadcasting Project
Initialize your Node.js project and configure the basic structure and dependencies. By the end of this section, you'll have a complete project skeleton with all required dependencies and a running Express server.
-
Create Project Directory:
bashmkdir node-messagebird-bulk-sms cd node-messagebird-bulk-sms -
Initialize Node.js Project:
bashnpm init -y -
Install Core Dependencies:
bashnpm install express dotenv messagebird @prisma/client winston async-retry express-rate-limit helmet express-validatorexpress: Web frameworkdotenv: Environment variable loadermessagebird: Official MessageBird Node.js SDK@prisma/client: Prisma database clientwinston: Structured loggingasync-retry: Retry mechanismexpress-rate-limit: API rate limitinghelmet: Security headersexpress-validator: Input validation
-
Install Development Dependencies:
bashnpm install -D nodemon prisma jest supertest nocknodemon: Auto-restart server during developmentprisma: Prisma CLI tooljest,supertest,nock: Testing tools for unit, integration, and E2E tests
-
Set up Project Structure: Create the following directories and files:
textnode-messagebird-bulk-sms/ ├── prisma/ │ └── schema.prisma ├── src/ │ ├── controllers/ │ │ ├── broadcastController.js │ │ └── webhookController.js │ ├── routes/ │ │ ├── broadcastRoutes.js │ │ └── webhookRoutes.js │ ├── services/ │ │ └── messagingService.js │ ├── middleware/ │ │ ├── authMiddleware.js │ │ ├── errorMiddleware.js │ │ └── webhookVerificationMiddleware.js # Placeholder │ ├── utils/ │ │ ├── logger.js │ │ └── prismaClient.js │ └── app.js ├── tests/ # Optional │ └── services/ │ └── messagingService.test.js ├── .env ├── .gitignore ├── Dockerfile # Optional ├── package.json └── server.js
File Responsibilities:
| Directory/File | Purpose |
|---|---|
prisma/schema.prisma | Database schema definition and migrations |
src/controllers/ | Request handling and validation logic |
src/routes/ | API endpoint definitions and routing |
src/services/ | Business logic for SMS sending and batch processing |
src/middleware/ | Authentication, error handling, and webhook verification |
src/utils/ | Logging and database client configuration |
server.js | Application entry point with graceful shutdown |
src/app.js | Express configuration and middleware setup |
-
Configure
nodemonand Scripts: Update thescriptssection in yourpackage.json:json// package.json { // ... other configurations "scripts": { "start": "node server.js", "dev": "nodemon server.js", "test": "jest", "prisma:dev": "prisma migrate dev", "prisma:deploy": "prisma migrate deploy", "prisma:studio": "prisma studio", "prisma:generate": "prisma generate" } // ... } -
Create
.gitignore:text# .gitignore node_modules/ .env dist/ npm-debug.log* yarn-debug.log* yarn-error.log* coverage/ prisma/migrations/*.sql # Exclude generated SQL if desired -
Set up Basic Express Server (
server.js):javascript// server.js require('dotenv').config(); const http = require('http'); const app = require('./src/app'); const logger = require('./src/utils/logger'); const prisma = require('./src/utils/prismaClient'); const PORT = process.env.PORT || 3000; const server = http.createServer(app); server.listen(PORT, () => { console.log(`Server running on port ${PORT}`); }); // Graceful shutdown ensures active requests complete and database connections close properly const signals = { 'SIGHUP': 1, 'SIGINT': 2, 'SIGTERM': 15 }; Object.keys(signals).forEach((signal) => { process.on(signal, () => { logger.info(`Received ${signal}, closing HTTP server...`); server.close(() => { logger.info('HTTP server closed.'); prisma.$disconnect().then(() => { logger.info('Database connection closed.'); process.exit(128 + signals[signal]); }).catch(err => { logger.error('Error disconnecting database:', err); process.exit(1); }); }); }); }); -
Configure Express Application (
src/app.js):javascript// src/app.js const express = require('express'); const helmet = require('helmet'); const broadcastRoutes = require('./routes/broadcastRoutes'); const webhookRoutes = require('./routes/webhookRoutes'); // Add webhook routes const { errorMiddleware } = require('./middleware/errorMiddleware'); const logger = require('./utils/logger'); const prisma = require('./utils/prismaClient'); // Needed for health check const app = express(); // Security Middleware protects against XSS, clickjacking, MIME sniffing, etc. app.use(helmet()); // Configure 'trust proxy' if behind a load balancer/proxy (AWS ELB, nginx) for accurate rate limiting // app.set('trust proxy', 1); // Body Parsers app.use(express.json({ limit: '1mb' })); // Adjust limit as needed app.use(express.urlencoded({ extended: true })); // Request Logging Middleware app.use((req, res, next) => { logger.info(`${req.method} ${req.originalUrl}`, { ip: req.ip }); res.on('finish', () => { logger.info(`${res.statusCode} ${res.statusMessage}; ${res.get('Content-Length') || 0}b sent`, { url: req.originalUrl }); }); next(); }); // Health Check Endpoint app.get('/health', async (req, res) => { try { // Optional: Check DB connection await prisma.$queryRaw`SELECT 1`; res.status(200).json({ status: 'UP', timestamp: new Date().toISOString(), database: 'connected' }); } catch (dbError) { logger.error('Health check failed - database connection error:', dbError); res.status(503).json({ status: 'DOWN', timestamp: new Date().toISOString(), database: 'disconnected', error: dbError.message }); } }); // API Routes app.use('/api/broadcast', broadcastRoutes); app.use('/webhooks', webhookRoutes); // Register webhook routes // Centralized Error Handling Middleware (must be last) app.use(errorMiddleware); module.exports = app; -
Environment Variables (
.env):Create a
.envfile in the root directory. Add.envto.gitignoreto protect sensitive credentials.env# .env # Server Configuration PORT=3000 NODE_ENV=development # development, production LOG_LEVEL=info # debug, info, warn, error # MessageBird Configuration MESSAGEBIRD_API_KEY=YOUR_LIVE_MESSAGEBIRD_API_KEY MESSAGEBIRD_ORIGINATOR=YourSenderID MESSAGEBIRD_WEBHOOK_SIGNING_KEY=YOUR_MESSAGEBIRD_WEBHOOK_SECRET # For webhook verification # Database Configuration (Prisma standard) DATABASE_URL="postgresql://user:password@localhost:5432/bulk_sms?schema=public" # Security INTERNAL_API_KEY=a_very_secret_key_for_internal_use
How to Get Your MessageBird Credentials:
- Sign up at MessageBird Dashboard
- Navigate to Developers → API Access
- Copy your Live API Key (starts with
live_) - Configure your sender ID (alphanumeric, max 11 characters, or phone number)
- Generate a webhook signing key for webhook verification
Security Notes:
- Never commit
.envto version control - Use different API keys for development and production
- Rotate API keys regularly
- Use environment-specific secrets management in production (AWS Secrets Manager, HashiCorp Vault)
-
Setup Logging (
src/utils/logger.js):javascript// src/utils/logger.js const winston = require('winston'); const logger = winston.createLogger({ level: process.env.LOG_LEVEL || 'info', format: winston.format.combine( winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }), winston.format.errors({ stack: true }), // Log stack traces winston.format.splat(), winston.format.json() // Essential for log aggregation tools ), defaultMeta: { service: 'bulk-sms-service' }, // Optional metadata transports: [ // In production, consider transports for file rotation or sending to log services // new winston.transports.File({ filename: 'error.log', level: 'error' }), // new winston.transports.File({ filename: 'combined.log' }), ], // Log unhandled exceptions and rejections exceptionHandlers: [ new winston.transports.Console({ // Also log exceptions to console format: winston.format.combine(winston.format.colorize(), winston.format.simple()) }), // new winston.transports.File({ filename: 'exceptions.log' }) ], rejectionHandlers: [ new winston.transports.Console({ // Also log rejections to console format: winston.format.combine(winston.format.colorize(), winston.format.simple()) }), // new winston.transports.File({ filename: 'rejections.log' }) ] }); // If we're not in production then log to the `console` with the format: // `${info.level}: ${info.message} JSON.stringify({ ...rest }) ` if (process.env.NODE_ENV !== 'production') { logger.add(new winston.transports.Console({ format: winston.format.combine( winston.format.colorize(), winston.format.printf(({ level, message, timestamp, stack, ...metadata }) => { let msg = `${timestamp} ${level}: ${message}`; if (stack) { msg += `\n${stack}`; } if (Object.keys(metadata).length > 0) { // Only stringify if metadata is not empty and not just defaultMeta const filteredMeta = Object.keys(metadata) .filter(key => key !== 'service') // Exclude defaultMeta if needed .reduce((obj, key) => { obj[key] = metadata[key]; return obj; }, {}); if (Object.keys(filteredMeta).length > 0) { msg += ` ${JSON.stringify(filteredMeta)}`; } } return msg; }) ) })); } module.exports = logger;
Log Levels:
error– System failures requiring immediate attentionwarn– Issues that don't stop execution but need reviewinfo– General application events (default)debug– Detailed diagnostic information
Production Logging: Integrate with CloudWatch, DataDog, or LogDNA by adding Winston transports for your chosen service.
-
Setup Prisma Client (
src/utils/prismaClient.js):javascript// src/utils/prismaClient.js const { PrismaClient } = require('@prisma/client'); const logger = require('./logger'); const prisma = new PrismaClient({ log: [ { emit: 'event', level: 'query' }, { emit: 'stdout', level: 'info' }, { emit: 'stdout', level: 'warn' }, { emit: 'stdout', level: 'error' }, ], }); // Optional: Log Prisma queries prisma.$on('query', (e) => { logger.debug(`Query: ${e.query}`, { params: e.params, duration: e.duration }); }); module.exports = prisma;
Implement the Bulk SMS Sending Service
This service handles the core SMS sending logic with batch processing, retry mechanisms, and database updates. The design splits large recipient lists into 50-person batches to comply with MessageBird API limits and implements exponential backoff for transient failures.
-
Create Messaging Service (
src/services/messagingService.js):javascript// src/services/messagingService.js const { initClient } = require('messagebird'); const retry = require('async-retry'); const prisma = require('../utils/prismaClient'); const logger = require('../utils/logger'); // Initialize MessageBird client let messagebird; try { if (!process.env.MESSAGEBIRD_API_KEY) { throw new Error('MESSAGEBIRD_API_KEY environment variable not set.'); } messagebird = initClient(process.env.MESSAGEBIRD_API_KEY); logger.info('MessageBird client initialized successfully.'); } catch (error) { logger.error('Failed to initialize MessageBird client:', error); // Throwing allows higher-level handling (e.g., prevent app start or log critical error) // process.exit(1); // Avoid process.exit in modules if possible throw error; // Let the main application handler decide how to proceed } // MessageBird allows up to 50 recipients per API call const BATCH_SIZE = 50; /** * Send SMS to a batch of recipients using MessageBird. * @param {string[]} batchRecipients - Recipient phone numbers (max 50) * @param {string} messageBody - SMS message content * @param {string} originator - Sender ID (phone number or alphanumeric, max 11 chars) * @returns {Promise<object>} - MessageBird API response */ async function sendBatch(batchRecipients, messageBody, originator) { const params = { originator: originator, recipients: batchRecipients, body: messageBody, // reference: Can be added here if needed for webhook correlation, e.g. `job-${jobId}-batch-${batchIndex}` // Ensure jobId and batchIndex are passed to sendBatch if using reference. }; return new Promise((resolve, reject) => { if (!messagebird) { // This should ideally not happen if initialization check is robust return reject(new Error('MessageBird client not initialized.')); } messagebird.messages.create(params, (err, response) => { if (err) { logger.error('MessageBird API Error:', { code: err.code, message: err.message, errors: err.errors }); // Attach recipient info to the error for better debugging in retry logic err.recipients = batchRecipients; return reject(err); } // The response.id here is typically for the overall batch request. // Individual message tracking often relies on webhooks carrying specific message IDs. logger.info(`Batch sent successfully via MessageBird. Batch Response ID: ${response?.id}`); resolve(response); // Contains details including recipient statuses }); }); } /** * Send SMS to multiple recipients with batch processing and retries. * @param {number} broadcastJobId - Broadcast job ID in database * @param {string[]} recipients - All recipient phone numbers * @param {string} messageBody - SMS message content * @param {string} originator - Sender ID * @returns {Promise<void>} */ async function sendBulkSms(broadcastJobId, recipients, messageBody, originator) { logger.info(`Starting bulk SMS send for job ID: ${broadcastJobId}. Total recipients: ${recipients.length}`, { jobId: broadcastJobId }); let successfulBatches = 0; let failedBatches = 0; for (let i = 0; i < recipients.length; i += BATCH_SIZE) { const batch = recipients.slice(i, i + BATCH_SIZE); const batchIndex = Math.floor(i / BATCH_SIZE) + 1; logger.info(`Processing batch ${batchIndex} with ${batch.length} recipients.`, { jobId: broadcastJobId, batch: batchIndex }); try { await retry( async (bail, attemptNumber) => { logger.info(`Attempt ${attemptNumber} for batch ${batchIndex}`, { jobId: broadcastJobId, batch: batchIndex, attempt: attemptNumber }); try { const response = await sendBatch(batch, messageBody, originator); // --- Database Update Logic --- if (response && response.recipients && response.recipients.items) { const updates = response.recipients.items.map(item => ({ where: { broadcastJobId_phoneNumber: { broadcastJobId: broadcastJobId, phoneNumber: String(item.recipient) // Ensure string comparison } }, data: { initialStatus: item.status, // e.g., 'sent', 'scheduled' statusTimestamp: item.statusDatetime ? new Date(item.statusDatetime) : new Date(), messageId: response.id // Store batch message ID for reference } })); if (updates.length > 0) { // Perform updates in bulk within a transaction const result = await prisma.$transaction( updates.map(update => prisma.recipientStatus.update(update)) ); logger.info(`Successfully processed batch ${batchIndex}. Updated initial status for ${result.length} recipients.`, { jobId: broadcastJobId, batch: batchIndex, count: result.length }); } else { logger.warn(`No recipient status updates to perform for batch ${batchIndex}.`, { jobId: broadcastJobId, batch: batchIndex }); } } else { logger.warn(`No recipient status details found in MessageBird response for batch ${batchIndex}.`, { jobId: broadcastJobId, batch: batchIndex, response }); } // --- End Database Update Logic --- } catch (error) { logger.warn(`Error sending batch ${batchIndex} on attempt ${attemptNumber}: ${error.message}`, { jobId: broadcastJobId, batch: batchIndex, attempt: attemptNumber, error }); // Decide which errors are retryable (e.g., network, temporary server errors) // MessageBird specific errors might need checking (e.g., 4xx are usually not retryable) const isRetryable = !error.statusCode || error.statusCode >= 500 || (error.code && ['ETIMEDOUT', 'ECONNRESET', 'ECONNREFUSED'].includes(error.code)); if (!isRetryable) { logger.error(`Unrecoverable error for batch ${batchIndex}. Stopping retries.`, { jobId: broadcastJobId, batch: batchIndex, error }); // Mark recipients as failed immediately try { await prisma.recipientStatus.updateMany({ where: { broadcastJobId: broadcastJobId, phoneNumber: { in: batch.map(String) }}, data: { initialStatus: 'failed', statusTimestamp: new Date(), errorMessage: `Non-retryable error: ${error.message}` } }); logger.info(`Marked ${batch.length} recipients as failed due to non-retryable error in batch ${batchIndex}.`, { jobId: broadcastJobId, batch: batchIndex }); } catch (dbError) { logger.error('Database error updating status for non-retryable batch failure:', { jobId: broadcastJobId, batch: batchIndex, dbError }); } bail(error); // Stop retrying via async-retry return; // Exit async block } throw error; // Throw error to signal retry library } }, { retries: 3, // Number of retries factor: 2, // Exponential backoff factor minTimeout: 1000, // Initial delay 1s maxTimeout: 5000, // Max delay 5s onRetry: (error, attemptNumber) => { logger.warn(`Retrying batch ${batchIndex} (Attempt ${attemptNumber}) due to error: ${error.message}`, { jobId: broadcastJobId, batch: batchIndex, attempt: attemptNumber }); } } ); successfulBatches++; } catch (error) { // This catch block executes only after all retries have failed for a batch logger.error(`Failed to send batch ${batchIndex} after multiple retries: ${error.message}`, { jobId: broadcastJobId, batch: batchIndex, error }); failedBatches++; // --- Database Error Update Logic (final failure) --- try { await prisma.recipientStatus.updateMany({ where: { broadcastJobId: broadcastJobId, phoneNumber: { in: batch.map(String) }}, data: { initialStatus: 'failed', statusTimestamp: new Date(), errorMessage: `Failed after retries: ${error.message}` } }); logger.warn(`Marked ${batch.length} recipients as failed for batch ${batchIndex} after retries.`, { jobId: broadcastJobId, batch: batchIndex, count: batch.length }); } catch (dbError) { logger.error('Database error during final batch failure update:', { jobId: broadcastJobId, batch: batchIndex, dbError }); } // --- End Database Error Update Logic --- // Continue to the next batch } // Optional: Add a small delay between batches if hitting external rate limits // await new Promise(resolve => setTimeout(resolve, 200)); } logger.info(`Finished processing all batches for job ID: ${broadcastJobId}. Successful: ${successfulBatches}, Failed: ${failedBatches}`, { jobId: broadcastJobId }); // Update overall job status - more sophisticated logic could analyze recipient statuses let finalJobStatus = 'processing_complete'; // Indicates sending attempts finished if (failedBatches > 0 && successfulBatches === 0) { finalJobStatus = 'failed'; // No batches succeeded } else if (failedBatches > 0) { finalJobStatus = 'partially_failed'; // Some batches failed } // A true 'completed' status might wait for all final delivery webhooks try { await prisma.broadcastJob.update({ where: { id: broadcastJobId }, data: { status: finalJobStatus, completedAt: new Date() } // Mark completion time }); logger.info(`Updated final job status for job ${broadcastJobId} to '${finalJobStatus}'`, { jobId: broadcastJobId, status: finalJobStatus }); } catch (dbError) { logger.error(`Failed to update final job status for job ${broadcastJobId}:`, { jobId: broadcastJobId, dbError }); } } module.exports = { sendBulkSms, };
Retry Configuration Explained:
| Parameter | Value | Why |
|---|---|---|
retries | 3 | Allows up to 4 total attempts (1 initial + 3 retries) |
factor | 2 | Exponential backoff: 1s → 2s → 4s delays |
minTimeout | 1000 ms | Initial delay before first retry |
maxTimeout | 5000 ms | Cap on delay to prevent excessive waits |
Error Classification:
- Retryable (5xx, network errors): Temporary failures, retry with backoff
- Non-retryable (4xx): Invalid phone numbers, authentication issues – fail immediately
Throughput: With 50-recipient batches and 3-second average API response, expect ~1,000 messages/minute. Adjust batch delays if you hit MessageBird rate limits.
Build the API Layer
Create RESTful endpoints for triggering SMS broadcasts and checking job status. The API follows standard REST conventions: POST for creating jobs, GET for retrieving status.
-
Create Authentication Middleware (
src/middleware/authMiddleware.js):javascript// src/middleware/authMiddleware.js const logger = require('../utils/logger'); function apiKeyAuth(req, res, next) { const apiKey = req.headers['x-api-key']; const expectedApiKey = process.env.INTERNAL_API_KEY; if (!expectedApiKey) { logger.error('INTERNAL_API_KEY is not set in environment variables. API is insecure.'); // Avoid exposing internal configuration details in the response return res.status(500).json({ message: 'Internal Server Error' }); } if (!apiKey || apiKey !== expectedApiKey) { logger.warn('Unauthorized API access attempt denied.', { ip: req.ip, keyProvided: !!apiKey }); return res.status(401).json({ message: 'Unauthorized' }); // Keep it simple } next(); // API Key is valid } module.exports = { apiKeyAuth };
Why API Key Authentication? This tutorial uses simple API key authentication for clarity. For production systems handling sensitive data, consider JWT tokens or OAuth 2.0. Add rate limiting with express-rate-limit to prevent abuse:
const rateLimit = require('express-rate-limit');
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100 // limit each IP to 100 requests per window
});
app.use('/api/broadcast', limiter);-
Create Broadcast Controller (
src/controllers/broadcastController.js):javascript// src/controllers/broadcastController.js const { validationResult, body } = require('express-validator'); const { sendBulkSms } = require('../services/messagingService'); const prisma = require('../utils/prismaClient'); const logger = require('../utils/logger'); // Validation rules const validateBroadcastRequest = [ body('recipients').isArray({ min: 1 }).withMessage('Recipients must be a non-empty array.'), // Validate E.164 format: +[country code][number], 7–15 digits total body('recipients.*') .isString().withMessage('Each recipient must be a string.') .trim() .matches(/^\+[1-9]\d{1,14}$/).withMessage('Invalid phone number. Use E.164 format (e.g., +14155552671).'), body('messageBody').isString().trim().notEmpty().withMessage('Message body cannot be empty.'), body('originator').optional().isString().trim().notEmpty().isLength({ max: 11 }).withMessage('Originator must be a non-empty string up to 11 characters if provided.') ]; /** * Handle POST request to send bulk SMS. */ async function handleSendBulkSms(req, res, next) { const errors = validationResult(req); if (!errors.isEmpty()) { logger.warn('Broadcast request validation failed.', { errors: errors.array() }); return res.status(400).json({ errors: errors.array() }); } const { recipients, messageBody } = req.body; // Use originator from request body or fallback to env var const originator = req.body.originator || process.env.MESSAGEBIRD_ORIGINATOR; if (!originator) { logger.error('Originator is missing. Configure MESSAGEBIRD_ORIGINATOR env var or provide in request.'); return res.status(400).json({ message: 'Originator must be provided either in the request body or as MESSAGEBIRD_ORIGINATOR environment variable.' }); } // Deduplicate recipients const uniqueRecipients = [...new Set(recipients.map(r => r.trim()))]; // Trim before deduplicating if (uniqueRecipients.length !== recipients.length) { logger.info(`Duplicate recipients removed. Original: ${recipients.length}, Unique: ${uniqueRecipients.length}`); } let broadcastJob; try { // 1. Create Broadcast Job and Recipient Status records in DB broadcastJob = await prisma.broadcastJob.create({ data: { messageBody: messageBody, originator: originator, status: 'pending', // Initial status totalRecipients: uniqueRecipients.length, recipients: { create: uniqueRecipients.map(phone => ({ phoneNumber: String(phone), // Ensure string initialStatus: 'pending' })) } }, // include: { recipients: true } // Avoid including all recipients by default }); logger.info(`Created broadcast job ${broadcastJob.id} with ${uniqueRecipients.length} unique recipients.`, { jobId: broadcastJob.id }); // 2. Trigger async sending (DO NOT await here) // Use setImmediate or process.nextTick to ensure response is sent before heavy processing starts setImmediate(() => { sendBulkSms(broadcastJob.id, uniqueRecipients, messageBody, originator) .catch(error => { // Catch errors specifically from the async sendBulkSms invocation logger.error(`Background processing initiation failed for job ${broadcastJob.id}:`, { jobId: broadcastJob.id, error }); // Update job status to reflect failure during processing start prisma.broadcastJob.update({ where: { id: broadcastJob.id }, data: { status: 'failed_to_start', errorMessage: error.message || 'Unknown error during async start' } }).catch(dbError => logger.error('DB error updating job status on async start failure:', { jobId: broadcastJob.id, dbError })); }); }); // 3. Respond immediately res.status(202).json({ message: 'Broadcast job accepted and is being processed.', jobId: broadcastJob.id, status: 'pending', recipientCount: uniqueRecipients.length }); } catch (error) { logger.error('Error creating broadcast job or initiating send:', { error }); // Handle potential DB errors during job creation if (broadcastJob && broadcastJob.id) { // Attempt to mark the job as failed if it was partially created try { await prisma.broadcastJob.update({ where: { id: broadcastJob.id }, data: { status: 'failed', errorMessage: 'Failed during job creation/initiation: ' + error.message } }); } catch (dbUpdateError) { logger.error('DB error updating job status after initial creation failure:', { jobId: broadcastJob.id, dbUpdateError }); } } next(error); // Pass to central error handler } } /** * Handle GET request to check job status. */ async function handleGetJobStatus(req, res, next) { const jobId = parseInt(req.params.jobId, 10); if (isNaN(jobId)) { return res.status(400).json({ message: 'Invalid Job ID format.' }); } try { const job = await prisma.broadcastJob.findUnique({ where: { id: jobId }, include: { // Optionally include summary counts of recipient statuses _count: { select: { recipients: true } }, // Example: Include counts per status (more advanced) // recipients: { // select: { finalStatus: true }, // Assuming you have a finalStatus field updated by webhooks // } } }); if (!job) { logger.warn(`Job status requested for non-existent job ID: ${jobId}`); return res.status(404).json({ message: `Job with ID ${jobId} not found.` }); } // // Example: Aggregate recipient statuses if needed (can be slow for large jobs) // let statusSummary = {}; // if (job.recipients) { // statusSummary = job.recipients.reduce((acc, recipient) => { // const status = recipient.finalStatus || recipient.initialStatus || 'unknown'; // acc[status] = (acc[status] || 0) + 1; // return acc; // }, {}); // } res.status(200).json({ jobId: job.id, status: job.status, totalRecipients: job.totalRecipients, createdAt: job.createdAt, completedAt: job.completedAt, // recipientStatusCount: statusSummary, // Include aggregated counts if calculated recipientCount: job._count?.recipients, // Total count from _count errorMessage: job.errorMessage }); } catch (error) { logger.error(`Error fetching status for job ID ${jobId}:`, { jobId, error }); next(error); // Pass to central error handler } } module.exports = { validateBroadcastRequest, handleSendBulkSms, handleGetJobStatus };
Async Processing vs. Message Queues:
This tutorial uses setImmediate() for simplicity. For production systems with high volume:
- Use Bull or BullMQ with Redis for durable job queues
- Separate worker processes for better fault isolation
- Add job prioritization for urgent messages
- Implement dead letter queues for failed jobs
Pagination for Large Jobs: For jobs with 10,000+ recipients, implement pagination on the status endpoint:
const page = parseInt(req.query.page) || 1;
const limit = parseInt(req.query.limit) || 100;
const skip = (page - 1) * limit;
const recipients = await prisma.recipientStatus.findMany({
where: { broadcastJobId: jobId },
skip,
take: limit
});