code examples

Sent logo
Sent TeamMay 3, 2025 / code examples / Article

Building Asynchronous Tasks with NestJS, AWS SNS, and SQS

A guide to implementing scalable and reliable asynchronous tasks, notifications, and reminders in NestJS using AWS SNS and SQS for decoupling and background processing.

Project Overview and Goals

In modern applications, performing tasks asynchronously is crucial for maintaining responsiveness and scalability. Operations like sending email notifications, processing uploaded files, generating reports, or sending scheduled reminders shouldn't block the main application thread or delay user responses.

This guide demonstrates how to build a system using NestJS that offloads such tasks to a background worker process facilitated by AWS SNS and SQS.

What We Will Build:

  1. A NestJS application capable of publishing messages (events or commands) to an AWS SNS topic.
  2. An AWS SQS queue subscribed to the SNS topic, acting as a durable buffer for messages.
  3. A NestJS-based consumer service that polls the SQS queue, processes messages reliably, and stores processed message information in a simple SQLite database for idempotency/tracking.
  4. API endpoints to trigger message publishing.

Problem Solved:

  • Decoupling: Separates the task initiation (e.g., user signup, reminder creation) from the task execution (e.g., sending a welcome email, dispatching the reminder notification).
  • Scalability: SQS queues can handle large volumes of messages, and consumer instances can be scaled independently based on queue depth.
  • Reliability: SQS ensures messages are stored durably until successfully processed. SNS provides reliable fan-out capabilities if multiple consumers need the same message.
  • Improved Performance: Prevents long-running tasks from impacting the performance of user-facing APIs.

Technologies Involved:

  • NestJS: A progressive Node.js framework for building efficient, reliable, and scalable server-side applications using TypeScript. Chosen for its modular architecture, dependency injection, and excellent TypeScript support.
  • AWS SNS (Simple Notification Service): A managed pub/sub messaging service. Used here to publish events/messages to a central topic.
  • AWS SQS (Simple Queue Service): A managed message queuing service. Used to subscribe to the SNS topic and reliably buffer messages for processing by our consumer.
  • AWS SDK for JavaScript v3: Used within the NestJS application to interact with SNS and SQS APIs.
  • TypeORM & SQLite: An ORM (Object-Relational Mapper) and a simple file-based database used for persisting processed message IDs to ensure idempotency.
  • sqs-consumer: A Node.js library simplifying the process of polling and processing messages from an SQS queue.

System Architecture:

mermaid
graph LR
    A[Client / API Request] --> B(NestJS API / Publisher);
    B -- Publish Message --> C{AWS SNS Topic};
    C -- Fan-out --> D[AWS SQS Queue];
    E(NestJS Consumer Service) -- Polls Queue --> D;
    E -- Processes Message --> F[(SQLite Database)];
    E -- Deletes Message --> D;

    style B fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#FF9900,stroke:#333,stroke-width:2px
    style D fill:#FF9900,stroke:#333,stroke-width:2px
    style F fill:#ccf,stroke:#333,stroke-width:2px
  • Flow: A request triggers the NestJS Publisher, which sends a message to SNS. SNS fans it out to the subscribed SQS Queue. The NestJS Consumer polls the queue, retrieves a message, processes it (optionally interacting with a database), and finally deletes the message from the queue upon successful processing.

Prerequisites:

  • Node.js (LTS version recommended) and npm/yarn installed.
  • An AWS account with IAM user credentials (Access Key ID, Secret Access Key) configured with permissions for SNS (Publish) and SQS (ReceiveMessage, DeleteMessage, GetQueueAttributes).
  • AWS CLI installed and configured (optional, but helpful for verification).
  • Basic understanding of TypeScript and NestJS concepts (Modules, Services, Controllers).
  • Docker (optional, for containerized development/deployment).

1. Setting up the Project

Let's bootstrap our NestJS project and install the necessary dependencies.

Step 1: Create a New NestJS Project

Open your terminal and run the NestJS CLI command:

bash
npx @nestjs/cli new nestjs-sns-sqs-reminders

Choose your preferred package manager (npm or yarn). When prompted, navigate into the newly created directory:

bash
cd nestjs-sns-sqs-reminders

Step 2: Install Dependencies

We need several packages for AWS integration, database interaction, configuration, and SQS polling:

bash
# AWS SDK v3 Clients for SNS and SQS
npm install @aws-sdk/client-sns @aws-sdk/client-sqs

# SQS Consumer library
npm install sqs-consumer

# TypeORM for database interaction and SQLite driver
npm install @nestjs/typeorm typeorm sqlite3

# Configuration management and UUID generation
npm install @nestjs/config uuid dotenv
npm install --save-dev @types/uuid # Types for uuid

Step 3: Project Structure

NestJS promotes a modular structure. We'll create dedicated modules for SNS publishing and SQS consuming logic. Your src directory might eventually look something like this:

text
src/
├── app.controller.ts
├── app.module.ts
├── app.service.ts
├── main.ts
├── config/                 # Configuration setup (optional, but good practice)
│   └── configuration.ts
├── database/               # Database related files
│   ├── entities/
│   │   └── processed-message.entity.ts
│   └── daos/
│       └── processed-message.dao.ts
├── sns/                    # SNS Publisher module
│   ├── sns.module.ts
│   ├── sns.publisher.service.ts
│   └── sns.controller.ts   # Example controller to trigger publishing
└── sqs/                    # SQS Consumer module
    ├── sqs.module.ts
    ├── sqs.consumer.service.ts
    └── utils/
        └── constants.ts    # Shared constants like MessageSourceType

We'll create these files as we go.


2. AWS Setup: Creating SNS Topic and SQS Queue

Before writing code, we need the necessary AWS resources.

Step 1: Create an SQS Queue

  1. Log in to the AWS Management Console.
  2. Navigate to the Simple Queue Service (SQS) console.
  3. Click Create queue.
  4. Type: Choose Standard. Standard queues offer higher throughput and at-least-once delivery, suitable for many reminder/task scenarios. FIFO queues guarantee order and exactly-once processing but have lower throughput limits.
  5. Name: Enter a descriptive name, e.g., nestjs-reminders-queue.
  6. Configuration: Review default settings like Visibility timeout (e.g., 30 seconds – how long a message is hidden after being received before it becomes visible again if not deleted) and Message retention period (e.g., 4 days). Adjust if needed.
  7. Access policy: We will configure this in the next step via the SNS subscription, so leave the basic setting for now.
  8. Click Create queue.
  9. Important: Once created, note down the Queue URL and the ARN from the queue details page. You'll need these for your .env file and the SNS subscription.

Step 2: Create an SNS Topic

  1. Navigate to the Simple Notification Service (SNS) console.
  2. Click Topics in the left navigation pane.
  3. Click Create topic.
  4. Type: Choose Standard. Matches the SQS queue type.
  5. Name: Enter a descriptive name, e.g., nestjs-tasks-topic.
  6. Leave other settings as default for now.
  7. Click Create topic.
  8. Important: Note down the Topic ARN.

Step 3: Subscribe SQS Queue to SNS Topic

  1. Go to the details page of the SNS Topic you just created (nestjs-tasks-topic).
  2. Scroll down to the Subscriptions tab and click Create subscription.
  3. Topic ARN: Should be pre-filled.
  4. Protocol: Select Amazon SQS.
  5. Endpoint: Paste the ARN of the SQS queue (nestjs-reminders-queue) you created earlier.
  6. Enable raw message delivery: It's often simpler to enable this. If enabled, the exact message you publish to SNS is delivered to SQS without SNS metadata wrapping. If disabled (default), the SQS message body will be a JSON structure containing the SNS message details. We'll assume raw delivery is enabled for simplicity in the primary consumer code path shown in this guide.
  7. Click Create subscription.

Step 4: Grant SNS Permission to Send to SQS (Crucial!)

SQS needs to allow the SNS topic to send messages to it.

  1. Go back to the SQS Console and select your queue (nestjs-reminders-queue).
  2. Go to the Access policy tab and click Edit.
  3. Use the Policy generator or manually add a statement to the existing policy's Statement array. The goal is to add a statement like this (replace placeholders):
json
{
  ""Sid"": ""AllowSNSPublishToSQS"",
  ""Effect"": ""Allow"",
  ""Principal"": {
    ""Service"": ""sns.amazonaws.com""
  },
  ""Action"": ""SQS:SendMessage"",
  ""Resource"": ""YOUR_SQS_QUEUE_ARN"",
  ""Condition"": {
    ""ArnEquals"": {
      ""aws:SourceArn"": ""YOUR_SNS_TOPIC_ARN""
    }
  }
}
  1. Ensure the final policy JSON is valid. Replace YOUR_SQS_QUEUE_ARN with your actual Queue ARN and YOUR_SNS_TOPIC_ARN with your actual Topic ARN.
  2. Click Save.
  • Why this policy? It explicitly grants the SNS service (sns.amazonaws.com) permission to perform the SQS:SendMessage action on your specific SQS queue resource, but only if the message originates from your specific SNS topic. This follows the principle of least privilege.

3. Environment Configuration

Securely manage your AWS credentials and resource identifiers using environment variables.

Step 1: Create .env File

In the root directory of your project, create a file named .env:

dotenv
# .env

# AWS Credentials - Use IAM user keys, NOT root keys
AWS_ACCESS_KEY_ID=YOUR_AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY=YOUR_AWS_SECRET_ACCESS_KEY
AWS_REGION=us-east-1 # Or your preferred AWS region

# AWS Resource Identifiers
SNS_TOPIC_ARN=YOUR_SNS_TOPIC_ARN # ARN copied from SNS console
SQS_QUEUE_URL=YOUR_SQS_QUEUE_URL # URL copied from SQS console
  • Replace placeholders with the actual values obtained from your IAM user configuration and the AWS console (SNS Topic ARN, SQS Queue URL).
  • Security: Add .env to your .gitignore file immediately to prevent committing credentials.

Step 2: Load Environment Variables using ConfigModule

NestJS provides a ConfigModule to load and manage environment variables. Modify your main application module (src/app.module.ts):

typescript
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { AppController } from './app.controller';
import { AppService } from './app.service';
// Import other modules later (SnsModule, SqsModule, TypeOrmModule)

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true, // Make ConfigService available globally
      envFilePath: '.env', // Specify the env file path
    }),
    // ... other modules will be added here
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
  • isGlobal: true makes the ConfigService injectable in any module without needing to import ConfigModule everywhere.
  • envFilePath: '.env' explicitly tells the module where to find your variables.

4. Database Setup (SQLite & TypeORM)

We'll use a simple SQLite database via TypeORM to keep track of processed message IDs. This helps prevent processing the same message multiple times if errors occur or if SQS delivers a message more than once (which can happen with Standard queues).

Step 1: Define the Entity

Create an entity representing a processed message record.

typescript
// src/database/entities/processed-message.entity.ts
import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn, Index } from 'typeorm';

@Entity({ name: 'processed_messages' })
export class ProcessedMessage {
  @PrimaryGeneratedColumn()
  id: number;

  @Index({ unique: true }) // Ensure messageId is unique
  @Column({ type: 'varchar', length: 255 })
  messageId: string; // The unique ID from the SQS message

  @Column({ type: 'text' })
  messageBody: string; // Store the original message body for reference

  @Column({ type: 'varchar', length: 50, nullable: true })
  sourceType?: string; // e.g., 'SNS', 'SQS' - optional tracking

  @CreateDateColumn()
  processedAt: Date;
}
  • We add a unique index on messageId to leverage the database for enforcing idempotency.

Step 2: Create the Data Access Object (DAO)

Create a service to handle database interactions for this entity.

typescript
// src/database/daos/processed-message.dao.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, QueryFailedError } from 'typeorm';
import { ProcessedMessage } from '../entities/processed-message.entity';

@Injectable()
export class ProcessedMessageDao {
  private readonly logger = new Logger(ProcessedMessageDao.name);

  constructor(
    @InjectRepository(ProcessedMessage)
    private readonly repository: Repository<ProcessedMessage>,
  ) {}

  async recordMessageProcessed(
    messageId: string,
    messageBody: string,
    sourceType?: string,
  ): Promise<ProcessedMessage | null> {
    try {
      const newMessage = this.repository.create({
        messageId,
        messageBody,
        sourceType,
      });
      return await this.repository.save(newMessage);
    } catch (error) {
      // Check if it's a unique constraint violation (already processed)
      // Note: Specific error code ('SQLITE_CONSTRAINT') might differ for other DBs (e.g., '23505' for PostgreSQL)
      if (error instanceof QueryFailedError && error.driverError?.code === 'SQLITE_CONSTRAINT') {
        this.logger.warn(`Message ID ${messageId} has already been processed.`);
        return null; // Indicate already processed
      }
      this.logger.error(`Failed to save processed message ${messageId}: ${error.message}`, error.stack);
      throw error; // Re-throw other errors
    }
  }

  async hasBeenProcessed(messageId: string): Promise<boolean> {
    const count = await this.repository.count({ where: { messageId } });
    return count > 0;
  }
}
  • The recordMessageProcessed method attempts to save the record. If it fails due to the unique constraint on messageId, it means the message was already processed, and it returns null. Other errors are re-thrown.
  • hasBeenProcessed provides a direct check.

Step 3: Configure TypeORM Module

Update src/app.module.ts to configure TypeORM and make the ProcessedMessage entity and DAO available.

typescript
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm'; // Import TypeOrmModule
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ProcessedMessage } from './database/entities/processed-message.entity'; // Import Entity
import { ProcessedMessageDao } from './database/daos/processed-message.dao'; // Import DAO
// Import other modules later (SnsModule, SqsModule)

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
      envFilePath: '.env',
    }),
    TypeOrmModule.forRootAsync({ // Use forRootAsync for dependency injection
      imports: [ConfigModule], // Import ConfigModule here
      useFactory: (configService: ConfigService) => ({
        type: 'sqlite',
        database: 'processed_messages.db', // Name of the SQLite file
        entities: [ProcessedMessage],      // Register the entity
        synchronize: true, // DEV ONLY: Auto-creates schema. Disable in prod and use migrations.
        logging: configService.get<string>('NODE_ENV') !== 'production', // Log SQL in dev
      }),
      inject: [ConfigService], // Inject ConfigService into the factory
    }),
    TypeOrmModule.forFeature([ProcessedMessage]), // Make repository available
    // ... other modules will be added here
  ],
  controllers: [AppController],
  providers: [AppService, ProcessedMessageDao], // Provide the DAO
  exports: [ProcessedMessageDao], // Export DAO if needed by other modules
})
export class AppModule {}
  • We use forRootAsync to potentially inject ConfigService if needed for database configuration (though not strictly necessary for this simple SQLite setup).
  • synchronize: true is convenient for development as it automatically creates/updates the database schema based on your entities. IMPORTANT: Set this to false in production and use TypeORM migrations to manage schema changes safely.
  • TypeOrmModule.forFeature([ProcessedMessage]) makes the Repository<ProcessedMessage> injectable.
  • We provide and export ProcessedMessageDao so it can be used by the SQS consumer later.

Note on Production Databases: While SQLite is excellent for tutorials and local development, the idempotency pattern demonstrated here is production-ready. However, SQLite itself has limitations regarding concurrency and scalability under heavy load. For production environments handling significant traffic, consider using more robust database systems like PostgreSQL, MySQL, or managed NoSQL services like AWS DynamoDB.


5. SNS Publisher Implementation

This module will handle publishing messages to our SNS topic.

Step 1: Create SNS Publisher Service

This service encapsulates the logic for interacting with the AWS SNS client.

typescript
// src/sns/sns.publisher.service.ts
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { SNSClient, PublishCommand, PublishCommandInput } from '@aws-sdk/client-sns';
import { v4 as uuid } from 'uuid';

@Injectable()
export class SnsPublisherService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(SnsPublisherService.name);
  private snsClient: SNSClient;
  private topicArn: string;

  constructor(private configService: ConfigService) {}

  onModuleInit() {
    const region = this.configService.get<string>('AWS_REGION');
    this.topicArn = this.configService.get<string>('SNS_TOPIC_ARN');

    if (!region || !this.topicArn) {
        throw new Error('AWS Region or SNS Topic ARN not configured in environment variables.');
    }

    this.snsClient = new SNSClient({
      region: region,
      credentials: {
        accessKeyId: this.configService.get<string>('AWS_ACCESS_KEY_ID'),
        secretAccessKey: this.configService.get<string>('AWS_SECRET_ACCESS_KEY'),
      },
    });
    this.logger.log('SNS Client Initialized');
  }

  async publish<T>(message: T, messageGroupId?: string): Promise<string | undefined> {
    // Ensure message is stringified JSON for structured data
    const messageBody = typeof message === 'string' ? message : JSON.stringify(message);

    const params: PublishCommandInput = {
      TopicArn: this.topicArn,
      Message: messageBody,
      // MessageGroupId and MessageDeduplicationId are required for FIFO topics
      // If using a Standard topic, they are optional but can help with ordering/deduplication within SQS FIFO subscribers
      // MessageGroupId: messageGroupId || 'default-group', // Example group ID
      // MessageDeduplicationId: uuid(), // Use content-based hashing or a unique ID if needed for FIFO
    };

    // Remove FIFO parameters if not needed (e.g., for Standard topics)
    // Check if your topic ARN ends with .fifo before including these
    // if (!this.topicArn.endsWith('.fifo')) {
    //   delete params.MessageGroupId;
    //   delete params.MessageDeduplicationId;
    // }

    try {
      const command = new PublishCommand(params);
      const response = await this.snsClient.send(command);
      this.logger.log(`Message published to SNS Topic ${this.topicArn}. Message ID: ${response.MessageId}`);
      return response.MessageId;
    } catch (error) {
      this.logger.error(`Error publishing message to SNS Topic ${this.topicArn}: ${error.message}`, error.stack);
      // Implement retry logic or specific error handling here if needed
      throw error; // Re-throw to allow upstream handling
    }
  }

  onModuleDestroy() {
    this.snsClient?.destroy();
    this.logger.log('SNS Client Destroyed');
  }
}
  • The service initializes the SNSClient using credentials and region from ConfigService.
  • The publish method takes a message (object or string), stringifies it if necessary, and sends it using PublishCommand.
  • It includes commented-out logic for MessageGroupId and MessageDeduplicationId which are primarily for FIFO topics/queues but demonstrates their usage. Adjust or remove based on your topic type.
  • Basic logging and error handling are included.
  • OnModuleInit ensures the client is ready, OnModuleDestroy cleans up resources.

Step 2: Create Example Controller (Optional)

Create a simple controller to trigger the publisher via an HTTP request.

typescript
// src/sns/sns.controller.ts
import { Controller, Post, Body, HttpCode, HttpStatus } from '@nestjs/common';
import { SnsPublisherService } from './sns.publisher.service';

interface PublishMessageDto {
  message: any; // Can be any structure you want to send
  type: 'reminder' | 'notification' | 'task'; // Example field
}

@Controller('messages') // Example endpoint prefix
export class SnsController {
  constructor(private readonly snsPublisherService: SnsPublisherService) {}

  @Post('publish')
  @HttpCode(HttpStatus.ACCEPTED) // Use 202 Accepted for async operations
  async publishMessage(@Body() body: PublishMessageDto) {
    // You might add validation using class-validator here
    const messageId = await this.snsPublisherService.publish(body);
    return {
      status: 'Message submitted for publishing',
      messageId: messageId,
    };
  }
}
  • This provides a /messages/publish endpoint to test sending messages.
  • It uses HttpStatus.ACCEPTED (202) as publishing is an asynchronous operation trigger.

Step 3: Create SNS Module

Define the module to encapsulate the SNS components.

typescript
// src/sns/sns.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config'; // Import if not global
import { SnsPublisherService } from './sns.publisher.service';
import { SnsController } from './sns.controller';

@Module({
  imports: [ConfigModule], // Only needed if ConfigModule isn't global
  controllers: [SnsController],
  providers: [SnsPublisherService],
  exports: [SnsPublisherService], // Export service if needed elsewhere
})
export class SnsModule {}

Step 4: Import SNS Module into AppModule

Add SnsModule to the imports array in src/app.module.ts.

typescript
// src/app.module.ts
// ... other imports
import { SnsModule } from './sns/sns.module'; // Import SNS module
import { SqsModule } from './sqs/sqs.module'; // Placeholder for later
import { TypeOrmModule } from '@nestjs/typeorm';
import { ConfigModule } from '@nestjs/config';
import { ProcessedMessage } from './database/entities/processed-message.entity';
import { ProcessedMessageDao } from './database/daos/processed-message.dao';
import { AppController } from './app.controller';
import { AppService } from './app.service';


@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true, envFilePath: '.env' }),
    TypeOrmModule.forRootAsync({ /* ... TypeORM config ... */ }),
    TypeOrmModule.forFeature([ProcessedMessage]),
    SnsModule, // Add SnsModule
    // SqsModule, // Add SqsModule later
  ],
  controllers: [AppController],
  providers: [AppService, ProcessedMessageDao],
  exports: [ProcessedMessageDao],
})
export class AppModule {}

6. SQS Consumer Implementation

This module will poll the SQS queue, process messages, and ensure idempotency using the database.

Step 1: Define Constants (Optional)

It can be helpful to define constants for message source types.

typescript
// src/sqs/utils/constants.ts
export enum MessageSourceType {
  SQS = 'SQS', // Indicates direct SQS message or Raw SNS message
  SNS = 'SNS', // Indicates message originated from SNS (non-raw delivery)
  UNKNOWN = 'UNKNOWN',
}

Step 2: Create SQS Consumer Service

This service uses the sqs-consumer library to handle polling, processing, and deleting messages.

typescript
// src/sqs/sqs.consumer.service.ts
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { SQSClient, Message, DeleteMessageCommand } from '@aws-sdk/client-sqs';
import { Consumer } from 'sqs-consumer';
import { ProcessedMessageDao } from '../database/daos/processed-message.dao';
import { MessageSourceType } from './utils/constants';

@Injectable()
export class SqsConsumerService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(SqsConsumerService.name);
  private consumer: Consumer;
  private sqsClient: SQSClient;
  private queueUrl: string;

  constructor(
    private configService: ConfigService,
    private processedMessageDao: ProcessedMessageDao, // Inject DAO
  ) {}

  onModuleInit() {
    const region = this.configService.get<string>('AWS_REGION');
    this.queueUrl = this.configService.get<string>('SQS_QUEUE_URL');

    if (!region || !this.queueUrl) {
      throw new Error('AWS Region or SQS Queue URL not configured.');
    }

    // Create an SQS client instance for the consumer and for manual deletion.
    this.sqsClient = new SQSClient({
      region: region,
      credentials: {
        accessKeyId: this.configService.get<string>('AWS_ACCESS_KEY_ID'),
        secretAccessKey: this.configService.get<string>('AWS_SECRET_ACCESS_KEY'),
      },
    });

    this.initializeConsumer();
    this.consumer.start();
    this.logger.log(`SQS Consumer started polling queue: ${this.queueUrl}`);
  }

  private initializeConsumer() {
    this.consumer = Consumer.create({
      queueUrl: this.queueUrl,
      sqs: this.sqsClient, // Pass the initialized SQS client
      handleMessage: async (message: Message) => {
        await this.processMessage(message);
      },
      batchSize: 10, // Process up to 10 messages concurrently (adjust as needed)
      visibilityTimeout: 60, // Must be >= handleMessage timeout. In seconds.
      waitTimeSeconds: 10, // Use long polling (reduces costs, improves efficiency)
      // --- IMPORTANT ---
      // Set to false: We will manually delete messages only after successful processing.
      shouldDeleteMessages: false,
    });

    this.consumer.on('error', (err, message) => {
      this.logger.error(`SQS Consumer error: ${err.message}`, { messageId: message?.MessageId });
      // Add more specific error handling/alerting here
    });

    this.consumer.on('processing_error', (err, message) => {
      this.logger.error(`SQS Consumer processing error: ${err.message}`, { messageId: message?.MessageId });
      // Decide if the message should be retried or sent to a Dead Letter Queue (DLQ)
      // Note: If processing fails repeatedly, it will eventually go to DLQ if configured.
    });

    this.consumer.on('timeout_error', (err, message) => {
        this.logger.error(`SQS Consumer message visibility timeout error: ${err.message}`, { messageId: message?.MessageId });
        // This usually means processing took longer than the visibility timeout.
        // Increase visibilityTimeout if needed, or optimize processing.
    });

    this.consumer.on('stopped', () => {
        this.logger.log('SQS Consumer stopped.');
    });
  }

  private async processMessage(message: Message): Promise<void> {
    this.logger.debug(`Received message: ${message.MessageId}`);

    if (!message.MessageId || !message.Body || !message.ReceiptHandle) {
        this.logger.error('Received malformed message, skipping.', message);
        // Consider sending to DLQ or logging more details
        // We cannot delete without ReceiptHandle, so let it timeout and potentially retry/DLQ.
        return;
    }

    const messageId = message.MessageId;
    const messageBody = message.Body;
    const receiptHandle = message.ReceiptHandle; // Capture receipt handle early

    try {
      // 1. Check for Duplicates (Idempotency Check)
      const alreadyProcessed = await this.processedMessageDao.hasBeenProcessed(messageId);
      if (alreadyProcessed) {
        this.logger.warn(`Message ${messageId} already processed. Deleting from queue.`);
        await this.deleteMessageFromQueue(receiptHandle);
        return;
      }

      // 2. Parse and Process the Message Content
      // Attempts to handle both raw SNS delivery (enabled) and non-raw (disabled).
      // The primary path assumes raw delivery is enabled, where message.Body is the direct payload.
      let payload: any;
      let sourceType = MessageSourceType.UNKNOWN; // Default
      try {
          payload = JSON.parse(messageBody);
          // Check if it looks like an SNS notification wrapper (non-raw delivery)
          if (payload.Type === 'Notification' && payload.TopicArn && payload.Message) {
              sourceType = MessageSourceType.SNS;
              // Re-parse the actual message content from the wrapper
              try {
                payload = JSON.parse(payload.Message);
              } catch (innerParseError) {
                 this.logger.warn(`Could not JSON parse inner SNS message for ${messageId}. Processing inner content as raw string.`);
                 payload = payload.Message; // Treat inner message as raw string
              }
          } else {
              // Assume it's a direct SQS message or a raw SNS message (valid JSON)
              sourceType = MessageSourceType.SQS;
          }
      } catch (parseError) {
        // Not valid JSON - treat as a plain text SQS message or raw SNS message
        this.logger.warn(`Message ${messageId} body is not valid JSON. Processing as raw string.`);
        payload = messageBody;
        sourceType = MessageSourceType.SQS;
      }

      this.logger.log(`Processing message ${messageId} of type ${sourceType}`);
      // --- YOUR BUSINESS LOGIC HERE ---
      // Example: Handle different message types based on payload content
      if (payload?.type === 'reminder') {
        this.logger.log(`Handling reminder: ${payload.message}`);
        // await sendReminderNotification(payload);
      } else if (payload?.type === 'notification') {
         this.logger.log(`Handling notification: ${payload.message}`);
         // await sendGenericNotification(payload);
      } else {
         this.logger.log(`Handling generic task:`, payload);
         // await processGenericTask(payload);
      }
      // Simulate work
      await new Promise(resolve => setTimeout(resolve, 500));
      // --- END BUSINESS LOGIC ---


      // 3. Record Successful Processing (Idempotency)
      // Use the original SQS message ID and body for tracking
      const recorded = await this.processedMessageDao.recordMessageProcessed(messageId, messageBody, sourceType);
      if (recorded === null) {
        // This means it was processed by another concurrent consumer between step 1 and now.
        this.logger.warn(`Message ${messageId} was processed concurrently. Skipping delete.`);
        // Do not delete, let the other consumer handle deletion.
        return;
      }
      this.logger.log(`Successfully processed and recorded message ${messageId}.`);

      // 4. Delete Message from Queue
      await this.deleteMessageFromQueue(receiptHandle);
      this.logger.debug(`Deleted message ${messageId} from queue.`);

    } catch (error) {
      // If any step fails (business logic, DB recording), DO NOT delete the message.
      // It will become visible again after the visibility timeout for retry.
      // Configure a Dead Letter Queue (DLQ) in SQS to handle messages that fail repeatedly.
      this.logger.error(`Failed to process message ${messageId}: ${error.message}`, error.stack);
      // Re-throwing might trigger 'processing_error' event depending on `sqs-consumer` version/handling.
      // throw error; // Or handle specific errors without re-throwing if appropriate
    }
  }

  private async deleteMessageFromQueue(receiptHandle: string): Promise<void> {
    try {
      const deleteCommand = new DeleteMessageCommand({
        QueueUrl: this.queueUrl,
        ReceiptHandle: receiptHandle,
      });
      await this.sqsClient.send(deleteCommand);
    } catch (error) {
      this.logger.error(`Failed to delete message with receipt handle ${receiptHandle.substring(0, 10)}... from queue ${this.queueUrl}: ${error.message}`, error.stack);
      // This is problematic - the message was processed but not deleted.
      // It will likely be processed again after the visibility timeout.
      // Idempotency check should prevent duplicate *actions*, but logging this error is crucial.
      // Consider adding monitoring/alerting for delete failures.
    }
  }

  onModuleDestroy() {
    this.consumer?.stop();
    this.sqsClient?.destroy();
    this.logger.log('SQS Consumer and Client Destroyed');
  }
}
  • The service initializes sqs-consumer with the SQS client and configuration.
  • shouldDeleteMessages: false is crucial; we manually delete messages only after successful processing and recording.
  • handleMessage calls processMessage.
  • processMessage implements the core logic:
    1. Idempotency check using ProcessedMessageDao. If already processed, delete and return.
    2. Parse the message body, attempting to handle raw vs. non-raw SNS delivery.
    3. Placeholder for your actual business logic.
    4. Record the message ID in the database using ProcessedMessageDao. If it returns null (concurrent processing), return without deleting.
    5. Delete the message from SQS using deleteMessageFromQueue.
  • Error handling is included for consumer events and within processMessage. Failed processing prevents deletion, allowing SQS retries (and eventually DLQ if configured).
  • deleteMessageFromQueue encapsulates the DeleteMessageCommand.
  • OnModuleDestroy stops the consumer gracefully.

Step 3: Create SQS Module

Define the module for SQS components.

typescript
// src/sqs/sqs.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config'; // Import if not global
import { TypeOrmModule } from '@nestjs/typeorm'; // Needed for DAO injection
import { ProcessedMessage } from '../database/entities/processed-message.entity';
import { ProcessedMessageDao } from '../database/daos/processed-message.dao';
import { SqsConsumerService } from './sqs.consumer.service';

@Module({
  imports: [
    ConfigModule, // Only needed if ConfigModule isn't global
    TypeOrmModule.forFeature([ProcessedMessage]), // Make repository available for DAO
  ],
  providers: [SqsConsumerService, ProcessedMessageDao], // Provide Consumer and DAO
  exports: [SqsConsumerService], // Export if needed elsewhere
})
export class SqsModule {}
  • This module imports TypeOrmModule.forFeature to make the ProcessedMessage repository available for injection into ProcessedMessageDao.
  • It provides both the SqsConsumerService and the ProcessedMessageDao.

Step 4: Import SQS Module into AppModule

Finally, add SqsModule to the imports array in src/app.module.ts. Ensure ProcessedMessageDao is provided/exported correctly (it's provided in AppModule and exported, and also provided in SqsModule - providing it in SqsModule is sufficient if it's only used there, or keep it exported from AppModule if used elsewhere too. Let's adjust AppModule slightly assuming DAO is primarily used by SQS).

typescript
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ProcessedMessage } from './database/entities/processed-message.entity';
// Remove DAO import/providers/exports if handled solely by SqsModule
// import { ProcessedMessageDao } from './database/daos/processed-message.dao';
import { SnsModule } from './sns/sns.module';
import { SqsModule } from './sqs/sqs.module'; // Import SQS module

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
      envFilePath: '.env',
    }),
    TypeOrmModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => ({
        type: 'sqlite',
        database: 'processed_messages.db',
        entities: [ProcessedMessage],
        synchronize: configService.get<string>('NODE_ENV') !== 'production', // Safer default
        logging: configService.get<string>('NODE_ENV') !== 'production',
      }),
      inject: [ConfigService],
    }),
    // No need for TypeOrmModule.forFeature here if DAO handles it internally
    SnsModule,
    SqsModule, // Add SqsModule
  ],
  controllers: [AppController],
  providers: [AppService], // DAO is now managed within SqsModule
  // exports: [], // No need to export DAO from here anymore
})
export class AppModule {}
  • We added SqsModule to the imports.
  • Removed ProcessedMessageDao from AppModule's providers/exports and TypeOrmModule.forFeature from its imports, as SqsModule now handles providing the DAO and importing the necessary feature module.
  • Adjusted synchronize default to be safer.

7. Running and Testing

  1. Start the Application:

    bash
    npm run start:dev

    This will start the NestJS application, initialize the SNS client, configure TypeORM (and create the SQLite file/table if synchronize: true), and start the SQS consumer polling.

  2. Trigger Message Publishing: Use a tool like curl, Postman, or Insomnia to send a POST request to the example endpoint (if you created SnsController):

    bash
    curl -X POST http://localhost:3000/messages/publish \
    -H "Content-Type: application/json" \
    -d '{
      "message": "Send welcome email to user@example.com",
      "type": "notification",
      "userId": "user-123"
    }'

    You should receive a 202 Accepted response with a message ID.

  3. Observe Logs:

    • Check the NestJS application logs. You should see:
      • Log message from SnsPublisherService indicating successful publishing to SNS.
      • Log messages from SqsConsumerService indicating:
        • Receipt of the message from SQS.
        • Processing the message (your business logic logs).
        • Recording the message ID in the database (ProcessedMessageDao).
        • Deleting the message from SQS.
  4. Verify Idempotency:

    • Send the exact same POST request again.
    • Observe the logs. This time, the SqsConsumerService should log a warning like "Message X already processed. Deleting from queue." and it should not execute your business logic again. It should still delete the duplicate message from the queue.
  5. Check Database:

    • Use a SQLite browser tool to inspect the processed_messages.db file. You should see entries for each successfully processed unique message ID.

8. Further Considerations and Best Practices

  • Dead Letter Queues (DLQ): Configure a DLQ on your SQS queue. Messages that fail processing repeatedly (exceeding the maxReceiveCount) will be sent to the DLQ for investigation, preventing them from blocking the main queue indefinitely.
  • Error Handling & Retries: Implement more robust error handling within your processMessage logic. Decide which errors are transient (and should be retried by letting the message visibility timeout expire) and which are permanent (and should potentially lead to moving the message to the DLQ immediately or logging a critical error). The sqs-consumer library has options for handling processing errors.
  • Monitoring & Alerting: Set up AWS CloudWatch Alarms based on SQS metrics (e.g., ApproximateNumberOfMessagesVisible, ApproximateAgeOfOldestMessage) and DLQ metrics (ApproximateNumberOfMessagesVisible on the DLQ). Monitor application logs for errors (especially message deletion failures).
  • Configuration Management: For production, use a more robust configuration system than just .env (e.g., AWS Systems Manager Parameter Store, AWS Secrets Manager, or environment variables injected by your deployment platform).
  • Structured Logging: Use JSON-based logging for easier parsing and analysis in log aggregation tools (like CloudWatch Logs Insights, Datadog, Splunk).
  • FIFO Queues: If strict ordering or exactly-once processing (within the bounds of SQS FIFO guarantees) is required, use SNS FIFO topics and SQS FIFO queues. Remember to provide MessageGroupId and potentially MessageDeduplicationId when publishing. Idempotency checks are still recommended even with FIFO.
  • Scalability: Monitor queue depth and CPU/Memory utilization of your consumer instances. Scale the number of consumer instances (e.g., in ECS, Kubernetes, or EC2 Auto Scaling Groups) based on these metrics. sqs-consumer's batchSize also affects concurrency within a single instance.
  • Security: Follow the principle of least privilege for IAM permissions. Ensure your AWS credentials are handled securely (never hardcode them, use IAM roles for EC2/ECS/Lambda where possible).
  • TypeORM Migrations: Disable synchronize: true in production and use TypeORM migrations to manage database schema changes safely and controllably.
  • Testing: Write unit tests for your services (Publisher, Consumer, DAO) and integration tests that mock AWS services or use tools like localstack to simulate the AWS environment locally.

This guide provides a solid foundation for building reliable asynchronous processing systems with NestJS and AWS. Remember to adapt the patterns and implement robust error handling and monitoring for your specific production needs.

Frequently Asked Questions

How to build asynchronous tasks with NestJS?

Build asynchronous tasks in NestJS by integrating with AWS SNS and SQS to decouple task initiation and execution. Create a NestJS application to publish messages to an SNS topic. An SQS queue, subscribed to the SNS topic, buffers these messages. A NestJS consumer service then processes messages from the SQS queue and tracks them in a database like SQLite for idempotency.

What is AWS SNS used for in NestJS async tasks?

AWS SNS (Simple Notification Service) acts as a managed pub/sub messaging service in a NestJS asynchronous task setup. It's the central point where messages (events or commands) are published by the NestJS application. SNS then reliably distributes these messages to any subscribed services, like an SQS queue.

Why use AWS SQS with NestJS for asynchronous tasks?

AWS SQS (Simple Queue Service) offers a durable message buffer and scalability for asynchronous NestJS tasks. It subscribes to the SNS topic and stores incoming messages reliably until they're processed by a consumer service. SQS allows you to scale your consumer instances independently from the main application, managing large volumes of messages effectively.

How to handle message idempotency in SQS consumer?

Handle idempotency in your SQS consumer by tracking processed message IDs in a database, such as SQLite. Before processing a message, check if its ID exists in the database. If it does, the message has already been processed, so you can skip it or delete it from the SQS queue without re-processing. This prevents duplicate actions if SQS delivers a message multiple times.

How to publish a message to SNS from NestJS?

Use the AWS SDK for JavaScript v3 within a NestJS service to publish messages to SNS. Create an SNSClient, configure it with your AWS credentials and region, then use the PublishCommand to send messages to your SNS topic ARN. This allows your application to trigger events or commands asynchronously without blocking the main thread.

How to configure a NestJS project for AWS SNS and SQS?

Configure your NestJS project for AWS by installing necessary dependencies like '@aws-sdk/client-sns', '@aws-sdk/client-sqs', 'sqs-consumer', '@nestjs/config', and 'uuid'. You'll need an AWS account with credentials and to create an SNS topic and SQS queue, subscribing the queue to the topic and granting SNS permission to send messages to the queue. Securely store AWS credentials in a '.env' file and load them using NestJS's ConfigModule.

What is the purpose of sqs-consumer library?

The 'sqs-consumer' library simplifies polling and processing messages from an SQS queue in your NestJS application. It provides a clean API to handle message retrieval, error management (timeouts, processing errors), and efficient long polling. This abstracts away the complexities of direct SQS interactions, making your consumer logic more focused on message processing.

What database is used for idempotency in this example?

A simple SQLite database with TypeORM is used in this example to track processed message IDs and ensure idempotency. An entity 'ProcessedMessage' is created to store the message ID, body, and timestamp. TypeORM's features for database interactions and schema management streamline this process.

How are message duplicates managed in this system?

The example uses a ProcessedMessageDao, which keeps a record of handled message IDs. When a message is taken from SQS, the system checks if it's already been processed. If so, it logs the duplication and deletes the message from the queue, preventing duplicate processing and ensuring idempotency.

What should the visibility timeout be set to?

Visibility timeout, a key SQS parameter, should be set to a value greater than or equal to your expected maximum message processing time. For example, if your handleMessage function might take up to 60 seconds, visibilityTimeout should be at least 60. This setting prevents a message from reappearing in the queue before processing completes.

When to use FIFO queues with NestJS and AWS?

Use FIFO (First-In, First-Out) queues with NestJS and AWS when strict message ordering and exactly-once processing are critical for your application logic. Scenarios like financial transactions, order processing, or any situation requiring guaranteed sequential handling would benefit from FIFO queues. Remember that FIFO queues have lower throughput compared to standard queues.

How to scale the SQS consumer for higher throughput?

Scale the SQS consumer by increasing the number of consumer service instances, adjusting sqs-consumer's batchSize parameter (increasing concurrent message processing within an instance), or by tuning visibilityTimeout to optimize message handling efficiency. Monitor SQS metrics like ApproximateNumberOfMessagesVisible and consumer resource usage to guide scaling decisions.

What are some best practices for handling errors in SQS consumers?

Implement robust error handling by catching exceptions within the message processing logic. Classify errors as transient (retryable) or permanent. Use retries with exponential backoff for transient errors. Configure a Dead Letter Queue (DLQ) to capture messages that repeatedly fail processing. Log errors comprehensively and set up monitoring and alerts for critical failures.