AWS SQS Setup Guide

This document describes the AWS SQS (Simple Queue Service) architecture for asynchronous communication between tellus-ehs-hazcom-service (backend API) and tellus-ehs-background-service (background job processor).

Overview

The platform uses SQS Standard Queues for reliable, asynchronous task processing. The backend API publishes messages to queues, and the background service consumes and processes them.

┌───────────────────┐  publish  ┌─────────────┐   poll   ┌──────────────────────┐
│    Backend API    │ ────────▶ │  SQS Queue  │ ───────▶ │  Background Service  │
│  (hazcom-service) │           │             │          │ (background-service) │
└───────────────────┘           └──────┬──────┘          └──────────────────────┘
                                       │ (on failure)
                                       ▼
                                ┌─────────────┐
                                │ Dead Letter │
                                │ Queue (DLQ) │
                                └─────────────┘

Why SQS (Not SNS)?

| Factor | SQS | SNS |
|---|---|---|
| Pattern | Point-to-point queue | Pub/Sub fan-out |
| Message Persistence | Yes (up to 14 days) | No (fire-and-forget) |
| Retry on Failure | Automatic with visibility timeout | Limited |
| Consumer Model | Pull (consumer controls rate) | Push (subscriber receives immediately) |
| Dead Letter Queue | Built-in | Requires SQS subscription |
| Use Case Fit | Background job processing | Event broadcasting |

Decision: SQS is the right choice for backend ↔ background service communication because we need guaranteed delivery, retry handling, and rate control.

Queue Architecture

Queue Naming Convention

tellus-ehs-{purpose}-{env}
tellus-ehs-{purpose}-dlq-{env} (Dead Letter Queue)
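
As a minimal illustration of the convention (this helper is hypothetical, not part of the codebase):

def queue_name(purpose: str, env: str, dlq: bool = False) -> str:
    """Build a queue name following tellus-ehs-{purpose}[-dlq]-{env}."""
    suffix = "-dlq" if dlq else ""
    return f"tellus-ehs-{purpose}{suffix}-{env}"

# queue_name("jobs", "dev")             -> "tellus-ehs-jobs-dev"
# queue_name("jobs", "prod", dlq=True)  -> "tellus-ehs-jobs-dlq-prod"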

Queues Per Environment

| Environment | Main Queue | Dead Letter Queue | Region |
|---|---|---|---|
| Development | tellus-ehs-jobs-dev | tellus-ehs-jobs-dlq-dev | us-west-2 |
| Test/Integration | tellus-ehs-jobs-test | tellus-ehs-jobs-dlq-test | us-west-2 |
| Production | tellus-ehs-jobs-prod | tellus-ehs-jobs-dlq-prod | us-west-2 |

AWS Account ID: 518086992566

Queue Configuration

Main Queue Settings

| Setting | Value | Rationale |
|---|---|---|
| Queue Type | Standard | Order not critical, higher throughput |
| Visibility Timeout | 300 seconds (5 min) | Allows time for job processing |
| Message Retention | 7 days | Recovery window for failures |
| Receive Wait Time | 20 seconds | Long polling reduces costs |
| Max Message Size | 256 KB | Standard limit |
| Delivery Delay | 0 seconds | Immediate processing |

Dead Letter Queue Settings

| Setting | Value | Rationale |
|---|---|---|
| Max Receive Count | 3 | Message moves to the DLQ after 3 receive attempts |
| Message Retention | 14 days | Extended time for investigation |

Message Schema

All messages follow a standard envelope format:

{
  "message_id": "uuid-v4",
  "message_type": "sds_parse",
  "version": "1.0",
  "timestamp": "2024-01-15T10:30:00Z",
  "correlation_id": "request-trace-id",
  "payload": {
    // Job-specific data
  },
  "metadata": {
    "company_id": "uuid",
    "user_id": "uuid",
    "priority": "normal",
    "retry_count": 0
  }
}
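
For reference, the envelope maps naturally onto Pydantic models (a sketch; these model classes are illustrative and not part of the service code):

import uuid
from datetime import datetime, timezone

from pydantic import BaseModel, Field


class MessageMetadata(BaseModel):
    company_id: str | None = None
    user_id: str | None = None
    priority: str = "normal"
    retry_count: int = 0


class MessageEnvelope(BaseModel):
    message_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    message_type: str
    version: str = "1.0"
    timestamp: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    correlation_id: str
    payload: dict
    metadata: MessageMetadata = Field(default_factory=MessageMetadata)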

Message Types

| Message Type | Description | Priority |
|---|---|---|
| sds_parse | Parse uploaded SDS document | normal |
| sds_search | Search for SDS via external APIs | normal |
| report_generate | Generate compliance report | normal |
| email_send | Send transactional email | high |
| bulk_import | Process bulk inventory import | low |
| cleanup | Cleanup temporary files | low |
| notification_send | Send in-app notification | normal |

AWS Setup Instructions

Prerequisites

  • AWS CLI installed and configured
  • Appropriate IAM permissions
  • Access to AWS Console or CLI

Step 1: Create Dead Letter Queue (DLQ)

Create the DLQ first since the main queue references it.

AWS CLI

# Development
aws sqs create-queue \
  --queue-name tellus-ehs-jobs-dlq-dev \
  --attributes '{
    "MessageRetentionPeriod": "1209600",
    "VisibilityTimeout": "300"
  }' \
  --tags 'Environment=dev,Project=tellus-ehs,Purpose=dead-letter'

# Test
aws sqs create-queue \
  --queue-name tellus-ehs-jobs-dlq-test \
  --attributes '{
    "MessageRetentionPeriod": "1209600",
    "VisibilityTimeout": "300"
  }' \
  --tags 'Environment=test,Project=tellus-ehs,Purpose=dead-letter'

# Production
aws sqs create-queue \
  --queue-name tellus-ehs-jobs-dlq-prod \
  --attributes '{
    "MessageRetentionPeriod": "1209600",
    "VisibilityTimeout": "300"
  }' \
  --tags 'Environment=prod,Project=tellus-ehs,Purpose=dead-letter'

Step 2: Get DLQ ARN

# Get the ARN for the DLQ (needed for the main queue's redrive policy)
aws sqs get-queue-attributes \
  --queue-url https://sqs.{region}.amazonaws.com/{account-id}/tellus-ehs-jobs-dlq-dev \
  --attribute-names QueueArn

Step 3: Create Main Queue

# Development (replace {dlq-arn} with actual ARN)
aws sqs create-queue \
  --queue-name tellus-ehs-jobs-dev \
  --attributes '{
    "MessageRetentionPeriod": "604800",
    "VisibilityTimeout": "300",
    "ReceiveMessageWaitTimeSeconds": "20",
    "RedrivePolicy": "{\"deadLetterTargetArn\":\"{dlq-arn}\",\"maxReceiveCount\":\"3\"}"
  }' \
  --tags 'Environment=dev,Project=tellus-ehs,Purpose=job-queue'

# Test (replace {dlq-arn})
aws sqs create-queue \
  --queue-name tellus-ehs-jobs-test \
  --attributes '{
    "MessageRetentionPeriod": "604800",
    "VisibilityTimeout": "300",
    "ReceiveMessageWaitTimeSeconds": "20",
    "RedrivePolicy": "{\"deadLetterTargetArn\":\"{dlq-arn}\",\"maxReceiveCount\":\"3\"}"
  }' \
  --tags 'Environment=test,Project=tellus-ehs,Purpose=job-queue'

# Production (replace {dlq-arn})
aws sqs create-queue \
  --queue-name tellus-ehs-jobs-prod \
  --attributes '{
    "MessageRetentionPeriod": "604800",
    "VisibilityTimeout": "300",
    "ReceiveMessageWaitTimeSeconds": "20",
    "RedrivePolicy": "{\"deadLetterTargetArn\":\"{dlq-arn}\",\"maxReceiveCount\":\"3\"}"
  }' \
  --tags 'Environment=prod,Project=tellus-ehs,Purpose=job-queue'
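
The same three steps can also be scripted with boto3 (a sketch for the dev environment, using the same attribute values; adjust names per environment):

import json

import boto3

sqs = boto3.client("sqs", region_name="us-west-2")

# Step 1: create the DLQ
dlq = sqs.create_queue(
    QueueName="tellus-ehs-jobs-dlq-dev",
    Attributes={"MessageRetentionPeriod": "1209600", "VisibilityTimeout": "300"},
    tags={"Environment": "dev", "Project": "tellus-ehs", "Purpose": "dead-letter"},
)

# Step 2: look up the DLQ ARN (referenced by the main queue's redrive policy)
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq["QueueUrl"], AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# Step 3: create the main queue with a redrive policy pointing at the DLQ
main = sqs.create_queue(
    QueueName="tellus-ehs-jobs-dev",
    Attributes={
        "MessageRetentionPeriod": "604800",
        "VisibilityTimeout": "300",
        "ReceiveMessageWaitTimeSeconds": "20",
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": "3"}
        ),
    },
    tags={"Environment": "dev", "Project": "tellus-ehs", "Purpose": "job-queue"},
)
print(main["QueueUrl"])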

Step 4: Create IAM Policy

Create a policy for the backend and background services:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "SQSProducerPermissions",
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage",
        "sqs:SendMessageBatch",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes"
      ],
      "Resource": [
        "arn:aws:sqs:*:*:tellus-ehs-jobs-*"
      ]
    },
    {
      "Sid": "SQSConsumerPermissions",
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes"
      ],
      "Resource": [
        "arn:aws:sqs:*:*:tellus-ehs-jobs-*",
        "arn:aws:sqs:*:*:tellus-ehs-jobs-dlq-*"
      ]
    }
  ]
}

Save as tellus-sqs-policy.json and create:

aws iam create-policy \
  --policy-name TellusEHSSQSPolicy \
  --policy-document file://tellus-sqs-policy.json
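
Then attach the policy to the roles the two services run under (a boto3 sketch; the role names below are placeholders, substitute the services' actual IAM roles):

import boto3

iam = boto3.client("iam")
policy_arn = "arn:aws:iam::518086992566:policy/TellusEHSSQSPolicy"

# Placeholder role names -- replace with the real service roles
for role_name in ("tellus-ehs-hazcom-service-role", "tellus-ehs-background-service-role"):
    iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)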

AWS Console Setup (Alternative)

  1. Navigate to SQS Console: https://console.aws.amazon.com/sqs/

  2. Create DLQ:

    • Click "Create queue"
    • Type: Standard
    • Name: tellus-ehs-jobs-dlq-{env}
    • Message retention: 14 days
    • Create queue
  3. Create Main Queue:

    • Click "Create queue"
    • Type: Standard
    • Name: tellus-ehs-jobs-{env}
    • Visibility timeout: 5 minutes
    • Message retention: 7 days
    • Receive message wait time: 20 seconds
    • Dead-letter queue: Enable, select DLQ, max receives: 3
    • Create queue

Environment Configuration

Backend Service (.env)

# AWS Configuration
AWS_REGION=us-west-2
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key

# SQS Configuration
SQS_QUEUE_URL=https://sqs.us-west-2.amazonaws.com/{account-id}/tellus-ehs-jobs-dev
SQS_DLQ_URL=https://sqs.us-west-2.amazonaws.com/{account-id}/tellus-ehs-jobs-dlq-dev

# Optional: Use IAM roles instead of access keys (recommended for production)
# AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY not needed if using IAM roles

Background Service (.env)

# AWS Configuration
AWS_REGION=us-west-2
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key

# SQS Configuration
SQS_QUEUE_URL=https://sqs.us-west-2.amazonaws.com/{account-id}/tellus-ehs-jobs-dev
SQS_DLQ_URL=https://sqs.us-west-2.amazonaws.com/{account-id}/tellus-ehs-jobs-dlq-dev

# Consumer Configuration
SQS_MAX_MESSAGES=10
SQS_WAIT_TIME_SECONDS=20
SQS_VISIBILITY_TIMEOUT=300

Python Implementation

Dependencies

Add to requirements.txt:

boto3>=1.34.0

Configuration Module

Create app/core/sqs_config.py:

"""
AWS SQS Configuration for Tellus EHS Platform
"""
import os
from enum import Enum
from pydantic_settings import BaseSettings


class MessageType(str, Enum):
"""Supported message types for the job queue"""
SDS_PARSE = "sds_parse"
SDS_SEARCH = "sds_search"
REPORT_GENERATE = "report_generate"
EMAIL_SEND = "email_send"
BULK_IMPORT = "bulk_import"
CLEANUP = "cleanup"
NOTIFICATION_SEND = "notification_send"


class MessagePriority(str, Enum):
"""Message priority levels"""
HIGH = "high"
NORMAL = "normal"
LOW = "low"


class SQSSettings(BaseSettings):
"""SQS Configuration Settings"""

# AWS Configuration
aws_region: str = "us-east-1"
aws_access_key_id: str | None = None
aws_secret_access_key: str | None = None

# Queue URLs
sqs_queue_url: str
sqs_dlq_url: str | None = None

# Consumer settings
sqs_max_messages: int = 10
sqs_wait_time_seconds: int = 20
sqs_visibility_timeout: int = 300

class Config:
env_file = ".env"
extra = "ignore"


# Global settings instance
sqs_settings = SQSSettings()

SQS Client Service

Create app/services/sqs_service.py:

"""
AWS SQS Service for publishing and consuming messages
"""
import json
import uuid
from datetime import datetime, timezone
from typing import Any

import boto3
from botocore.exceptions import ClientError

from app.core.sqs_config import (
    sqs_settings,
    MessageType,
    MessagePriority,
)


class SQSService:
    """Service for interacting with AWS SQS"""

    def __init__(self):
        self._client = None

    @property
    def client(self):
        """Lazy initialization of SQS client"""
        if self._client is None:
            self._client = boto3.client(
                'sqs',
                region_name=sqs_settings.aws_region,
                aws_access_key_id=sqs_settings.aws_access_key_id,
                aws_secret_access_key=sqs_settings.aws_secret_access_key
            )
        return self._client

    def publish_message(
        self,
        message_type: MessageType,
        payload: dict[str, Any],
        company_id: str | None = None,
        user_id: str | None = None,
        priority: MessagePriority = MessagePriority.NORMAL,
        correlation_id: str | None = None,
        delay_seconds: int = 0
    ) -> str:
        """
        Publish a message to the SQS queue.

        Args:
            message_type: Type of job to process
            payload: Job-specific data
            company_id: Company context for the job
            user_id: User who triggered the job
            priority: Message priority level
            correlation_id: Request trace ID for logging
            delay_seconds: Delay before message becomes visible (0-900)

        Returns:
            Message ID from SQS

        Raises:
            SQSPublishError: If message cannot be published
        """
        message_id = str(uuid.uuid4())

        message = {
            "message_id": message_id,
            "message_type": message_type.value,
            "version": "1.0",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "correlation_id": correlation_id or str(uuid.uuid4()),
            "payload": payload,
            "metadata": {
                "company_id": company_id,
                "user_id": user_id,
                "priority": priority.value,
                "retry_count": 0
            }
        }

        try:
            response = self.client.send_message(
                QueueUrl=sqs_settings.sqs_queue_url,
                MessageBody=json.dumps(message),
                DelaySeconds=delay_seconds,
                MessageAttributes={
                    "MessageType": {
                        "DataType": "String",
                        "StringValue": message_type.value
                    },
                    "Priority": {
                        "DataType": "String",
                        "StringValue": priority.value
                    }
                }
            )
            return response["MessageId"]
        except ClientError as e:
            raise SQSPublishError(f"Failed to publish message: {e}")

    def publish_batch(
        self,
        messages: list[dict[str, Any]]
    ) -> dict[str, Any]:
        """
        Publish multiple messages in a batch (max 10).

        Args:
            messages: List of message dicts with keys:
                - message_type: MessageType
                - payload: dict
                - company_id: str (optional)
                - user_id: str (optional)
                - priority: MessagePriority (optional)

        Returns:
            Dict with 'successful' and 'failed' message IDs
        """
        if len(messages) > 10:
            raise ValueError("Batch size cannot exceed 10 messages")

        entries = []
        for i, msg in enumerate(messages):
            message_id = str(uuid.uuid4())
            message = {
                "message_id": message_id,
                "message_type": msg["message_type"].value,
                "version": "1.0",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "correlation_id": msg.get("correlation_id", str(uuid.uuid4())),
                "payload": msg["payload"],
                "metadata": {
                    "company_id": msg.get("company_id"),
                    "user_id": msg.get("user_id"),
                    "priority": msg.get("priority", MessagePriority.NORMAL).value,
                    "retry_count": 0
                }
            }

            entries.append({
                "Id": str(i),
                "MessageBody": json.dumps(message),
                "MessageAttributes": {
                    "MessageType": {
                        "DataType": "String",
                        "StringValue": msg["message_type"].value
                    }
                }
            })

        try:
            response = self.client.send_message_batch(
                QueueUrl=sqs_settings.sqs_queue_url,
                Entries=entries
            )
            return {
                "successful": [r["MessageId"] for r in response.get("Successful", [])],
                "failed": [r["Id"] for r in response.get("Failed", [])]
            }
        except ClientError as e:
            raise SQSPublishError(f"Failed to publish batch: {e}")

    def receive_messages(
        self,
        max_messages: int | None = None,
        wait_time: int | None = None,
        visibility_timeout: int | None = None
    ) -> list[dict[str, Any]]:
        """
        Receive messages from the queue.

        Args:
            max_messages: Max messages to receive (1-10)
            wait_time: Long polling wait time in seconds
            visibility_timeout: Override visibility timeout

        Returns:
            List of messages with 'body', 'receipt_handle', and 'message_id'
        """
        try:
            response = self.client.receive_message(
                QueueUrl=sqs_settings.sqs_queue_url,
                MaxNumberOfMessages=max_messages or sqs_settings.sqs_max_messages,
                WaitTimeSeconds=wait_time or sqs_settings.sqs_wait_time_seconds,
                VisibilityTimeout=visibility_timeout or sqs_settings.sqs_visibility_timeout,
                MessageAttributeNames=["All"],
                AttributeNames=["ApproximateReceiveCount"]
            )

            messages = []
            for msg in response.get("Messages", []):
                body = json.loads(msg["Body"])
                # Update retry count from the SQS receive-count attribute
                receive_count = int(msg.get("Attributes", {}).get("ApproximateReceiveCount", 1))
                body["metadata"]["retry_count"] = receive_count - 1

                messages.append({
                    "body": body,
                    "receipt_handle": msg["ReceiptHandle"],
                    "message_id": msg["MessageId"]
                })

            return messages
        except ClientError as e:
            raise SQSReceiveError(f"Failed to receive messages: {e}")

    def delete_message(self, receipt_handle: str) -> None:
        """Delete a message after successful processing"""
        try:
            self.client.delete_message(
                QueueUrl=sqs_settings.sqs_queue_url,
                ReceiptHandle=receipt_handle
            )
        except ClientError as e:
            raise SQSDeleteError(f"Failed to delete message: {e}")

    def extend_visibility(
        self,
        receipt_handle: str,
        visibility_timeout: int
    ) -> None:
        """Extend visibility timeout for long-running jobs"""
        try:
            self.client.change_message_visibility(
                QueueUrl=sqs_settings.sqs_queue_url,
                ReceiptHandle=receipt_handle,
                VisibilityTimeout=visibility_timeout
            )
        except ClientError as e:
            raise SQSError(f"Failed to extend visibility: {e}")

    def get_queue_stats(self) -> dict[str, int]:
        """Get queue statistics"""
        try:
            response = self.client.get_queue_attributes(
                QueueUrl=sqs_settings.sqs_queue_url,
                AttributeNames=[
                    "ApproximateNumberOfMessages",
                    "ApproximateNumberOfMessagesNotVisible",
                    "ApproximateNumberOfMessagesDelayed"
                ]
            )
            attrs = response.get("Attributes", {})
            return {
                "messages_available": int(attrs.get("ApproximateNumberOfMessages", 0)),
                "messages_in_flight": int(attrs.get("ApproximateNumberOfMessagesNotVisible", 0)),
                "messages_delayed": int(attrs.get("ApproximateNumberOfMessagesDelayed", 0))
            }
        except ClientError as e:
            raise SQSError(f"Failed to get queue stats: {e}")


class SQSError(Exception):
    """Base SQS error"""
    pass


class SQSPublishError(SQSError):
    """Error publishing message"""
    pass


class SQSReceiveError(SQSError):
    """Error receiving messages"""
    pass


class SQSDeleteError(SQSError):
    """Error deleting message"""
    pass


# Global service instance
sqs_service = SQSService()

Usage Examples

Publishing Messages (Backend API)

from app.services.sqs_service import sqs_service, MessageType, MessagePriority

# Publish SDS parsing job
sqs_service.publish_message(
    message_type=MessageType.SDS_PARSE,
    payload={
        "sds_id": "uuid-of-sds",
        "s3_key": "chemiq/sds/company/abc123/sds456.pdf",
        "source": "user_upload"
    },
    company_id="abc123",
    user_id="user456",
    priority=MessagePriority.NORMAL
)

# Publish high-priority email
sqs_service.publish_message(
    message_type=MessageType.EMAIL_SEND,
    payload={
        "template": "invitation",
        "to": "user@example.com",
        "data": {"invite_url": "https://..."}
    },
    company_id="abc123",
    priority=MessagePriority.HIGH
)

# Publish with delay (e.g., scheduled cleanup)
sqs_service.publish_message(
    message_type=MessageType.CLEANUP,
    payload={"temp_folder": "uploads/abc123/batch001"},
    delay_seconds=3600  # 1 hour delay
)
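
Publishing can fail (network issues, missing permissions), so callers should decide how to react. A minimal sketch of one pattern:

from app.services.sqs_service import sqs_service, SQSPublishError, MessageType

try:
    sqs_service.publish_message(
        message_type=MessageType.REPORT_GENERATE,
        payload={"report_id": "uuid-of-report"}
    )
except SQSPublishError:
    # Per call site: surface a 503, fall back to synchronous processing,
    # or persist the job for a later retry.
    raise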

Consuming Messages (Background Service)

import asyncio

from app.services.sqs_service import sqs_service, MessageType


async def handle_sds_parse(payload: dict, metadata: dict):
    """Handle SDS parsing job"""
    sds_id = payload["sds_id"]
    s3_key = payload["s3_key"]
    company_id = metadata["company_id"]

    # Download from S3, parse, update database...
    pass


# Message handlers registry (defined after the handlers so the names resolve)
handlers = {
    MessageType.SDS_PARSE.value: handle_sds_parse,
    # ... register other handlers (email_send, report_generate, etc.) here
}


async def process_messages():
    """Main consumer loop, e.g. started with asyncio.run(process_messages())"""
    while True:
        messages = sqs_service.receive_messages()

        for msg in messages:
            body = msg["body"]
            message_type = body["message_type"]

            try:
                handler = handlers.get(message_type)
                if handler:
                    await handler(body["payload"], body["metadata"])
                    sqs_service.delete_message(msg["receipt_handle"])
                else:
                    print(f"Unknown message type: {message_type}")
            except Exception as e:
                print(f"Error processing message: {e}")
                # Message will return to the queue after the visibility timeout
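
Jobs that can outlive the 5-minute visibility timeout risk being redelivered mid-run. One option, sketched below with the service's extend_visibility method (the run_with_heartbeat helper is illustrative, not part of the codebase), is to extend visibility periodically while the handler runs:

import asyncio

from app.services.sqs_service import sqs_service

async def run_with_heartbeat(handler, msg, interval: int = 240):
    """Run a handler, extending message visibility every `interval` seconds."""
    task = asyncio.create_task(handler(msg["body"]["payload"], msg["body"]["metadata"]))
    while not task.done():
        await asyncio.wait([task], timeout=interval)
        if not task.done():
            # Buy another 5 minutes before SQS redelivers the message
            sqs_service.extend_visibility(msg["receipt_handle"], 300)
    await task  # propagate any handler exception
    sqs_service.delete_message(msg["receipt_handle"])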

Monitoring and Observability

CloudWatch Metrics

Key metrics to monitor:

| Metric | Description | Alert Threshold |
|---|---|---|
| ApproximateNumberOfMessagesVisible | Messages waiting | > 1000 |
| ApproximateAgeOfOldestMessage | Oldest message age | > 3600 seconds |
| NumberOfMessagesSent | Messages published | Baseline + 50% |
| NumberOfMessagesReceived | Messages consumed | - |
| NumberOfMessagesDeleted | Successfully processed | - |
| ApproximateNumberOfMessagesNotVisible | In-flight messages | > 100 |
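
These metrics can also be pulled programmatically; a sketch using boto3's CloudWatch client (the queue name and time window are examples):

from datetime import datetime, timedelta, timezone

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-west-2")
now = datetime.now(timezone.utc)

# Max age of the oldest message over the last hour, in 5-minute buckets
response = cloudwatch.get_metric_statistics(
    Namespace="AWS/SQS",
    MetricName="ApproximateAgeOfOldestMessage",
    Dimensions=[{"Name": "QueueName", "Value": "tellus-ehs-jobs-dev"}],
    StartTime=now - timedelta(hours=1),
    EndTime=now,
    Period=300,
    Statistics=["Maximum"]
)
for point in sorted(response["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Maximum"], "seconds")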

CloudWatch Alarms

# Alarm for DLQ messages (fires when any message lands in the DLQ,
# which indicates processing failures)
aws cloudwatch put-metric-alarm \
  --alarm-name "TellusEHS-DLQ-Messages-Dev" \
  --metric-name ApproximateNumberOfMessagesVisible \
  --namespace AWS/SQS \
  --statistic Average \
  --period 300 \
  --threshold 0 \
  --comparison-operator GreaterThanThreshold \
  --dimensions Name=QueueName,Value=tellus-ehs-jobs-dlq-dev \
  --evaluation-periods 1 \
  --alarm-actions arn:aws:sns:us-west-2:{account}:alerts

Logging Best Practices

import structlog

logger = structlog.get_logger()

# When publishing
logger.info(
    "sqs_message_published",
    message_type=message_type,
    message_id=message_id,
    company_id=company_id,
    correlation_id=correlation_id
)

# When consuming
logger.info(
    "sqs_message_received",
    message_type=body["message_type"],
    message_id=body["message_id"],
    correlation_id=body["correlation_id"],
    retry_count=body["metadata"]["retry_count"]
)

# On error
logger.error(
    "sqs_message_processing_failed",
    message_type=body["message_type"],
    message_id=body["message_id"],
    error=str(e),
    retry_count=body["metadata"]["retry_count"]
)

Cost Estimation

Pricing (us-west-2)

| Resource | Free Tier | After Free Tier |
|---|---|---|
| API Requests | 1 million/month | $0.40 per million |
| Data Transfer (same region) | Unlimited | Free |
| Data Transfer (cross-region) | - | $0.09 per GB |

Estimated Monthly Cost

| Environment | Est. Messages/Month | Est. Cost |
|---|---|---|
| Development | 10,000 - 50,000 | $0 (free tier) |
| Test | 10,000 - 50,000 | $0 (free tier) |
| Production | 100,000 - 500,000 | $0 - $0.20 |
| Total | - | $0 - $1/month |
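
The production estimate follows from the request math: each message costs roughly three requests (send, receive, delete), so 500,000 messages/month is about 1.5 million requests; after the 1 million free requests, the remaining ~0.5 million bill at $0.40 per million, i.e., about $0.20.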

Troubleshooting

Common Issues

Messages not being received:

  • Check IAM permissions
  • Verify queue URL is correct
  • Check visibility timeout (message might be in-flight)

Messages going to DLQ:

  • Check handler for exceptions
  • Increase visibility timeout for long jobs
  • Review DLQ messages for error patterns

High message age:

  • Scale up consumers
  • Check for processing bottlenecks
  • Review message volume vs. consumer capacity

Useful CLI Commands

# View queue attributes
aws sqs get-queue-attributes \
  --queue-url {queue-url} \
  --attribute-names All

# Purge queue (dev/test only!)
aws sqs purge-queue --queue-url {queue-url}

# View DLQ messages
aws sqs receive-message \
  --queue-url {dlq-url} \
  --max-number-of-messages 10

# Move messages from DLQ back to main queue
aws sqs start-message-move-task \
  --source-arn {dlq-arn} \
  --destination-arn {main-queue-arn}

Local Development

For local development, you have several options to test the queue-based communication without needing AWS infrastructure.

Development Approaches

| Approach | Complexity | Cost | Production Parity | Best For |
|---|---|---|---|---|
| In-Memory Queue | Low | Free | Medium | Quick local dev, unit tests |
| LocalStack | Medium | Free | High | Integration testing |
| Dev AWS Queue | Very Low | ~$0 | Highest | Full E2E testing |

Development flow:

  • Local dev (QUEUE_MODE=memory): an in-memory queue simulates async behavior; run both services locally (2 terminals); no external dependencies; best for day-to-day development.
  • Integration testing (QUEUE_MODE=sqs + LocalStack): full SQS emulation via Docker; tests real SQS API calls; catches SQS-specific issues.
  • E2E testing (QUEUE_MODE=sqs + Dev AWS): uses the actual AWS Dev queue; highest production parity; final validation before deploy.

Option 1: In-Memory Queue

Simulates async queue behavior without external dependencies.

.env.local:

QUEUE_MODE=memory

Run both services locally:

# Terminal 1: Backend API
cd tellus-ehs-hazcom-service
python -m app.main

# Terminal 2: Background Service
cd tellus-ehs-background-service
python -m app.main

Option 2: LocalStack (Docker)

Full AWS SQS emulation locally via Docker.

docker-compose.yml addition:

services:
  localstack:
    image: localstack/localstack:latest
    ports:
      - "4566:4566"
    environment:
      - SERVICES=sqs
      - DEBUG=1
    volumes:
      - "./localstack-data:/var/lib/localstack"

Setup commands:

# Start LocalStack
docker-compose up localstack -d

# Create local queues
aws --endpoint-url=http://localhost:4566 sqs create-queue \
  --queue-name tellus-ehs-jobs-local

aws --endpoint-url=http://localhost:4566 sqs create-queue \
  --queue-name tellus-ehs-jobs-dlq-local

# Verify queues created
aws --endpoint-url=http://localhost:4566 sqs list-queues

.env.local:

QUEUE_MODE=sqs
SQS_ENDPOINT_URL=http://localhost:4566
SQS_QUEUE_URL=http://localhost:4566/000000000000/tellus-ehs-jobs-local
SQS_DLQ_URL=http://localhost:4566/000000000000/tellus-ehs-jobs-dlq-local
AWS_ACCESS_KEY_ID=test
AWS_SECRET_ACCESS_KEY=test
AWS_REGION=us-east-1

Option 3: Dev AWS Queue (Current Setup)

Use the actual AWS Dev environment queue.

.env.local:

# SQS Configuration
SQS_QUEUE_URL=https://sqs.us-west-2.amazonaws.com/518086992566/tellus-ehs-jobs-dev
SQS_DLQ_URL=https://sqs.us-west-2.amazonaws.com/518086992566/tellus-ehs-jobs-dlq-dev
AWS_REGION=us-west-2
AWS_ACCESS_KEY_ID=your-dev-access-key
AWS_SECRET_ACCESS_KEY=your-dev-secret-key

Queue URLs by Environment

| Environment | Queue URL |
|---|---|
| Dev | https://sqs.us-west-2.amazonaws.com/518086992566/tellus-ehs-jobs-dev |
| Test | https://sqs.us-west-2.amazonaws.com/518086992566/tellus-ehs-jobs-test |
| Prod | https://sqs.us-west-2.amazonaws.com/518086992566/tellus-ehs-jobs-prod |

Queue Abstraction Implementation

To support multiple backends, use an abstracted queue service:

app/core/sqs_config.py (updated):

class SQSSettings(BaseSettings):
    """SQS Configuration Settings"""

    # Queue mode: memory (local dev/testing), sqs (LocalStack or AWS)
    queue_mode: str = "sqs"

    # AWS Configuration
    aws_region: str = "us-west-2"
    aws_access_key_id: str | None = None
    aws_secret_access_key: str | None = None

    # Optional: LocalStack endpoint (set for local SQS emulation)
    sqs_endpoint_url: str | None = None

    # Queue URLs (not required for memory mode)
    sqs_queue_url: str | None = None
    sqs_dlq_url: str | None = None

    # Consumer settings
    sqs_max_messages: int = 10
    sqs_wait_time_seconds: int = 20
    sqs_visibility_timeout: int = 300

    class Config:
        env_file = ".env"
        extra = "ignore"

app/services/queue_service.py:

"""
Abstracted queue service with pluggable backends
"""
import asyncio
import json
from abc import ABC, abstractmethod
from collections import deque
from datetime import datetime, timezone
from typing import Any
import uuid

from app.core.sqs_config import sqs_settings, MessageType, MessagePriority


class QueueBackend(ABC):
"""Abstract queue backend"""

@abstractmethod
async def send_message(self, message: dict, delay_seconds: int = 0) -> str:
pass

@abstractmethod
async def receive_messages(self, max_messages: int = 10) -> list[dict]:
pass

@abstractmethod
async def delete_message(self, receipt_handle: str) -> None:
pass


class InMemoryQueueBackend(QueueBackend):
    """In-memory queue for local development and testing"""

    def __init__(self):
        self._queue: deque = deque()
        self._in_flight: dict[str, dict] = {}
        self._lock = asyncio.Lock()

    async def send_message(self, message: dict, delay_seconds: int = 0) -> str:
        message_id = str(uuid.uuid4())
        async with self._lock:
            self._queue.append({
                "message_id": message_id,
                "body": message,
                "visible_at": datetime.now(timezone.utc).timestamp() + delay_seconds
            })
        print(f"[InMemoryQueue] Message sent: {message.get('message_type', 'unknown')}")
        return message_id

    async def receive_messages(self, max_messages: int = 10) -> list[dict]:
        messages = []
        now = datetime.now(timezone.utc).timestamp()

        async with self._lock:
            temp_queue = deque()
            while self._queue and len(messages) < max_messages:
                msg = self._queue.popleft()
                if msg["visible_at"] <= now:
                    receipt_handle = str(uuid.uuid4())
                    self._in_flight[receipt_handle] = msg
                    messages.append({
                        "message_id": msg["message_id"],
                        "body": msg["body"],
                        "receipt_handle": receipt_handle
                    })
                else:
                    temp_queue.append(msg)

            # Put back messages that aren't visible yet
            self._queue.extendleft(reversed(temp_queue))

        return messages

    async def delete_message(self, receipt_handle: str) -> None:
        async with self._lock:
            self._in_flight.pop(receipt_handle, None)
        print(f"[InMemoryQueue] Message deleted: {receipt_handle[:8]}...")

    def get_stats(self) -> dict:
        return {
            "messages_available": len(self._queue),
            "messages_in_flight": len(self._in_flight)
        }


class SQSQueueBackend(QueueBackend):
    """AWS SQS backend for staging/production"""

    def __init__(self):
        import boto3

        client_kwargs = {
            "region_name": sqs_settings.aws_region,
        }

        # Support LocalStack endpoint
        if sqs_settings.sqs_endpoint_url:
            client_kwargs["endpoint_url"] = sqs_settings.sqs_endpoint_url

        if sqs_settings.aws_access_key_id:
            client_kwargs["aws_access_key_id"] = sqs_settings.aws_access_key_id
            client_kwargs["aws_secret_access_key"] = sqs_settings.aws_secret_access_key

        self._client = boto3.client("sqs", **client_kwargs)
        self._queue_url = sqs_settings.sqs_queue_url

    async def send_message(self, message: dict, delay_seconds: int = 0) -> str:
        response = self._client.send_message(
            QueueUrl=self._queue_url,
            MessageBody=json.dumps(message),
            DelaySeconds=delay_seconds
        )
        return response["MessageId"]

    async def receive_messages(self, max_messages: int = 10) -> list[dict]:
        response = self._client.receive_message(
            QueueUrl=self._queue_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=sqs_settings.sqs_wait_time_seconds,
            VisibilityTimeout=sqs_settings.sqs_visibility_timeout,
            MessageAttributeNames=["All"],
            AttributeNames=["ApproximateReceiveCount"]
        )

        messages = []
        for msg in response.get("Messages", []):
            messages.append({
                "message_id": msg["MessageId"],
                "body": json.loads(msg["Body"]),
                "receipt_handle": msg["ReceiptHandle"]
            })
        return messages

    async def delete_message(self, receipt_handle: str) -> None:
        self._client.delete_message(
            QueueUrl=self._queue_url,
            ReceiptHandle=receipt_handle
        )


# Factory function
def get_queue_backend() -> QueueBackend:
    """Get appropriate queue backend based on environment"""
    if sqs_settings.queue_mode == "memory":
        return InMemoryQueueBackend()
    elif sqs_settings.queue_mode == "sqs":
        return SQSQueueBackend()
    else:
        raise ValueError(f"Unknown queue mode: {sqs_settings.queue_mode}")


# Singleton instance
_queue_backend: QueueBackend | None = None


def get_queue() -> QueueBackend:
    global _queue_backend
    if _queue_backend is None:
        _queue_backend = get_queue_backend()
    return _queue_backend

app/services/job_dispatcher.py:

"""
Job dispatcher - sends jobs to the queue for background processing
"""
from app.core.sqs_config import MessageType, MessagePriority
from app.services.queue_service import get_queue


class JobDispatcher:
"""Dispatch jobs to queue for async processing by background service"""

async def dispatch(
self,
message_type: MessageType,
payload: dict,
company_id: str | None = None,
user_id: str | None = None,
priority: MessagePriority = MessagePriority.NORMAL,
delay_seconds: int = 0
) -> str:
"""
Send a job to the queue for background processing.

Args:
message_type: Type of job (e.g., SDS_PARSE, EMAIL_SEND)
payload: Job-specific data
company_id: Company context
user_id: User who triggered the job
priority: Message priority level
delay_seconds: Delay before message becomes visible (0-900)

Returns:
Message ID from the queue
"""
queue = get_queue()

message = {
"message_type": message_type.value,
"payload": payload,
"metadata": {
"company_id": company_id,
"user_id": user_id,
"priority": priority.value,
"retry_count": 0
}
}

return await queue.send_message(message, delay_seconds)


# Global dispatcher instance
job_dispatcher = JobDispatcher()

Usage in API Endpoints

from fastapi import APIRouter, Depends

from app.core.sqs_config import MessageType
from app.services.job_dispatcher import job_dispatcher

# User and get_current_user come from the service's existing auth module
router = APIRouter()

@router.post("/chemicals/{chemical_id}/parse-sds")
async def trigger_sds_parse(
    chemical_id: str,
    current_user: User = Depends(get_current_user)
):
    # Sends to the queue - the background service processes it
    await job_dispatcher.dispatch(
        message_type=MessageType.SDS_PARSE,
        payload={"chemical_id": chemical_id},
        company_id=str(current_user.company_id),
        user_id=str(current_user.id)
    )
    return {"status": "processing"}

Testing Utilities

tests/conftest.py:

import pytest

import app.services.queue_service as queue_module
from app.services.queue_service import InMemoryQueueBackend


@pytest.fixture
def memory_queue():
    """Provide a fresh in-memory queue for testing"""
    queue = InMemoryQueueBackend()
    queue_module._queue_backend = queue
    yield queue
    queue_module._queue_backend = None

Example test:

async def test_sds_parse_job_queued(memory_queue, client):
    # Trigger the endpoint
    response = await client.post("/api/v1/chemicals/123/parse-sds")
    assert response.status_code == 200

    # Verify the message was queued
    messages = await memory_queue.receive_messages()
    assert len(messages) == 1
    assert messages[0]["body"]["message_type"] == "sds_parse"
    assert messages[0]["body"]["payload"]["chemical_id"] == "123"

Revision History

| Date | Version | Changes |
|---|---|---|
| 2024-01-15 | 1.0 | Initial document |