Developer Console

Catalog API Process SNS Message

Summary

The process SNS message code sample shows how to process the catalog upload result message. Follow your organization's processes to prepare the code for production release.

Process SNS message code

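You can use the following example to process SNS messages from catalog uploads with AWS Lambda. The function expects to be triggered by an Amazon SQS queue that is subscribed to the SNS topic, so each record body contains the SNS message envelope.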

import json
import boto3
import urllib3
import urllib.parse
from datetime import datetime
import os

# Amazon S3 rejects object keys longer than 1,024 bytes (UTF-8 encoded).
MAX_KEY_LENGTH = 1024

# Timeout (seconds) for the download request, so a stalled transfer fails
# quickly instead of running until the Lambda function itself times out.
DOWNLOAD_TIMEOUT_SECONDS = 60.0


def _parse_record(record):
    """Return ``(ingestion_id, download_link)`` from one SQS record.

    The SQS record body is the SNS notification envelope; its ``Message``
    field carries the catalog upload result as a JSON-encoded string.

    Raises:
        ValueError: if the SNS envelope has no ``Message`` field.
        json.JSONDecodeError: if the body or the message is not valid JSON.
        KeyError: if ``body``, ``ingestionId`` or ``downloadLink`` is missing.
    """
    sns_message = json.loads(record['body'])
    if not isinstance(sns_message, dict) or 'Message' not in sns_message:
        raise ValueError("No Message field found in SNS body")
    message_json = json.loads(sns_message['Message'])
    return message_json['ingestionId'], message_json['downloadLink']


def _file_name_from_link(download_link):
    """Return the percent-decoded last path segment of *download_link*.

    Query strings and fragments are ignored. Falls back to ``'download'`` when
    the path ends in ``/`` so the S3 key never ends with a bare separator.
    """
    path = urllib.parse.urlparse(download_link).path
    return urllib.parse.unquote(path.rsplit('/', 1)[-1]) or 'download'


def _truncate_utf8(text, max_bytes):
    """Return the longest prefix of *text* whose UTF-8 encoding fits in *max_bytes*."""
    # errors='ignore' drops a multi-byte character that the byte slice cut in half.
    return text.encode('utf-8')[:max_bytes].decode('utf-8', errors='ignore')


def _build_s3_key(timestamp, ingestion_id, file_name):
    """Return ``{timestamp}/{ingestion_id}/{file_name}`` within MAX_KEY_LENGTH bytes.

    Only the file name is shortened; its extension is kept when it fits.

    Raises:
        ValueError: if the timestamp/ingestion prefix alone is too long.
    """
    prefix = f"{timestamp}/{ingestion_id}/"
    budget = MAX_KEY_LENGTH - len(prefix.encode('utf-8'))
    if budget <= 0:
        raise ValueError("S3 key prefix exceeds the maximum S3 key length")
    if len(file_name.encode('utf-8')) > budget:
        stem, dot, extension = file_name.rpartition('.')
        suffix = dot + extension
        suffix_bytes = len(suffix.encode('utf-8'))
        if stem and suffix_bytes < budget:
            file_name = _truncate_utf8(stem, budget - suffix_bytes) + suffix
        else:
            # No usable extension: shorten the whole name.
            file_name = _truncate_utf8(file_name, budget)
    return prefix + file_name


def _download(http, download_link):
    """GET *download_link* with *http* (a urllib3 PoolManager) and return the response.

    Raises:
        RuntimeError: if the server answers with a non-200 status.
        urllib3.exceptions.HTTPError: on transport failures (propagated).
    """
    response = http.request('GET', download_link, timeout=DOWNLOAD_TIMEOUT_SECONDS)
    if response.status != 200:
        raise RuntimeError(
            f"Failed to download file: HTTP {response.status} {response.reason}"
        )
    return response


def handler(event, context):
    """Copy the files announced by catalog upload result messages into S3.

    Each SQS record wraps an SNS envelope whose ``Message`` is a JSON document
    with ``ingestionId`` and ``downloadLink``. The linked file is downloaded
    and written to
    ``s3://$DESTINATION_BUCKET/YYYY/MM/DD/HH/MM/<ingestionId>/<file name>``.

    Every record in the batch is processed. All records are validated before
    any download starts, so a malformed message fails the whole batch without
    partial uploads. A failure during processing re-raises, so records that
    were already copied may be copied again when the batch is retried.

    Args:
        event: Lambda event containing a non-empty ``Records`` list.
        context: Lambda context object (unused).

    Returns:
        ``{'statusCode': 200, 'body': <JSON string>}``. For a single record the
        body holds ``ingestionId``, ``downloadLink`` and ``s3Location``. For
        several records it holds ``{"results": [...]}`` with one such object
        per record.

    Raises:
        ValueError: DESTINATION_BUCKET is unset, Records is missing or empty,
            or an SNS envelope has no Message field.
        json.JSONDecodeError, KeyError: a message is malformed.
        RuntimeError: a download returned a non-200 status.
        urllib3 and botocore errors propagate unchanged.
    """
    try:
        # Check configuration first so misconfiguration fails before any I/O.
        bucket_name = os.environ.get('DESTINATION_BUCKET')
        if not bucket_name:
            raise ValueError("DESTINATION_BUCKET environment variable is not set")

        records = event.get('Records') if isinstance(event, dict) else None
        if not records:
            raise ValueError("No Records found in event")

        messages = [_parse_record(record) for record in records]

        http = urllib3.PoolManager()
        s3_client = boto3.client('s3')

        results = []
        for ingestion_id, download_link in messages:
            response = _download(http, download_link)

            timestamp = datetime.now().strftime('%Y/%m/%d/%H/%M')
            s3_key = _build_s3_key(
                timestamp, ingestion_id, _file_name_from_link(download_link)
            )

            put_kwargs = {'Bucket': bucket_name, 'Key': s3_key, 'Body': response.data}
            # Preserve the source Content-Type when the server provides one.
            content_type = response.headers.get('Content-Type')
            if content_type:
                put_kwargs['ContentType'] = content_type
            s3_client.put_object(**put_kwargs)

            results.append({
                'ingestionId': ingestion_id,
                'downloadLink': download_link,
                's3Location': f"s3://{bucket_name}/{s3_key}",
            })

        body = results[0] if len(results) == 1 else {'results': results}
        return {
            'statusCode': 200,
            'body': json.dumps(body),
        }

    except Exception as e:
        # Top-level boundary: log for CloudWatch, then re-raise so the
        # invocation is reported as failed.
        print(f"Error: {str(e)}")
        raise

Example Unit Tests

The following example shows how to write pytest unit tests for the SNS message processing handler:


import pytest
import json
from unittest.mock import patch, MagicMock
import urllib3
from botocore.exceptions import ClientError

# Import the handler function from your Lambda function file
from asset_catalog_api_solution.lambda_functions.process_catalog_message_lambda_function import handler

#----------------------------------------------------------------------------
# Fixtures
#----------------------------------------------------------------------------

@pytest.fixture
def valid_event():
    """SQS event wrapping a well-formed SNS catalog upload result message."""
    upload_result = {
        'ingestionId': 'test-123',
        'downloadLink': 'https://example.com/test.csv'
    }
    sns_envelope = {'Message': json.dumps(upload_result)}
    return {'Records': [{'body': json.dumps(sns_envelope)}]}

@pytest.fixture
def mock_urllib3():
    """Patch urllib3.PoolManager so every request returns a 200 CSV response.

    Yields the patched PoolManager class; tests reach the request mock through
    ``mock_urllib3.return_value.request``.
    """
    ok_response = MagicMock()
    ok_response.status = 200
    ok_response.data = b"test data"
    ok_response.headers = {"Content-Type": "text/csv"}
    with patch('urllib3.PoolManager') as pool_manager_cls:
        pool_manager_cls.return_value.request.return_value = ok_response
        yield pool_manager_cls

@pytest.fixture
def mock_s3_client():
    """Patch boto3.client and yield the client object every call returns."""
    fake_client = MagicMock()
    with patch('boto3.client', return_value=fake_client):
        yield fake_client

@pytest.fixture
def mock_datetime():
    """Freeze the handler's timestamp string at ``2023/01/01/12/00``.

    The handler module does ``from datetime import datetime``, so the name has
    to be patched where the handler looks it up (the handler's own module).
    Patching ``datetime.datetime`` on the stdlib module would leave the
    handler's reference untouched. ``handler.__module__`` keeps this fixture
    correct if the import path above changes.

    Yields the mock that replaces ``datetime`` in the handler module.
    """
    with patch(f"{handler.__module__}.datetime") as mock:
        mock.now.return_value.strftime.return_value = "2023/01/01/12/00"
        yield mock

#----------------------------------------------------------------------------
# Tests
#----------------------------------------------------------------------------

def test_missing_environment_variable(valid_event, mock_urllib3, mock_s3_client):
    """A missing DESTINATION_BUCKET raises ValueError and nothing is uploaded."""
    # mock_s3_client already patches boto3.client. An extra inner patch would
    # shadow it and make the upload assertion below meaningless.
    with patch.dict('os.environ', {}, clear=True):
        with pytest.raises(ValueError) as exc_info:
            handler(valid_event, None)
    assert "DESTINATION_BUCKET environment variable is not set" in str(exc_info.value)
    mock_s3_client.put_object.assert_not_called()

def test_successful_execution(valid_event, mock_urllib3, mock_s3_client, mock_datetime):
    """A valid message is copied to DESTINATION_BUCKET and described in the response."""
    with patch.dict('os.environ', {'DESTINATION_BUCKET': 'test-bucket'}):
        response = handler(valid_event, None)

    assert response['statusCode'] == 200
    body = json.loads(response['body'])
    assert body['ingestionId'] == 'test-123'
    assert body['downloadLink'] == 'https://example.com/test.csv'

    mock_s3_client.put_object.assert_called_once()
    put_kwargs = mock_s3_client.put_object.call_args[1]
    assert put_kwargs['Bucket'] == 'test-bucket'
    # Checked by suffix so the assertion does not depend on the timestamp prefix.
    assert put_kwargs['Key'].endswith('/test-123/test.csv')
    # The reported location must point at the object that was actually written.
    assert body['s3Location'] == f"s3://test-bucket/{put_kwargs['Key']}"

def test_missing_records():
    """An event without a Records list is rejected with ValueError."""
    empty_event = {}
    env = {'DESTINATION_BUCKET': 'test-bucket'}
    with patch.dict('os.environ', env), pytest.raises(ValueError) as raised:
        handler(empty_event, None)
    assert "No Records found in event" in str(raised.value)

def test_missing_message():
    """An SNS envelope lacking the Message field is rejected with ValueError."""
    envelope_without_message = json.dumps({})
    event = {'Records': [{'body': envelope_without_message}]}
    env = {'DESTINATION_BUCKET': 'test-bucket'}
    with patch.dict('os.environ', env), pytest.raises(ValueError) as raised:
        handler(event, None)
    assert "No Message field found in SNS body" in str(raised.value)

def test_invalid_json_in_message():
    """Invalid JSON in the SQS body or in the SNS Message raises JSONDecodeError."""
    # The SQS body itself is not JSON.
    body_not_json = {'Records': [{'body': 'invalid json'}]}
    # The SNS envelope parses, but its Message payload does not.
    message_not_json = {
        'Records': [{'body': json.dumps({'Message': 'invalid json'})}]
    }
    with patch.dict('os.environ', {'DESTINATION_BUCKET': 'test-bucket'}):
        for event in (body_not_json, message_not_json):
            with pytest.raises(json.JSONDecodeError):
                handler(event, None)

def test_missing_required_fields():
    """A Message without downloadLink raises KeyError."""
    incomplete_message = {'ingestionId': 'test-123'}  # downloadLink omitted on purpose
    event = {
        'Records': [
            {'body': json.dumps({'Message': json.dumps(incomplete_message)})}
        ]
    }
    env = {'DESTINATION_BUCKET': 'test-bucket'}
    with patch.dict('os.environ', env), pytest.raises(KeyError):
        handler(event, None)

def test_http_error(valid_event, mock_urllib3, mock_s3_client):
    """A urllib3 transport error propagates and nothing is uploaded.

    mock_s3_client keeps the test hermetic; without it the handler would build
    a real boto3 S3 client.
    """
    mock_urllib3.return_value.request.side_effect = urllib3.exceptions.HTTPError("HTTP Error")
    with patch.dict('os.environ', {'DESTINATION_BUCKET': 'test-bucket'}):
        with pytest.raises(urllib3.exceptions.HTTPError):
            handler(valid_event, None)
    mock_s3_client.put_object.assert_not_called()

def test_non_200_response(valid_event, mock_urllib3, mock_s3_client):
    """A non-200 download response fails the handler and nothing is uploaded.

    mock_s3_client keeps the test hermetic; without it the handler would build
    a real boto3 S3 client.
    """
    not_found = MagicMock()
    not_found.status = 404
    not_found.reason = "Not Found"
    not_found.data = b"Not Found"
    mock_urllib3.return_value.request.return_value = not_found

    with patch.dict('os.environ', {'DESTINATION_BUCKET': 'test-bucket'}):
        with pytest.raises(Exception) as exc_info:
            handler(valid_event, None)
    assert "Failed to download file" in str(exc_info.value)
    mock_s3_client.put_object.assert_not_called()

def test_s3_error(valid_event, mock_urllib3, mock_s3_client):
    """A ClientError raised by PutObject propagates out of the handler."""
    s3_failure = ClientError(
        {'Error': {'Code': 'InvalidAccessKeyId', 'Message': 'Invalid Access Key'}},
        'PutObject',
    )
    mock_s3_client.put_object.side_effect = s3_failure
    env = {'DESTINATION_BUCKET': 'test-bucket'}
    with patch.dict('os.environ', env), pytest.raises(ClientError):
        handler(valid_event, None)

def test_long_filename_truncation(mock_urllib3, mock_s3_client, mock_datetime):
    """A file name longer than the S3 key limit still yields a key of at most 1024 bytes."""
    very_long_filename = "a" * 2000
    event = {
        'Records': [{
            'body': json.dumps({
                'Message': json.dumps({
                    'ingestionId': 'test-123',
                    'downloadLink': f'https://example.com/{very_long_filename}.csv'
                })
            })
        }]
    }

    with patch.dict('os.environ', {'DESTINATION_BUCKET': 'test-bucket'}):
        response = handler(event, None)
    assert response['statusCode'] == 200

    key = mock_s3_client.put_object.call_args[1]['Key']
    assert len(key.encode('utf-8')) <= 1024
    # Truncation should only shorten the file name, not drop the ingestion prefix.
    assert '/test-123/' in key
    body = json.loads(response['body'])
    assert body['s3Location'] == f"s3://test-bucket/{key}"

Example Response

A successful response looks like the following. The function returns `body` as a JSON-encoded string; it is shown decoded here for readability:

    {
        "statusCode": 200,
        "body": {
            "ingestionId": "test-123",
            "downloadLink": "https://example.com/test.csv",
            "s3Location": "s3://test-bucket/2023/01/01/12/00/test-123/test.csv"
        }
    }

Additional Information

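  • The function requires the DESTINATION_BUCKET environment variable, which names the S3 bucket that receives the copied file
  • Amazon S3 object keys are limited to 1,024 bytes (UTF-8 encoded), so very long file names must be shortened to fit
  • The function downloads the file referenced by downloadLink and uploads it to the destination S3 bucket
  • Includes error handling for S3 operations and JSON parsing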