Developer Console
感谢您的访问。此页面目前仅提供英语版本。我们正在开发中文版本。谢谢您的理解。

Create Purchases API

Python Code


"""
Cart Processing Lambda Function
Handles cart validation, processing, and error handling with API integration
"""

import json
from typing import Dict, Any, Tuple, List, Optional
import os
import re
import logging
import boto3
import uuid
import time
import aiohttp
import asyncio
from datetime import datetime
from botocore.exceptions import ClientError

# Initialize logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Custom Exceptions
class EnvironmentError(Exception):
    """Custom exception for environment configuration errors"""
    pass

class ValidationError(Exception):
    """Custom exception for data validation errors"""
    pass

class ProductValidationError(Exception):
    """Custom exception for product validation errors"""
    pass

class DuplicateRequestError(Exception):
    """Custom exception for duplicate request errors"""
    pass

# Configuration management
def load_config() -> Dict:
    """Load and validate configuration from environment variables"""
    required_vars = [
        'identityInfoTable',
        'cartStepFunction',
        'badCartDataQueue',
        'emptyCartMessageQueue',
        'invalidItemsQueue',
        'duplicateRequestQueue',
        'PRODUCT_API_ENDPOINT',
        'role_name',
        'LogGroupName',
        'LogStreamName'
    ]

    missing_vars = [var for var in required_vars if not os.environ.get(var)]
    if missing_vars:
        raise EnvironmentError(f"Missing environment variables: {', '.join(missing_vars)}")

    return {
        'max_retries': int(os.environ.get('MAX_RETRIES', 3)),
        'initial_delay': float(os.environ.get('INITIAL_DELAY', 1.0)),
        'api_timeout': int(os.environ.get('API_TIMEOUT', 30)),
        **{var.lower(): os.environ[var] for var in required_vars}
    }

# Initialize AWS clients and config
config = load_config()
dynamodb = boto3.resource('dynamodb')
sqs = boto3.client('sqs')
cloudwatch = boto3.client('cloudwatch-logs')
table = dynamodb.Table(config['identityinfotable'])

def retry_with_backoff(func: callable, *args, **kwargs) -> Any:
    """Execute a function with exponential backoff retry logic"""
    for attempt in range(config['max_retries']):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if attempt == config['max_retries'] - 1:
                logger.error(f"Final retry attempt failed: {str(e)}")
                raise
            delay = config['initial_delay'] * (2 ** attempt)
            logger.warning(f"Retry attempt {attempt + 1} failed. Retrying in {delay} seconds")
            time.sleep(delay)

def extract_cart_data(event: Dict) -> Dict:
    """Extract cart data from the event"""
    try:
        if isinstance(event.get('body'), str):
            return json.loads(event['body'])
        return event['body']
    except (KeyError, json.JSONDecodeError) as e:
        raise ValidationError(f"Failed to extract cart data: {str(e)}")

def validate_cart_structure(cart_data: Dict) -> Tuple[bool, str]:
    """Validate basic cart structure"""
    required_fields = [
        'idempotentShoppingTripId',
        'storeId',
        'shoppingTrip',
        'cartItems'
    ]

    for field in required_fields:
        if field not in cart_data:
            return False, f"Missing required field: {field}"

    trip_id = cart_data['idempotentShoppingTripId']
    if not re.match(r'^[A-Za-z0-9-]+$', trip_id):
        return False, "Invalid shopping trip ID format"

    return True, ""

async def check_duplicate_request(trip_id: str) -> bool:
    """Check for duplicate request based on shopping trip ID"""
    try:
        loop = asyncio.get_event_loop()
        response = await loop.run_in_executor(
            None,
            lambda: retry_with_backoff(
                table.get_item,
                Key={'shopping_trip_id': trip_id}
            )
        )
        return 'Item' in response
    except ClientError as e:
        logger.error(f"Error checking for duplicate request: {str(e)}")
        raise

async def validate_product(item: Dict, session: aiohttp.ClientSession) -> bool:
    """Validate single product against API"""
    try:
        async with session.get(
                f"{config['product_api_endpoint']}/products/{item['id']}",
                timeout=config['api_timeout']
        ) as response:
            if response.status != 200:
                return False
            product_data = await response.json()
            return validate_product_data(item, product_data)
    except Exception as e:
        logger.error(f"Product validation error for {item['id']}: {str(e)}")
        return False

def validate_product_data(cart_item: Dict, product_data: Dict) -> bool:
    """Validate product data against cart item"""
    # Add your product validation logic here
    return True

async def validate_cart_items(cart_items: List[Dict]) -> Tuple[bool, List[str]]:
    """Validate cart items against API"""
    if not cart_items:
        return True, []

    async with aiohttp.ClientSession() as session:
        tasks = [validate_product(item, session) for item in cart_items]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    invalid_items = [
        item['id'] for item, result in zip(cart_items, results)
        if isinstance(result, Exception) or not result
    ]

    return len(invalid_items) == 0, invalid_items

def validate_auth_events(shopping_trip: Dict) -> bool:
    """Validate authentication events"""
    return bool(shopping_trip.get('authEvents', []))

def send_to_sqs(queue_url: str, message: Any) -> None:
    """Send message to SQS queue"""
    try:
        sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message)
        )
    except Exception as e:
        logger.error(f"Failed to send to SQS {queue_url}: {str(e)}")
        raise

def log_to_cloudwatch(message: str) -> None:
    """Send log to CloudWatch"""
    try:
        cloudwatch.put_log_events(
            logGroupName=config['loggroupname'],
            logStreamName=config['logstreamname'],
            logEvents=[{
                'timestamp': int(time.time() * 1000),
                'message': message
            }]
        )
    except Exception as e:
        logger.error(f"CloudWatch logging error: {str(e)}")
        raise

def log_inventory(cart_items: List[Dict]) -> None:
    """Log inventory usage"""
    for item in cart_items:
        try:
            message = (
                f"item_sku={item['id']},"
                f"item_quantity={item['quantity']['value']},"
                f"item_unit={item['quantity']['unit']}"
            )
            log_to_cloudwatch(message)
        except Exception as e:
            logger.error(f"Failed to log inventory item: {str(e)}")

def generate_response(status_code: int, body: Dict) -> Dict:
    """Generate API response"""
    return {
        'statusCode': status_code,
        'headers': {'Content-Type': 'application/json'},
        'body': json.dumps(body)
    }

def handle_error(error: Exception) -> Dict:
    """Handle processing errors"""
    error_mappings = {
        ValidationError: ('ValidationError', 400),
        ProductValidationError: ('ProductValidationError', 400),
        DuplicateRequestError: ('DuplicateRequestError', 409),
        json.JSONDecodeError: ('JSONDecodeError', 400),
        ClientError: ('AWSError', 500)
    }

    error_type, status_code = error_mappings.get(
        type(error),
        ('InternalError', 500)
    )

    logger.error(f"{error_type}: {str(error)}")
    return generate_response(status_code, {
        'errorType': error_type,
        'errorMessage': str(error),
        'timestamp': datetime.utcnow().isoformat()
    })

async def process_cart(cart_data: Dict) -> Dict:
    """Process cart data"""
    try:
        # Validate cart structure
        is_valid, validation_error = validate_cart_structure(cart_data)
        if not is_valid:
            raise ValidationError(validation_error)

        # Check for duplicate request
        trip_id = cart_data['idempotentShoppingTripId']
        if await check_duplicate_request(trip_id):
            send_to_sqs(config['duplicaterequestqueue'], cart_data)
            raise DuplicateRequestError(f"Duplicate request for shopping trip ID: {trip_id}")

        # Add request tracking
        cart_data['metadata'] = {
            'processedTimestamp': datetime.utcnow().isoformat(),
            'requestId': str(uuid.uuid4()),
            'processingVersion': '2.0'
        }

        # Validate cart items
        cart_items = cart_data.get('cartItems', [])
        items_valid, invalid_items = await validate_cart_items(cart_items)
        if not items_valid:
            send_to_sqs(config['invaliditemsqueue'], {
                'cart_data': cart_data,
                'invalid_items': invalid_items,
                'timestamp': datetime.utcnow().isoformat()
            })
            raise ProductValidationError(f"Invalid products: {', '.join(invalid_items)}")

        # Validate auth events
        if not validate_auth_events(cart_data['shoppingTrip']):
            send_to_sqs(config['badcartdataqueue'], cart_data)
            raise ValidationError("Empty authEvent value in the incoming cart")

        # Handle empty cart
        if not cart_items:
            logger.info(f"Empty cart detected: {trip_id}")
            send_to_sqs(config['emptycartmessagequeue'], cart_data)

        # Log inventory
        log_inventory(cart_items)

        return generate_response(200, {
            'purchaseId': trip_id.partition("-")[0],
            'requestId': cart_data['metadata']['requestId'],
            'processedTimestamp': cart_data['metadata']['processedTimestamp']
        })

    except Exception as e:
        return handle_error(e)

async def lambda_handler(event: Dict, context: Any) -> Dict:
    """AWS Lambda handler"""
    logger.info(f"Lambda handler started. Event: {json.dumps(event)}")

    try:
        cart_data = extract_cart_data(event)
        return await process_cart(cart_data)
    except Exception as e:
        return handle_error(e)

def handler(event: Dict, context: Any) -> Dict:
    """Wrapper for async lambda handler"""
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(lambda_handler(event, context))




Unit Tests


"""
Unit tests for cart processing Lambda function
"""

import pytest
import json
import asyncio
import boto3
from datetime import datetime
from unittest.mock import Mock, patch, AsyncMock
from botocore.exceptions import ClientError

# Import the main Lambda code (assuming it's in cart_processor.py)
from cart_processor import (
    load_config,
    extract_cart_data,
    validate_cart_structure,
    check_duplicate_request,
    validate_product,
    validate_cart_items,
    validate_auth_events,
    process_cart,
    handler
)

# Test data
VALID_CART_DATA = {
    "idempotentShoppingTripId": "test-123",
    "storeId": "store-1",
    "shoppingTrip": {
        "authEvents": [
            {"type": "auth", "timestamp": "2024-01-01T00:00:00Z"}
        ]
    },
    "cartItems": [
        {
            "id": "item-1",
            "quantity": {"value": 1, "unit": "pieces"}
        }
    ]
}

@pytest.fixture
def mock_env(monkeypatch):
    """Mock environment variables"""
    env_vars = {
        'identityInfoTable': 'test-table',
        'cartStepFunction': 'test-step-function',
        'badCartDataQueue': 'test-bad-cart-queue',
        'emptyCartMessageQueue': 'test-empty-cart-queue',
        'invalidItemsQueue': 'test-invalid-items-queue',
        'duplicateRequestQueue': 'test-duplicate-queue',
        'PRODUCT_API_ENDPOINT': 'http://test-api',
        'role_name': 'test-role',
        'LogGroupName': 'test-log-group',
        'LogStreamName': 'test-log-stream'
    }
    for key, value in env_vars.items():
        monkeypatch.setenv(key, value)
    return env_vars

@pytest.fixture
def mock_aws_clients():
    """Mock AWS clients"""
    with patch('boto3.resource') as mock_resource,
            patch('boto3.client') as mock_client:

        mock_table = Mock()
        mock_resource.return_value.Table.return_value = mock_table

        mock_sqs = Mock()
        mock_cloudwatch = Mock()
        mock_client.side_effect = lambda service: {
            'sqs': mock_sqs,
            'cloudwatch-logs': mock_cloudwatch
        }[service]

        yield {
            'table': mock_table,
            'sqs': mock_sqs,
            'cloudwatch': mock_cloudwatch
        }

class TestConfig:
    """Test configuration loading"""

    def test_load_config_success(self, mock_env):
        """Test successful config loading"""
        config = load_config()
        assert config['identityinfotable'] == 'test-table'
        assert isinstance(config['max_retries'], int)
        assert isinstance(config['initial_delay'], float)

    def test_load_config_missing_vars(self, monkeypatch):
        """Test config loading with missing variables"""
        monkeypatch.delenv('identityInfoTable', raising=False)
        with pytest.raises(EnvironmentError) as exc:
            load_config()
        assert 'Missing environment variables' in str(exc.value)

class TestCartDataExtraction:
    """Test cart data extraction"""

    def test_extract_cart_data_json_string(self):
        """Test extracting cart data from JSON string"""
        event = {'body': json.dumps(VALID_CART_DATA)}
        result = extract_cart_data(event)
        assert result == VALID_CART_DATA

    def test_extract_cart_data_dict(self):
        """Test extracting cart data from dict"""
        event = {'body': VALID_CART_DATA}
        result = extract_cart_data(event)
        assert result == VALID_CART_DATA

    def test_extract_cart_data_invalid_json(self):
        """Test extracting cart data with invalid JSON"""
        event = {'body': 'invalid json'}
        with pytest.raises(ValidationError):
            extract_cart_data(event)

class TestCartValidation:
    """Test cart validation"""

    def test_validate_cart_structure_valid(self):
        """Test valid cart structure"""
        is_valid, error = validate_cart_structure(VALID_CART_DATA)
        assert is_valid
        assert error == ""

    def test_validate_cart_structure_missing_field(self):
        """Test cart structure with missing field"""
        invalid_cart = VALID_CART_DATA.copy()
        del invalid_cart['storeId']
        is_valid, error = validate_cart_structure(invalid_cart)
        assert not is_valid
        assert "Missing required field" in error

    def test_validate_cart_structure_invalid_trip_id(self):
        """Test cart structure with invalid trip ID"""
        invalid_cart = VALID_CART_DATA.copy()
        invalid_cart['idempotentShoppingTripId'] = "invalid#id"
        is_valid, error = validate_cart_structure(invalid_cart)
        assert not is_valid
        assert "Invalid shopping trip ID format" in error

@pytest.mark.asyncio
class TestDuplicateCheck:
    """Test duplicate request checking"""

    async def test_check_duplicate_request_not_found(self, mock_aws_clients):
        """Test checking for non-existent duplicate"""
        mock_aws_clients['table'].get_item.return_value = {}
        result = await check_duplicate_request('test-123')
        assert not result

    async def test_check_duplicate_request_found(self, mock_aws_clients):
        """Test checking for existing duplicate"""
        mock_aws_clients['table'].get_item.return_value = {'Item': {}}
        result = await check_duplicate_request('test-123')
        assert result

    async def test_check_duplicate_request_error(self, mock_aws_clients):
        """Test duplicate check with AWS error"""
        mock_aws_clients['table'].get_item.side_effect = ClientError(
            {'Error': {'Code': 'TestException', 'Message': 'Test error'}},
            'GetItem'
        )
        with pytest.raises(ClientError):
            await check_duplicate_request('test-123')

@pytest.mark.asyncio
class TestProductValidation:
    """Test product validation"""

    async def test_validate_product_valid(self):
        """Test validating valid product"""
        session_mock = AsyncMock()
        response_mock = AsyncMock()
        response_mock.status = 200
        response_mock.json.return_value = {'valid': True}
        session_mock.get.return_value.__aenter__.return_value = response_mock

        result = await validate_product({'id': 'item-1'}, session_mock)
        assert result

    async def test_validate_product_invalid(self):
        """Test validating invalid product"""
        session_mock = AsyncMock()
        response_mock = AsyncMock()
        response_mock.status = 404
        session_mock.get.return_value.__aenter__.return_value = response_mock

        result = await validate_product({'id': 'item-1'}, session_mock)
        assert not result

@pytest.mark.asyncio
class TestCartProcessing:
    """Test cart processing"""

    async def test_process_cart_success(self, mock_aws_clients):
        """Test successful cart processing"""
        with patch('cart_processor.check_duplicate_request', return_value=False),
                patch('cart_processor.validate_cart_items', return_value=(True, [])):

            result = await process_cart(VALID_CART_DATA)
            assert result['statusCode'] == 200
            assert 'purchaseId' in json.loads(result['body'])

    async def test_process_cart_duplicate(self, mock_aws_clients):
        """Test processing duplicate cart"""
        with patch('cart_processor.check_duplicate_request', return_value=True):
            result = await process_cart(VALID_CART_DATA)
            assert result['statusCode'] == 409
            assert 'DuplicateRequestError' in result['body']

    async def test_process_cart_invalid_items(self, mock_aws_clients):
        """Test processing cart with invalid items"""
        with patch('cart_processor.check_duplicate_request', return_value=False),
                patch('cart_processor.validate_cart_items', return_value=(False, ['item-1'])):

            result = await process_cart(VALID_CART_DATA)
            assert result['statusCode'] == 400
            assert 'ProductValidationError' in result['body']

class TestLambdaHandler:
    """Test Lambda handler"""

    @pytest.mark.asyncio
    async def test_handler_success(self):
        """Test successful Lambda execution"""
        with patch('cart_processor.process_cart') as mock_process:
            mock_process.return_value = {'statusCode': 200, 'body': '{}'}
            result = await lambda_handler({'body': json.dumps(VALID_CART_DATA)}, {})
            assert result['statusCode'] == 200

    @pytest.mark.asyncio
    async def test_handler_error(self):
        """Test Lambda execution with error"""
        with patch('cart_processor.process_cart') as mock_process:
            mock_process.side_effect = Exception('Test error')
            result = await lambda_handler({'body': json.dumps(VALID_CART_DATA)}, {})
            assert result['statusCode'] == 500

def test_validate_auth_events():
    """Test auth events validation"""
    valid_shopping_trip = {'authEvents': [{'type': 'auth'}]}
    assert validate_auth_events(valid_shopping_trip)

    invalid_shopping_trip = {'authEvents': []}
    assert not validate_auth_events(invalid_shopping_trip)


JavaScript Code

// cart-processor.test.js

const AWS = require('aws-sdk-mock');
const axios = require('axios');
const { handler } = require('./cart-processor');

jest.mock('axios');

const VALID_CART_DATA = {
    idempotentShoppingTripId: 'test-123',
    storeId: 'store-1',
    shoppingTrip: {
        authEvents: [
            { type: 'auth', timestamp: '2024-01-01T00:00:00Z' }
        ]
    },
    cartItems: [
        {
            id: 'item-1',
            quantity: { value: 1, unit: 'pieces' }
        }
    ]
};

describe('Cart Processor', () => {
    beforeAll(() => {
        // Mock environment variables
        process.env.identityInfoTable = 'test-table';
        process.env.cartStepFunction = 'test-step-function';
        process.env.badCartDataQueue = 'test-bad-cart-queue';
        process.env.emptyCartMessageQueue = 'test-empty-cart-queue';
        process.env.invalidItemsQueue = 'test-invalid-items-queue';
        process.env.duplicateRequestQueue = 'test-duplicate-queue';
        process.env.PRODUCT_API_ENDPOINT = 'http://test-api';
        process.env.role_name = 'test-role';
        process.env.LogGroupName = 'test-log-group';
        process.env.LogStreamName = 'test-log-stream';
    });

    beforeEach(() => {
        // Reset AWS mocks
        AWS.restore();
    });

    test('should process valid cart successfully', async () => {
        // Mock DynamoDB
        AWS.mock('DynamoDB.DocumentClient', 'get', (params, callback) => {
            callback(null, {});
        });

        // Mock SQS
        AWS.mock('SQS', 'sendMessage', (params, callback) => {
            callback(null, {});
        });

        // Mock CloudWatch
        AWS.mock('CloudWatch', 'putMetricData', (params, callback) => {
            callback(null, {});
        });

        // Mock product API
        axios.get.mockResolvedValue({ status: 200, data: {} });

        const event = {
            body: JSON.stringify(VALID_CART_DATA)
        };

        const result = await handler(event, {});
        expect(result.statusCode).toBe(200);
        expect(JSON.parse(result.body)).toHaveProperty('purchaseId');
    });

    test('should handle duplicate request', async () => {
        AWS.mock('DynamoDB.DocumentClient', 'get', (params, callback) => {
            callback(null, { Item: {} });
        });

        AWS.mock('SQS', 'sendMessage', (params, callback) => {
            callback(null, {});
        });

        const event = {
            body: JSON.stringify(VALID_CART_DATA)
        };

        const result = await handler(event, {});
        expect(result.statusCode).toBe(409);
        expect(JSON.parse(result.body)).toHaveProperty('errorType', 'DuplicateRequestError');
    });

    test('should handle invalid cart data', async () => {
        const event = {
            body: JSON.stringify({})
        };

        const result = await handler(event, {});
        expect(result.statusCode).toBe(400);
        expect(JSON.parse(result.body)).toHaveProperty('errorType', 'ValidationError');
    });

    test('should handle invalid products', async () => {
        AWS.mock('DynamoDB.DocumentClient', 'get', (params, callback) => {
            callback(null, {});
        });

        AWS.mock('SQS', 'sendMessage', (params, callback) => {
            callback(null, {});
        });

        axios.get.mockRejectedValue(new Error('Product not found'));

        const event = {
            body: JSON.stringify(VALID_CART_DATA)
        };

        const result = await handler(event, {});
        expect(result.statusCode).toBe(400);
        expect(JSON.parse(result.body)).toHaveProperty('errorType', 'ProductValidationError');
    });
});