Building Asynchronous Tasks with NestJS, AWS SNS, and SQS
A guide to implementing scalable and reliable asynchronous tasks, notifications, and reminders in NestJS using AWS SNS and SQS for decoupling and background processing.
Project Overview and Goals
In modern applications, performing tasks asynchronously is crucial for maintaining responsiveness and scalability. Operations like sending email notifications, processing uploaded files, generating reports, or sending scheduled reminders shouldn't block the main application thread or delay user responses.
This guide demonstrates how to build a system using NestJS that offloads such tasks to a background worker process facilitated by AWS SNS and SQS.
What We Will Build:
- A NestJS application capable of publishing messages (events or commands) to an AWS SNS topic.
- An AWS SQS queue subscribed to the SNS topic, acting as a durable buffer for messages.
- A NestJS-based consumer service that polls the SQS queue, processes messages reliably, and stores processed message information in a simple SQLite database for idempotency/tracking.
- API endpoints to trigger message publishing.
Problem Solved:
- Decoupling: Separates the task initiation (e.g., user signup, reminder creation) from the task execution (e.g., sending a welcome email, dispatching the reminder notification).
- Scalability: SQS queues can handle large volumes of messages, and consumer instances can be scaled independently based on queue depth.
- Reliability: SQS ensures messages are stored durably until successfully processed. SNS provides reliable fan-out capabilities if multiple consumers need the same message.
- Improved Performance: Prevents long-running tasks from impacting the performance of user-facing APIs.
Technologies Involved:
- NestJS: A progressive Node.js framework for building efficient, reliable, and scalable server-side applications using TypeScript. Chosen for its modular architecture, dependency injection, and excellent TypeScript support.
- AWS SNS (Simple Notification Service): A managed pub/sub messaging service. Used here to publish events/messages to a central topic.
- AWS SQS (Simple Queue Service): A managed message queuing service. Used to subscribe to the SNS topic and reliably buffer messages for processing by our consumer.
- AWS SDK for JavaScript v3: Used within the NestJS application to interact with SNS and SQS APIs.
- TypeORM & SQLite: An ORM (Object-Relational Mapper) and a simple file-based database used for persisting processed message IDs to ensure idempotency.
- sqs-consumer: A Node.js library simplifying the process of polling and processing messages from an SQS queue.
System Architecture:
graph LR
A[Client / API Request] --> B(NestJS API / Publisher);
B -- Publish Message --> C{AWS SNS Topic};
C -- Fan-out --> D[AWS SQS Queue];
E(NestJS Consumer Service) -- Polls Queue --> D;
E -- Processes Message --> F[(SQLite Database)];
E -- Deletes Message --> D;
style B fill:#f9f,stroke:#333,stroke-width:2px
style E fill:#f9f,stroke:#333,stroke-width:2px
style C fill:#FF9900,stroke:#333,stroke-width:2px
style D fill:#FF9900,stroke:#333,stroke-width:2px
style F fill:#ccf,stroke:#333,stroke-width:2px
- Flow: A request triggers the NestJS Publisher, which sends a message to SNS. SNS fans it out to the subscribed SQS Queue. The NestJS Consumer polls the queue, retrieves a message, processes it (optionally interacting with a database), and finally deletes the message from the queue upon successful processing.
Prerequisites:
- Node.js (LTS version recommended) and npm/yarn installed.
- An AWS account with IAM user credentials (Access Key ID, Secret Access Key) configured with permissions for SNS (Publish) and SQS (ReceiveMessage, DeleteMessage, GetQueueAttributes).
- AWS CLI installed and configured (optional, but helpful for verification).
- Basic understanding of TypeScript and NestJS concepts (Modules, Services, Controllers).
- Docker (optional, for containerized development/deployment).
1. Setting up the Project
Let's bootstrap our NestJS project and install the necessary dependencies.
Step 1: Create a New NestJS Project
Open your terminal and run the NestJS CLI command:
npx @nestjs/cli new nestjs-sns-sqs-reminders
Choose your preferred package manager (npm or yarn) when prompted, then navigate into the newly created directory:
cd nestjs-sns-sqs-reminders
Step 2: Install Dependencies
We need several packages for AWS integration, database interaction, configuration, and SQS polling:
# AWS SDK v3 Clients for SNS and SQS
npm install @aws-sdk/client-sns @aws-sdk/client-sqs
# SQS Consumer library
npm install sqs-consumer
# TypeORM for database interaction and SQLite driver
npm install @nestjs/typeorm typeorm sqlite3
# Configuration management and UUID generation
npm install @nestjs/config uuid dotenv
npm install --save-dev @types/uuid # Types for uuid
Step 3: Project Structure
NestJS promotes a modular structure. We'll create dedicated modules for SNS publishing and SQS consuming logic. Your src directory might eventually look something like this:
src/
├── app.controller.ts
├── app.module.ts
├── app.service.ts
├── main.ts
├── config/ # Configuration setup (optional, but good practice)
│ └── configuration.ts
├── database/ # Database related files
│ ├── entities/
│ │ └── processed-message.entity.ts
│ └── daos/
│ └── processed-message.dao.ts
├── sns/ # SNS Publisher module
│ ├── sns.module.ts
│ ├── sns.publisher.service.ts
│ └── sns.controller.ts # Example controller to trigger publishing
└── sqs/ # SQS Consumer module
├── sqs.module.ts
├── sqs.consumer.service.ts
└── utils/
└── constants.ts # Shared constants like MessageSourceType
We'll create these files as we go.
2. AWS Setup: Creating SNS Topic and SQS Queue
Before writing code, we need the necessary AWS resources.
Step 1: Create an SQS Queue
- Log in to the AWS Management Console.
- Navigate to the Simple Queue Service (SQS) console.
- Click Create queue.
- Type: Choose Standard. Standard queues offer higher throughput and at-least-once delivery, suitable for many reminder/task scenarios. FIFO queues guarantee order and exactly-once processing but have lower throughput limits.
- Name: Enter a descriptive name, e.g., nestjs-reminders-queue.
- Configuration: Review default settings like Visibility timeout (e.g., 30 seconds – how long a message is hidden after being received before it becomes visible again if not deleted) and Message retention period (e.g., 4 days). Adjust if needed.
- Access policy: We will configure this in the next step via the SNS subscription, so leave the basic setting for now.
- Click Create queue.
- Important: Once created, note down the Queue URL and the ARN from the queue details page. You'll need these for your .env file and the SNS subscription.
Step 2: Create an SNS Topic
- Navigate to the Simple Notification Service (SNS) console.
- Click Topics in the left navigation pane.
- Click Create topic.
- Type: Choose Standard. Matches the SQS queue type.
- Name: Enter a descriptive name, e.g., nestjs-tasks-topic.
- Leave other settings as default for now.
- Click Create topic.
- Important: Note down the Topic ARN.
Step 3: Subscribe SQS Queue to SNS Topic
- Go to the details page of the SNS Topic you just created (nestjs-tasks-topic).
- Scroll down to the Subscriptions tab and click Create subscription.
- Topic ARN: Should be pre-filled.
- Protocol: Select Amazon SQS.
- Endpoint: Paste the ARN of the SQS queue (nestjs-reminders-queue) you created earlier.
- Enable raw message delivery: It's often simpler to enable this. If enabled, the exact message you publish to SNS is delivered to SQS without SNS metadata wrapping. If disabled (the default), the SQS message body will be a JSON structure containing the SNS message details. We'll assume raw delivery is enabled for simplicity in the primary consumer code path shown in this guide.
- Click Create subscription.
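To make the raw vs. non-raw distinction concrete, here is a sketch of how a consumer can detect and unwrap the SNS envelope. The helper name and return shape are our own (not part of the AWS SDK); the envelope fields (`Type`, `TopicArn`, `Message`) are what SNS emits when raw delivery is disabled:

```typescript
// Hypothetical helper: unwraps a non-raw SNS envelope if present.
// With raw delivery enabled, the body passes through unchanged.
interface UnwrappedMessage {
  fromSns: boolean; // true if an SNS Notification envelope was detected
  payload: unknown; // parsed inner payload (or raw string if not JSON)
}

function unwrapSnsEnvelope(body: string): UnwrappedMessage {
  let parsed: any;
  try {
    parsed = JSON.parse(body);
  } catch {
    return { fromSns: false, payload: body }; // plain-text message
  }
  // Non-raw delivery wraps the payload in a Notification envelope
  if (parsed?.Type === 'Notification' && parsed.TopicArn && parsed.Message) {
    try {
      return { fromSns: true, payload: JSON.parse(parsed.Message) };
    } catch {
      return { fromSns: true, payload: parsed.Message }; // inner body is plain text
    }
  }
  return { fromSns: false, payload: parsed };
}
```

With raw delivery enabled, `fromSns` simply stays `false` and the payload is your original message.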
Step 4: Grant SNS Permission to Send to SQS (Crucial!)
SQS needs to allow the SNS topic to send messages to it.
- Go back to the SQS Console and select your queue (nestjs-reminders-queue).
- Go to the Access policy tab and click Edit.
- Use the Policy generator or manually add a statement to the existing policy's Statement array. The goal is to add a statement like this (replace placeholders):
{
  "Sid": "AllowSNSPublishToSQS",
  "Effect": "Allow",
  "Principal": {
    "Service": "sns.amazonaws.com"
  },
  "Action": "SQS:SendMessage",
  "Resource": "YOUR_SQS_QUEUE_ARN",
  "Condition": {
    "ArnEquals": {
      "aws:SourceArn": "YOUR_SNS_TOPIC_ARN"
    }
  }
}
- Ensure the final policy JSON is valid. Replace YOUR_SQS_QUEUE_ARN with your actual Queue ARN and YOUR_SNS_TOPIC_ARN with your actual Topic ARN.
- Click Save.
- Why this policy? It explicitly grants the SNS service (sns.amazonaws.com) permission to perform the SQS:SendMessage action on your specific SQS queue resource, but only if the message originates from your specific SNS topic. This follows the principle of least privilege.
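If you prefer to build the policy document in code (for example, for infrastructure scripts), the same statement can be assembled as plain data. The builder functions below are our own sketch, not an AWS API; only the policy field names come from the IAM policy language:

```typescript
// Hypothetical builder for the SNS-to-SQS access policy statement shown above.
function buildSnsToSqsStatement(queueArn: string, topicArn: string) {
  return {
    Sid: 'AllowSNSPublishToSQS',
    Effect: 'Allow',
    Principal: { Service: 'sns.amazonaws.com' },
    Action: 'SQS:SendMessage',
    Resource: queueArn,
    Condition: { ArnEquals: { 'aws:SourceArn': topicArn } },
  };
}

// Wraps the statement in a full policy document, ready to paste into the
// queue's Access policy editor (or to pass to an infrastructure tool).
function buildQueuePolicy(queueArn: string, topicArn: string): string {
  return JSON.stringify(
    {
      Version: '2012-10-17',
      Statement: [buildSnsToSqsStatement(queueArn, topicArn)],
    },
    null,
    2,
  );
}
```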
3. Environment Configuration
Securely manage your AWS credentials and resource identifiers using environment variables.
Step 1: Create .env File
In the root directory of your project, create a file named .env:
# .env
# AWS Credentials - Use IAM user keys, NOT root keys
AWS_ACCESS_KEY_ID=YOUR_AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY=YOUR_AWS_SECRET_ACCESS_KEY
AWS_REGION=us-east-1 # Or your preferred AWS region
# AWS Resource Identifiers
SNS_TOPIC_ARN=YOUR_SNS_TOPIC_ARN # ARN copied from SNS console
SQS_QUEUE_URL=YOUR_SQS_QUEUE_URL # URL copied from SQS console
- Replace placeholders with the actual values obtained from your IAM user configuration and the AWS console (SNS Topic ARN, SQS Queue URL).
- Security: Add .env to your .gitignore file immediately to prevent committing credentials.
Step 2: Load Environment Variables using ConfigModule
NestJS provides a ConfigModule to load and manage environment variables. Modify your main application module (src/app.module.ts):
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { AppController } from './app.controller';
import { AppService } from './app.service';
// Import other modules later (SnsModule, SqsModule, TypeOrmModule)
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true, // Make ConfigService available globally
envFilePath: '.env', // Specify the env file path
}),
// ... other modules will be added here
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
- isGlobal: true makes the ConfigService injectable in any module without needing to import ConfigModule everywhere.
- envFilePath: '.env' explicitly tells the module where to find your variables.
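A missing variable otherwise surfaces only later, as a confusing runtime error inside the AWS clients. A small guard can fail fast at bootstrap instead. This helper is our own sketch (not part of @nestjs/config — that package also supports schema validation via its validationSchema option); call it with process.env before creating the Nest application:

```typescript
// Hypothetical startup guard: throws if any required variable is absent.
const REQUIRED_VARS = [
  'AWS_ACCESS_KEY_ID',
  'AWS_SECRET_ACCESS_KEY',
  'AWS_REGION',
  'SNS_TOPIC_ARN',
  'SQS_QUEUE_URL',
] as const;

function assertRequiredEnv(env: Record<string, string | undefined>): void {
  const missing = REQUIRED_VARS.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
}
```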
4. Database Setup (SQLite & TypeORM)
We'll use a simple SQLite database via TypeORM to keep track of processed message IDs. This helps prevent processing the same message multiple times if errors occur or if SQS delivers a message more than once (which can happen with Standard queues).
Step 1: Define the Entity
Create an entity representing a processed message record.
// src/database/entities/processed-message.entity.ts
import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn, Index } from 'typeorm';
@Entity({ name: 'processed_messages' })
export class ProcessedMessage {
@PrimaryGeneratedColumn()
id: number;
@Index({ unique: true }) // Ensure messageId is unique
@Column({ type: 'varchar', length: 255 })
messageId: string; // The unique ID from the SQS message
@Column({ type: 'text' })
messageBody: string; // Store the original message body for reference
@Column({ type: 'varchar', length: 50, nullable: true })
sourceType?: string; // e.g., 'SNS', 'SQS' - optional tracking
@CreateDateColumn()
processedAt: Date;
}
- We add a unique index on messageId to leverage the database for enforcing idempotency.
Step 2: Create the Data Access Object (DAO)
Create a service to handle database interactions for this entity.
// src/database/daos/processed-message.dao.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, QueryFailedError } from 'typeorm';
import { ProcessedMessage } from '../entities/processed-message.entity';
@Injectable()
export class ProcessedMessageDao {
private readonly logger = new Logger(ProcessedMessageDao.name);
constructor(
@InjectRepository(ProcessedMessage)
private readonly repository: Repository<ProcessedMessage>,
) {}
async recordMessageProcessed(
messageId: string,
messageBody: string,
sourceType?: string,
): Promise<ProcessedMessage | null> {
try {
const newMessage = this.repository.create({
messageId,
messageBody,
sourceType,
});
return await this.repository.save(newMessage);
} catch (error) {
// Check if it's a unique constraint violation (already processed)
// Note: Specific error code ('SQLITE_CONSTRAINT') might differ for other DBs (e.g., '23505' for PostgreSQL)
if (error instanceof QueryFailedError && error.driverError?.code === 'SQLITE_CONSTRAINT') {
this.logger.warn(`Message ID ${messageId} has already been processed.`);
return null; // Indicate already processed
}
this.logger.error(`Failed to save processed message ${messageId}: ${error.message}`, error.stack);
throw error; // Re-throw other errors
}
}
async hasBeenProcessed(messageId: string): Promise<boolean> {
const count = await this.repository.count({ where: { messageId } });
return count > 0;
}
}
- The recordMessageProcessed method attempts to save the record. If it fails due to the unique constraint on messageId, it means the message was already processed, and it returns null. Other errors are re-thrown.
- hasBeenProcessed provides a direct check.
Step 3: Configure TypeORM Module
Update src/app.module.ts to configure TypeORM and make the ProcessedMessage entity and DAO available.
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm'; // Import TypeOrmModule
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ProcessedMessage } from './database/entities/processed-message.entity'; // Import Entity
import { ProcessedMessageDao } from './database/daos/processed-message.dao'; // Import DAO
// Import other modules later (SnsModule, SqsModule)
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
envFilePath: '.env',
}),
TypeOrmModule.forRootAsync({ // Use forRootAsync for dependency injection
imports: [ConfigModule], // Import ConfigModule here
useFactory: (configService: ConfigService) => ({
type: 'sqlite',
database: 'processed_messages.db', // Name of the SQLite file
entities: [ProcessedMessage], // Register the entity
synchronize: true, // DEV ONLY: Auto-creates schema. Disable in prod and use migrations.
logging: configService.get<string>('NODE_ENV') !== 'production', // Log SQL in dev
}),
inject: [ConfigService], // Inject ConfigService into the factory
}),
TypeOrmModule.forFeature([ProcessedMessage]), // Make repository available
// ... other modules will be added here
],
controllers: [AppController],
providers: [AppService, ProcessedMessageDao], // Provide the DAO
exports: [ProcessedMessageDao], // Export DAO if needed by other modules
})
export class AppModule {}
- We use forRootAsync to potentially inject ConfigService if needed for database configuration (though not strictly necessary for this simple SQLite setup).
- synchronize: true is convenient for development as it automatically creates/updates the database schema based on your entities. IMPORTANT: Set this to false in production and use TypeORM migrations to manage schema changes safely.
- TypeOrmModule.forFeature([ProcessedMessage]) makes the Repository<ProcessedMessage> injectable.
- We provide and export ProcessedMessageDao so it can be used by the SQS consumer later.
Note on Production Databases: While SQLite is excellent for tutorials and local development, the idempotency pattern demonstrated here is production-ready. However, SQLite itself has limitations regarding concurrency and scalability under heavy load. For production environments handling significant traffic, consider using more robust database systems like PostgreSQL, MySQL, or managed NoSQL services like AWS DynamoDB.
5. SNS Publisher Implementation
This module will handle publishing messages to our SNS topic.
Step 1: Create SNS Publisher Service
This service encapsulates the logic for interacting with the AWS SNS client.
// src/sns/sns.publisher.service.ts
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { SNSClient, PublishCommand, PublishCommandInput } from '@aws-sdk/client-sns';
import { v4 as uuid } from 'uuid';
@Injectable()
export class SnsPublisherService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(SnsPublisherService.name);
private snsClient: SNSClient;
private topicArn: string;
constructor(private configService: ConfigService) {}
onModuleInit() {
const region = this.configService.get<string>('AWS_REGION');
this.topicArn = this.configService.get<string>('SNS_TOPIC_ARN');
if (!region || !this.topicArn) {
throw new Error('AWS Region or SNS Topic ARN not configured in environment variables.');
}
this.snsClient = new SNSClient({
region: region,
credentials: {
accessKeyId: this.configService.get<string>('AWS_ACCESS_KEY_ID'),
secretAccessKey: this.configService.get<string>('AWS_SECRET_ACCESS_KEY'),
},
});
this.logger.log('SNS Client Initialized');
}
async publish<T>(message: T, messageGroupId?: string): Promise<string | undefined> {
// Ensure message is stringified JSON for structured data
const messageBody = typeof message === 'string' ? message : JSON.stringify(message);
const params: PublishCommandInput = {
TopicArn: this.topicArn,
Message: messageBody,
// MessageGroupId and MessageDeduplicationId are required for FIFO topics
// If using a Standard topic, they are optional but can help with ordering/deduplication within SQS FIFO subscribers
// MessageGroupId: messageGroupId || 'default-group', // Example group ID
// MessageDeduplicationId: uuid(), // Use content-based hashing or a unique ID if needed for FIFO
};
// Remove FIFO parameters if not needed (e.g., for Standard topics)
// Check if your topic ARN ends with .fifo before including these
// if (!this.topicArn.endsWith('.fifo')) {
// delete params.MessageGroupId;
// delete params.MessageDeduplicationId;
// }
try {
const command = new PublishCommand(params);
const response = await this.snsClient.send(command);
this.logger.log(`Message published to SNS Topic ${this.topicArn}. Message ID: ${response.MessageId}`);
return response.MessageId;
} catch (error) {
this.logger.error(`Error publishing message to SNS Topic ${this.topicArn}: ${error.message}`, error.stack);
// Implement retry logic or specific error handling here if needed
throw error; // Re-throw to allow upstream handling
}
}
onModuleDestroy() {
this.snsClient?.destroy();
this.logger.log('SNS Client Destroyed');
}
}
- The service initializes the SNSClient using credentials and region from ConfigService.
- The publish method takes a message (object or string), stringifies it if necessary, and sends it using PublishCommand.
- It includes commented-out logic for MessageGroupId and MessageDeduplicationId, which are primarily for FIFO topics/queues but demonstrates their usage. Adjust or remove based on your topic type.
- Basic logging and error handling are included. OnModuleInit ensures the client is ready, OnModuleDestroy cleans up resources.
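The commented-out FIFO branch can be made explicit by gating on the topic ARN, since FIFO topic ARNs end with .fifo. The helper below is our own sketch; to stay self-contained it declares a minimal local interface mirroring the PublishCommandInput fields used, rather than importing the SDK type:

```typescript
// Minimal local shape mirroring the SDK's PublishCommandInput fields we use.
interface PublishParams {
  TopicArn: string;
  Message: string;
  MessageGroupId?: string;
  MessageDeduplicationId?: string;
}

// Hypothetical helper: FIFO topics require MessageGroupId (and, without
// content-based deduplication, MessageDeduplicationId); Standard topics
// must omit both.
function buildPublishParams(
  topicArn: string,
  message: unknown,
  groupId = 'default-group',
  dedupId?: string,
): PublishParams {
  const params: PublishParams = {
    TopicArn: topicArn,
    Message: typeof message === 'string' ? message : JSON.stringify(message),
  };
  if (topicArn.endsWith('.fifo')) {
    params.MessageGroupId = groupId;
    params.MessageDeduplicationId = dedupId ?? `${Date.now()}-${Math.random()}`;
  }
  return params;
}
```

The publish method shown above could delegate its parameter construction to a helper like this to support both topic types with one code path.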
Step 2: Create Example Controller (Optional)
Create a simple controller to trigger the publisher via an HTTP request.
// src/sns/sns.controller.ts
import { Controller, Post, Body, HttpCode, HttpStatus } from '@nestjs/common';
import { SnsPublisherService } from './sns.publisher.service';
interface PublishMessageDto {
message: any; // Can be any structure you want to send
type: 'reminder' | 'notification' | 'task'; // Example field
}
@Controller('messages') // Example endpoint prefix
export class SnsController {
constructor(private readonly snsPublisherService: SnsPublisherService) {}
@Post('publish')
@HttpCode(HttpStatus.ACCEPTED) // Use 202 Accepted for async operations
async publishMessage(@Body() body: PublishMessageDto) {
// You might add validation using class-validator here
const messageId = await this.snsPublisherService.publish(body);
return {
status: 'Message submitted for publishing',
messageId: messageId,
};
}
}
- This provides a /messages/publish endpoint to test sending messages.
- It uses HttpStatus.ACCEPTED (202) as publishing is an asynchronous operation trigger.
Step 3: Create SNS Module
Define the module to encapsulate the SNS components.
// src/sns/sns.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config'; // Import if not global
import { SnsPublisherService } from './sns.publisher.service';
import { SnsController } from './sns.controller';
@Module({
imports: [ConfigModule], // Only needed if ConfigModule isn't global
controllers: [SnsController],
providers: [SnsPublisherService],
exports: [SnsPublisherService], // Export service if needed elsewhere
})
export class SnsModule {}
Step 4: Import SNS Module into AppModule
Add SnsModule to the imports array in src/app.module.ts.
// src/app.module.ts
// ... other imports
import { SnsModule } from './sns/sns.module'; // Import SNS module
import { SqsModule } from './sqs/sqs.module'; // Placeholder for later
import { TypeOrmModule } from '@nestjs/typeorm';
import { ConfigModule } from '@nestjs/config';
import { ProcessedMessage } from './database/entities/processed-message.entity';
import { ProcessedMessageDao } from './database/daos/processed-message.dao';
import { AppController } from './app.controller';
import { AppService } from './app.service';
@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true, envFilePath: '.env' }),
TypeOrmModule.forRootAsync({ /* ... TypeORM config ... */ }),
TypeOrmModule.forFeature([ProcessedMessage]),
SnsModule, // Add SnsModule
// SqsModule, // Add SqsModule later
],
controllers: [AppController],
providers: [AppService, ProcessedMessageDao],
exports: [ProcessedMessageDao],
})
export class AppModule {}
6. SQS Consumer Implementation
This module will poll the SQS queue, process messages, and ensure idempotency using the database.
Step 1: Define Constants (Optional)
It can be helpful to define constants for message source types.
// src/sqs/utils/constants.ts
export enum MessageSourceType {
SQS = 'SQS', // Indicates direct SQS message or Raw SNS message
SNS = 'SNS', // Indicates message originated from SNS (non-raw delivery)
UNKNOWN = 'UNKNOWN',
}
Step 2: Create SQS Consumer Service
This service uses the sqs-consumer library to handle polling, processing, and deleting messages.
// src/sqs/sqs.consumer.service.ts
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { SQSClient, Message, DeleteMessageCommand } from '@aws-sdk/client-sqs';
import { Consumer } from 'sqs-consumer';
import { ProcessedMessageDao } from '../database/daos/processed-message.dao';
import { MessageSourceType } from './utils/constants';
@Injectable()
export class SqsConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(SqsConsumerService.name);
private consumer: Consumer;
private sqsClient: SQSClient;
private queueUrl: string;
constructor(
private configService: ConfigService,
private processedMessageDao: ProcessedMessageDao, // Inject DAO
) {}
onModuleInit() {
const region = this.configService.get<string>('AWS_REGION');
this.queueUrl = this.configService.get<string>('SQS_QUEUE_URL');
if (!region || !this.queueUrl) {
throw new Error('AWS Region or SQS Queue URL not configured.');
}
// Create an SQS client instance for the consumer and for manual deletion.
this.sqsClient = new SQSClient({
region: region,
credentials: {
accessKeyId: this.configService.get<string>('AWS_ACCESS_KEY_ID'),
secretAccessKey: this.configService.get<string>('AWS_SECRET_ACCESS_KEY'),
},
});
this.initializeConsumer();
this.consumer.start();
this.logger.log(`SQS Consumer started polling queue: ${this.queueUrl}`);
}
private initializeConsumer() {
this.consumer = Consumer.create({
queueUrl: this.queueUrl,
sqs: this.sqsClient, // Pass the initialized SQS client
handleMessage: async (message: Message) => {
await this.processMessage(message);
},
batchSize: 10, // Process up to 10 messages concurrently (adjust as needed)
visibilityTimeout: 60, // Must be >= handleMessage timeout. In seconds.
waitTimeSeconds: 10, // Use long polling (reduces costs, improves efficiency)
// --- IMPORTANT ---
// Set to false: We will manually delete messages only after successful processing.
shouldDeleteMessages: false,
});
this.consumer.on('error', (err, message) => {
this.logger.error(`SQS Consumer error: ${err.message}`, { messageId: message?.MessageId });
// Add more specific error handling/alerting here
});
this.consumer.on('processing_error', (err, message) => {
this.logger.error(`SQS Consumer processing error: ${err.message}`, { messageId: message?.MessageId });
// Decide if the message should be retried or sent to a Dead Letter Queue (DLQ)
// Note: If processing fails repeatedly, it will eventually go to DLQ if configured.
});
this.consumer.on('timeout_error', (err, message) => {
this.logger.error(`SQS Consumer message visibility timeout error: ${err.message}`, { messageId: message?.MessageId });
// This usually means processing took longer than the visibility timeout.
// Increase visibilityTimeout if needed, or optimize processing.
});
this.consumer.on('stopped', () => {
this.logger.log('SQS Consumer stopped.');
});
}
private async processMessage(message: Message): Promise<void> {
this.logger.debug(`Received message: ${message.MessageId}`);
if (!message.MessageId || !message.Body || !message.ReceiptHandle) {
this.logger.error('Received malformed message, skipping.', message);
// Consider sending to DLQ or logging more details
// We cannot delete without ReceiptHandle, so let it timeout and potentially retry/DLQ.
return;
}
const messageId = message.MessageId;
const messageBody = message.Body;
const receiptHandle = message.ReceiptHandle; // Capture receipt handle early
try {
// 1. Check for Duplicates (Idempotency Check)
const alreadyProcessed = await this.processedMessageDao.hasBeenProcessed(messageId);
if (alreadyProcessed) {
this.logger.warn(`Message ${messageId} already processed. Deleting from queue.`);
await this.deleteMessageFromQueue(receiptHandle);
return;
}
// 2. Parse and Process the Message Content
// Attempts to handle both raw SNS delivery (enabled) and non-raw (disabled).
// The primary path assumes raw delivery is enabled, where message.Body is the direct payload.
let payload: any;
let sourceType = MessageSourceType.UNKNOWN; // Default
try {
payload = JSON.parse(messageBody);
// Check if it looks like an SNS notification wrapper (non-raw delivery)
if (payload.Type === 'Notification' && payload.TopicArn && payload.Message) {
sourceType = MessageSourceType.SNS;
// Re-parse the actual message content from the wrapper
try {
payload = JSON.parse(payload.Message);
} catch (innerParseError) {
this.logger.warn(`Could not JSON parse inner SNS message for ${messageId}. Processing inner content as raw string.`);
payload = payload.Message; // Treat inner message as raw string
}
} else {
// Assume it's a direct SQS message or a raw SNS message (valid JSON)
sourceType = MessageSourceType.SQS;
}
} catch (parseError) {
// Not valid JSON - treat as a plain text SQS message or raw SNS message
this.logger.warn(`Message ${messageId} body is not valid JSON. Processing as raw string.`);
payload = messageBody;
sourceType = MessageSourceType.SQS;
}
this.logger.log(`Processing message ${messageId} of type ${sourceType}`);
// --- YOUR BUSINESS LOGIC HERE ---
// Example: Handle different message types based on payload content
if (payload?.type === 'reminder') {
this.logger.log(`Handling reminder: ${payload.message}`);
// await sendReminderNotification(payload);
} else if (payload?.type === 'notification') {
this.logger.log(`Handling notification: ${payload.message}`);
// await sendGenericNotification(payload);
} else {
this.logger.log(`Handling generic task:`, payload);
// await processGenericTask(payload);
}
// Simulate work
await new Promise(resolve => setTimeout(resolve, 500));
// --- END BUSINESS LOGIC ---
// 3. Record Successful Processing (Idempotency)
// Use the original SQS message ID and body for tracking
const recorded = await this.processedMessageDao.recordMessageProcessed(messageId, messageBody, sourceType);
if (recorded === null) {
// This means it was processed by another concurrent consumer between step 1 and now.
this.logger.warn(`Message ${messageId} was processed concurrently. Skipping delete.`);
// Do not delete, let the other consumer handle deletion.
return;
}
this.logger.log(`Successfully processed and recorded message ${messageId}.`);
// 4. Delete Message from Queue
await this.deleteMessageFromQueue(receiptHandle);
this.logger.debug(`Deleted message ${messageId} from queue.`);
} catch (error) {
// If any step fails (business logic, DB recording), DO NOT delete the message.
// It will become visible again after the visibility timeout for retry.
// Configure a Dead Letter Queue (DLQ) in SQS to handle messages that fail repeatedly.
this.logger.error(`Failed to process message ${messageId}: ${error.message}`, error.stack);
// Re-throwing might trigger 'processing_error' event depending on `sqs-consumer` version/handling.
// throw error; // Or handle specific errors without re-throwing if appropriate
}
}
private async deleteMessageFromQueue(receiptHandle: string): Promise<void> {
try {
const deleteCommand = new DeleteMessageCommand({
QueueUrl: this.queueUrl,
ReceiptHandle: receiptHandle,
});
await this.sqsClient.send(deleteCommand);
} catch (error) {
this.logger.error(`Failed to delete message with receipt handle ${receiptHandle.substring(0, 10)}... from queue ${this.queueUrl}: ${error.message}`, error.stack);
// This is problematic - the message was processed but not deleted.
// It will likely be processed again after the visibility timeout.
// Idempotency check should prevent duplicate *actions*, but logging this error is crucial.
// Consider adding monitoring/alerting for delete failures.
}
}
onModuleDestroy() {
this.consumer?.stop();
this.sqsClient?.destroy();
this.logger.log('SQS Consumer and Client Destroyed');
}
}
- The service initializes sqs-consumer with the SQS client and configuration.
- shouldDeleteMessages: false is crucial; we manually delete messages only after successful processing and recording.
- handleMessage calls processMessage, which implements the core logic:
  - Idempotency check using ProcessedMessageDao. If already processed, delete and return.
  - Parse the message body, attempting to handle raw vs. non-raw SNS delivery.
  - Placeholder for your actual business logic.
  - Record the message ID in the database using ProcessedMessageDao. If it returns null (concurrent processing), return without deleting.
  - Delete the message from SQS using deleteMessageFromQueue.
- Error handling is included for consumer events and within processMessage. Failed processing prevents deletion, allowing SQS retries (and eventually a DLQ if configured).
- deleteMessageFromQueue encapsulates the DeleteMessageCommand. OnModuleDestroy stops the consumer gracefully.
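If you attach a Dead Letter Queue to the SQS queue, SQS moves a message there automatically once it has been received more than the redrive policy's maxReceiveCount without being deleted. For logging or branching near that limit, the message's ApproximateReceiveCount system attribute can be inspected (SQS only returns it if requested; check your sqs-consumer version's attribute options). The helper below is our own sketch, with a minimal local message shape instead of the SDK type:

```typescript
// Minimal shape of the SQS message fields we inspect.
interface SqsMessageLike {
  MessageId?: string;
  Attributes?: Record<string, string>;
}

// Hypothetical helper: returns true when a message is on its final allowed
// receive before SQS's redrive policy would move it to the DLQ.
function isFinalAttempt(message: SqsMessageLike, maxReceiveCount: number): boolean {
  // ApproximateReceiveCount arrives as a string; default to '1' if absent.
  const count = Number(message.Attributes?.ApproximateReceiveCount ?? '1');
  return count >= maxReceiveCount;
}
```

A consumer might use this to emit a louder alert (or persist extra diagnostics) on the last attempt, since the message will not be retried again afterwards.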
Step 3: Create SQS Module
Define the module for SQS components.
```typescript
// src/sqs/sqs.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config'; // Import if not global
import { TypeOrmModule } from '@nestjs/typeorm'; // Needed for DAO injection
import { ProcessedMessage } from '../database/entities/processed-message.entity';
import { ProcessedMessageDao } from '../database/daos/processed-message.dao';
import { SqsConsumerService } from './sqs.consumer.service';

@Module({
  imports: [
    ConfigModule, // Only needed if ConfigModule isn't global
    TypeOrmModule.forFeature([ProcessedMessage]), // Make repository available for DAO
  ],
  providers: [SqsConsumerService, ProcessedMessageDao], // Provide Consumer and DAO
  exports: [SqsConsumerService], // Export if needed elsewhere
})
export class SqsModule {}
```

- This module imports `TypeOrmModule.forFeature` to make the `ProcessedMessage` repository available for injection into `ProcessedMessageDao`.
- It provides both the `SqsConsumerService` and the `ProcessedMessageDao`.
Step 4: Import SQS Module into AppModule
Finally, add `SqsModule` to the imports array in `src/app.module.ts`. Ensure `ProcessedMessageDao` is provided in exactly one place: providing it in `SqsModule` is sufficient if it is only used there; keep it provided and exported from `AppModule` only if other modules need it too. Below, `AppModule` is adjusted on the assumption that the DAO is used solely by the SQS components.
```typescript
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ProcessedMessage } from './database/entities/processed-message.entity';
// Remove DAO import/providers/exports if handled solely by SqsModule
// import { ProcessedMessageDao } from './database/daos/processed-message.dao';
import { SnsModule } from './sns/sns.module';
import { SqsModule } from './sqs/sqs.module'; // Import SQS module

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
      envFilePath: '.env',
    }),
    TypeOrmModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => ({
        type: 'sqlite',
        database: 'processed_messages.db',
        entities: [ProcessedMessage],
        synchronize: configService.get<string>('NODE_ENV') !== 'production', // Safer default
        logging: configService.get<string>('NODE_ENV') !== 'production',
      }),
      inject: [ConfigService],
    }),
    // No need for TypeOrmModule.forFeature here if DAO handles it internally
    SnsModule,
    SqsModule, // Add SqsModule
  ],
  controllers: [AppController],
  providers: [AppService], // DAO is now managed within SqsModule
  // exports: [], // No need to export DAO from here anymore
})
export class AppModule {}
```

- We added `SqsModule` to the imports.
- Removed `ProcessedMessageDao` from `AppModule`'s providers/exports and `TypeOrmModule.forFeature` from its imports, as `SqsModule` now handles providing the DAO and importing the necessary feature module.
- Adjusted the `synchronize` default to be safer.
7. Running and Testing
- Start the Application:

  ```bash
  npm run start:dev
  ```

  This will start the NestJS application, initialize the SNS client, configure TypeORM (creating the SQLite file and table if `synchronize: true`), and start the SQS consumer polling.

- Trigger Message Publishing: Use a tool like `curl`, Postman, or Insomnia to send a POST request to the example endpoint (if you created `SnsController`):

  ```bash
  curl -X POST http://localhost:3000/messages/publish \
    -H "Content-Type: application/json" \
    -d '{ "message": "Send welcome email to user@example.com", "type": "notification", "userId": "user-123" }'
  ```

  You should receive a `202 Accepted` response with a message ID.
- Observe Logs: Check the NestJS application logs. You should see:
  - A log message from `SnsPublisherService` indicating successful publishing to SNS.
  - Log messages from `SqsConsumerService` indicating:
    - Receipt of the message from SQS.
    - Processing of the message (your business logic logs).
    - Recording of the message ID in the database (`ProcessedMessageDao`).
    - Deletion of the message from SQS.
- Verify Idempotency:
  - Send the exact same POST request again.
  - Observe the logs. This time, `SqsConsumerService` should log a warning like "Message X already processed. Deleting from queue." and should not execute your business logic again. It should still delete the duplicate message from the queue.
- Check Database:
  - Use a SQLite browser tool to inspect the `processed_messages.db` file. You should see one entry for each successfully processed unique message ID.
8. Further Considerations and Best Practices
- Dead Letter Queues (DLQ): Configure a DLQ on your SQS queue. Messages that fail processing repeatedly (exceeding the `maxReceiveCount`) will be sent to the DLQ for investigation, preventing them from blocking the main queue indefinitely.
- Error Handling & Retries: Implement more robust error handling within your `processMessage` logic. Decide which errors are transient (and should be retried by letting the message visibility timeout expire) and which are permanent (and should potentially lead to moving the message to the DLQ immediately or logging a critical error). The `sqs-consumer` library has options for handling processing errors.
- Monitoring & Alerting: Set up AWS CloudWatch Alarms based on SQS metrics (e.g., `ApproximateNumberOfMessagesVisible`, `ApproximateAgeOfOldestMessage`) and DLQ metrics (`ApproximateNumberOfMessagesVisible` on the DLQ). Monitor application logs for errors, especially message deletion failures.
- Configuration Management: For production, use a more robust configuration system than just `.env` (e.g., AWS Systems Manager Parameter Store, AWS Secrets Manager, or environment variables injected by your deployment platform).
- Structured Logging: Use JSON-based logging for easier parsing and analysis in log aggregation tools (like CloudWatch Logs Insights, Datadog, or Splunk).
- FIFO Queues: If strict ordering or exactly-once processing (within the bounds of SQS FIFO guarantees) is required, use SNS FIFO topics and SQS FIFO queues. Remember to provide `MessageGroupId` and potentially `MessageDeduplicationId` when publishing. Idempotency checks are still recommended even with FIFO.
- Scalability: Monitor queue depth and CPU/memory utilization of your consumer instances. Scale the number of consumer instances (e.g., in ECS, Kubernetes, or EC2 Auto Scaling Groups) based on these metrics. `sqs-consumer`'s `batchSize` also affects concurrency within a single instance.
- Security: Follow the principle of least privilege for IAM permissions. Ensure your AWS credentials are handled securely (never hardcode them; use IAM roles for EC2/ECS/Lambda where possible).
- TypeORM Migrations: Disable `synchronize: true` in production and use TypeORM migrations to manage database schema changes safely and controllably.
- Testing: Write unit tests for your services (publisher, consumer, DAO) and integration tests that mock AWS services or use tools like LocalStack to simulate the AWS environment locally.
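The transient-vs-permanent split mentioned above can be made explicit with a small helper. The `classifyError` function and its heuristics below are our own illustration (they are not part of `sqs-consumer` or the AWS SDK); adapt the hints to the failure modes your handlers actually see:

```typescript
// Sketch: classify processing errors so the consumer can decide whether to
// let SQS redeliver the message (transient) or stop retrying (permanent).
type ErrorKind = 'transient' | 'permanent';

function classifyError(err: Error): ErrorKind {
  // A malformed payload will never succeed, no matter how often we retry.
  if (err instanceof SyntaxError) return 'permanent';
  // Timeouts, connection resets, and throttling are typically worth retrying.
  const transientHints = ['ETIMEDOUT', 'ECONNRESET', 'ThrottlingException', '503'];
  if (transientHints.some((hint) => err.message.includes(hint))) return 'transient';
  // Default to transient so SQS redelivers; maxReceiveCount + DLQ bound the retries.
  return 'transient';
}
```

In `processMessage`, a `permanent` classification could delete the message immediately (after logging it, or copying it somewhere for inspection) instead of letting the visibility timeout expire and burning redelivery attempts on a payload that can never succeed.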
This guide provides a solid foundation for building reliable asynchronous processing systems with NestJS and AWS. Remember to adapt the patterns and implement robust error handling and monitoring for your specific production needs.
Frequently Asked Questions
How to build asynchronous tasks with NestJS?
Build asynchronous tasks in NestJS by integrating with AWS SNS and SQS to decouple task initiation and execution. Create a NestJS application to publish messages to an SNS topic. An SQS queue, subscribed to the SNS topic, buffers these messages. A NestJS consumer service then processes messages from the SQS queue and tracks them in a database like SQLite for idempotency.
What is AWS SNS used for in NestJS async tasks?
AWS SNS (Simple Notification Service) acts as a managed pub/sub messaging service in a NestJS asynchronous task setup. It's the central point where messages (events or commands) are published by the NestJS application. SNS then reliably distributes these messages to any subscribed services, like an SQS queue.
Why use AWS SQS with NestJS for asynchronous tasks?
AWS SQS (Simple Queue Service) offers a durable message buffer and scalability for asynchronous NestJS tasks. It subscribes to the SNS topic and stores incoming messages reliably until they're processed by a consumer service. SQS allows you to scale your consumer instances independently from the main application, managing large volumes of messages effectively.
How to handle message idempotency in SQS consumer?
Handle idempotency in your SQS consumer by tracking processed message IDs in a database, such as SQLite. Before processing a message, check if its ID exists in the database. If it does, the message has already been processed, so you can skip it or delete it from the SQS queue without re-processing. This prevents duplicate actions if SQS delivers a message multiple times.
How to publish a message to SNS from NestJS?
Use the AWS SDK for JavaScript v3 within a NestJS service to publish messages to SNS. Create an SNSClient, configure it with your AWS credentials and region, then use the PublishCommand to send messages to your SNS topic ARN. This allows your application to trigger events or commands asynchronously without blocking the main thread.
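As a rough sketch, the input for a `PublishCommand` can be assembled with a small helper. The `buildPublishInput` function, the attribute names, and the topic ARN in the comments are placeholders of our own, not part of this guide's `SnsPublisherService`:

```typescript
import { randomUUID } from 'crypto';

interface PublishOptions {
  topicArn: string;
  message: string;
  type: string;
}

// Builds the plain input object that PublishCommand accepts.
function buildPublishInput(opts: PublishOptions) {
  return {
    TopicArn: opts.topicArn,
    Message: opts.message,
    MessageAttributes: {
      // String attributes let SQS subscriptions filter without parsing the body.
      type: { DataType: 'String', StringValue: opts.type },
      messageId: { DataType: 'String', StringValue: randomUUID() },
    },
  };
}

// With the AWS SDK v3 this would then be sent as:
//   await snsClient.send(new PublishCommand(buildPublishInput({ ... })));
```

Separating input construction from the `snsClient.send` call keeps the message shape trivially unit-testable without touching AWS.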
How to configure a NestJS project for AWS SNS and SQS?
Configure your NestJS project for AWS by installing necessary dependencies like '@aws-sdk/client-sns', '@aws-sdk/client-sqs', 'sqs-consumer', '@nestjs/config', and 'uuid'. You'll need an AWS account with credentials and to create an SNS topic and SQS queue, subscribing the queue to the topic and granting SNS permission to send messages to the queue. Securely store AWS credentials in a '.env' file and load them using NestJS's ConfigModule.
What is the purpose of sqs-consumer library?
The 'sqs-consumer' library simplifies polling and processing messages from an SQS queue in your NestJS application. It provides a clean API to handle message retrieval, error management (timeouts, processing errors), and efficient long polling. This abstracts away the complexities of direct SQS interactions, making your consumer logic more focused on message processing.
What database is used for idempotency in this example?
A simple SQLite database with TypeORM is used in this example to track processed message IDs and ensure idempotency. An entity 'ProcessedMessage' is created to store the message ID, body, and timestamp. TypeORM's features for database interactions and schema management streamline this process.
How are message duplicates managed in this system?
The example uses a ProcessedMessageDao, which keeps a record of handled message IDs. When a message is taken from SQS, the system checks if it's already been processed. If so, it logs the duplication and deletes the message from the queue, preventing duplicate processing and ensuring idempotency.
What should the visibility timeout be set to?
Visibility timeout, a key SQS parameter, should be set to a value greater than or equal to your expected maximum message processing time. For example, if your handleMessage function might take up to 60 seconds, visibilityTimeout should be at least 60. This setting prevents a message from reappearing in the queue before processing completes.
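This rule of thumb can be written down as a tiny helper. The 1.5x safety margin is an illustrative choice of ours; 43,200 seconds (12 hours) is SQS's maximum visibility timeout:

```typescript
// Derive a visibility timeout (in seconds) from the worst-case processing
// time, with headroom, clamped to SQS's 12-hour maximum.
function pickVisibilityTimeout(maxProcessingSeconds: number): number {
  const withMargin = Math.ceil(maxProcessingSeconds * 1.5);
  return Math.min(withMargin, 43200);
}
```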
When to use FIFO queues with NestJS and AWS?
Use FIFO (First-In, First-Out) queues with NestJS and AWS when strict message ordering and exactly-once processing are critical for your application logic. Scenarios like financial transactions, order processing, or any situation requiring guaranteed sequential handling would benefit from FIFO queues. Remember that FIFO queues have lower throughput compared to standard queues.
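The FIFO-specific publishing parameters can be sketched as follows. The `buildFifoParams` helper is our own illustration; hashing the body for `MessageDeduplicationId` is one option (and unnecessary if the topic has content-based deduplication enabled), and the deduplication window is five minutes:

```typescript
import { createHash } from 'crypto';

// MessageGroupId serializes messages that must stay ordered (e.g., per user);
// MessageDeduplicationId suppresses duplicates within the 5-minute window.
function buildFifoParams(topicArn: string, groupKey: string, body: string) {
  return {
    TopicArn: topicArn, // must be a FIFO topic (name ends in .fifo)
    Message: body,
    MessageGroupId: groupKey,
    // Content-based hash: identical bodies map to the same dedup ID
    // (64 hex chars, well under the 128-character limit).
    MessageDeduplicationId: createHash('sha256').update(body).digest('hex'),
  };
}
```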
How to scale the SQS consumer for higher throughput?
Scale the SQS consumer by increasing the number of consumer service instances, adjusting sqs-consumer's batchSize parameter (increasing concurrent message processing within an instance), or by tuning visibilityTimeout to optimize message handling efficiency. Monitor SQS metrics like ApproximateNumberOfMessagesVisible and consumer resource usage to guide scaling decisions.
What are some best practices for handling errors in SQS consumers?
Implement robust error handling by catching exceptions within the message processing logic. Classify errors as transient (retryable) or permanent. Use retries with exponential backoff for transient errors. Configure a Dead Letter Queue (DLQ) to capture messages that repeatedly fail processing. Log errors comprehensively and set up monitoring and alerts for critical failures.