Developer Console
Gracias por tu visita. Esta página solo está disponible en inglés.

Resolve Scan API

Python Code

"""
Enhanced Lambda function for processing loyalty scan events with decoding capabilities.
"""

import json
import os
import base64
from datetime import datetime, UTC
import logging
from typing import Dict, Any, Optional, List, TypedDict, Union
import time

import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

class ScanEvent(TypedDict):
    """Shape of the ``scanEvent`` object nested inside a scan request.

    ``validate_scan_request`` requires all five keys to be present, and
    ``process_loyalty_scan`` copies them into the scan-history record.
    """
    id: str  # stored in the history record as ``scanEvent_id``
    timestamp: str  # stored as ``scanEvent_timestamp``; presumably ISO 8601 - confirm with the producer
    location: str  # stored as ``scanEvent_location``
    value: str  # encoded loyalty key; handed to ``decode_loyalty_key``
    channel: str  # stored as ``scanEvent_channel`` and emitted in the metrics log

class ScanRequest(TypedDict):
    """Top-level scan request parsed from the Lambda event body.

    ``validate_scan_request`` checks that all three keys are present.
    """
    requestId: str  # written to the scan-history table as ``scanId``
    storeId: str  # written to the scan-history table and the metrics log
    scanEvent: ScanEvent  # nested scan payload

# Table names, read from the process environment once at import time.
# Each pair is (key used inside this module, environment variable name).
# The two are identical except for ``loyaltyInfoTable``, whose environment
# variable is spelled with a capital ``L``.
ENVIRONMENT_VARS = {
    config_key: os.getenv(env_name)
    for config_key, env_name in (
        ('shopperInfoTable', 'shopperInfoTable'),
        ('identityInfoTable', 'identityInfoTable'),
        ('loyaltyInfoTable', 'LoyaltyInfoTable'),
        ('loyaltyScanHistoryTable', 'loyaltyScanHistoryTable'),
        ('shopperLoyaltyTable', 'shopperLoyaltyTable'),
    )
}

# AWS handles shared by every function in this module; both are created
# when the module is imported.
session = boto3.Session()
# NOTE: despite its name this is a boto3 *resource* (it exposes ``.Table``),
# not a low-level client.
dynamo_client = session.resource(service_name='dynamodb')

# Custom Exceptions
class LoyaltyScanError(Exception):
    """Root of the exception hierarchy raised while processing a loyalty scan."""

class ValidationError(LoyaltyScanError):
    """The incoming request is missing or malformed."""

class DecodingError(LoyaltyScanError):
    """The scanned loyalty key could not be decoded."""

class DatabaseError(LoyaltyScanError):
    """A DynamoDB read or write failed."""

def validate_environment() -> None:
    """Check that every table name in ``ENVIRONMENT_VARS`` is set.

    Raises:
        ValueError: If one or more entries are ``None`` or empty. The message
            lists the offending ``ENVIRONMENT_VARS`` keys, comma-separated.
    """
    unset = []
    for name, value in ENVIRONMENT_VARS.items():
        if not value:
            unset.append(name)

    if not unset:
        return

    raise ValueError(f"Missing required environment variables: {', '.join(unset)}")

def get_table(table_name: str) -> Any:
    """Return the DynamoDB ``Table`` resource for ``table_name``.

    The return type is annotated as ``Any`` because boto3 generates its
    resource classes at runtime, so there is no importable ``Table`` class to
    name. The previous annotation, ``boto3.resource.Table``, looked up an
    attribute on the ``boto3.resource`` *function*; that lookup raises
    ``AttributeError`` as soon as the ``def`` statement is evaluated.

    Args:
        table_name: Name of the DynamoDB table.

    Returns:
        The table resource obtained from the module-level ``dynamo_client``.
    """
    return dynamo_client.Table(table_name)

def decode_loyalty_key(encoded_key: str) -> str:
    """
    Decode a loyalty key that is encoded as hex or base64.

    Hex is attempted first: the hex alphabet is a subset of the base64
    alphabet, so a hex key whose length happens to be a multiple of four would
    otherwise be accepted as base64 and decoded to unrelated bytes.

    Decoding is strict. Input that is not well-formed, or that does not decode
    to valid UTF-8, is rejected rather than having the offending bytes
    silently dropped, because a partially decoded key is a *different* key.

    Args:
        encoded_key: Encoded loyalty key

    Returns:
        str: Decoded loyalty key (never empty)

    Raises:
        DecodingError: If the key is not a non-empty string, or if neither
            hex nor base64 decoding yields non-empty UTF-8 text
    """
    logger.info("Attempting to decode loyalty key")

    # Reject non-string / blank input up front so it surfaces as a decoding
    # problem instead of a TypeError or an empty lookup key further down.
    if not isinstance(encoded_key, str) or not encoded_key.strip():
        raise DecodingError("Failed to decode key: key must be a non-empty string")

    # Try hex decoding. Malformed hex and invalid UTF-8 both raise ValueError
    # (UnicodeDecodeError is a ValueError subclass).
    try:
        decoded = bytes.fromhex(encoded_key).decode('utf-8')
    except ValueError as e:
        logger.warning(f"Hex decode failed: {e}")
    else:
        if decoded:
            logger.info("Successfully decoded hex key")
            return decoded

    # Try base64 decoding. validate=True rejects characters outside the
    # base64 alphabet instead of discarding them; binascii.Error is also a
    # ValueError subclass.
    try:
        decoded = base64.b64decode(encoded_key, validate=True).decode('utf-8')
    except ValueError as e:
        logger.warning(f"Base64 decode failed: {e}")
        raise DecodingError(f"Failed to decode key: {str(e)}") from e

    if not decoded:
        raise DecodingError("Failed to decode key: decoded value is empty")

    logger.info("Successfully decoded base64 key")
    return decoded

def create_scan_record(table_name: str, scan_data: Dict[str, Any]) -> bool:
    """
    Create scan record in DynamoDB.

    The write is conditional on ``scanId`` not already existing, so a repeated
    ``requestId`` is rejected by DynamoDB and reported as a ``DatabaseError``.

    Args:
        table_name: Target table name
        scan_data: Scan record data. Must contain ``requestId``, ``storeId``,
            ``scanOutcome`` and the ``scanEvent_value`` / ``scanEvent_id`` /
            ``scanEvent_timestamp`` / ``scanEvent_location`` /
            ``scanEvent_channel`` keys.

    Returns:
        bool: ``True`` when the record was written

    Raises:
        DatabaseError: If a required field is missing from ``scan_data`` or
            the DynamoDB write fails. The original exception is chained.
    """
    # Build the item first so an incomplete record is reported through the
    # documented DatabaseError contract rather than as a bare KeyError.
    try:
        item = {
            "scanId": scan_data["requestId"],
            "customerLoyaltyId": scan_data["scanEvent_value"],
            "storeId": scan_data["storeId"],
            "scanEvent_id": scan_data["scanEvent_id"],
            "scanEvent_timestamp": scan_data["scanEvent_timestamp"],
            "scanEvent_location": scan_data["scanEvent_location"],
            "scanEvent_channel": scan_data["scanEvent_channel"],
            "scanOutcome": scan_data["scanOutcome"],
            "created_at": datetime.now(UTC).isoformat()
        }
    except KeyError as e:
        logger.error(f"Failed to create scan record: missing field {e}")
        raise DatabaseError(f"Failed to create scan record: missing field {e}") from e

    try:
        table = get_table(table_name)
        table.put_item(
            Item=item,
            # Refuse to overwrite an existing record with the same scanId.
            ConditionExpression='attribute_not_exists(scanId)'
        )
    except ClientError as e:
        logger.error(f"Failed to create scan record: {e}")
        raise DatabaseError(f"Failed to create scan record: {str(e)}") from e

    return True

def batch_create_scan_records(table_name: str, scan_records: List[Dict[str, Any]]) -> bool:
    """
    Write several scan records through a DynamoDB batch writer.

    Each record is written exactly as supplied. A ``ClientError`` is logged
    and reported through the return value instead of being raised.

    Args:
        table_name: Target table name
        scan_records: Items to write, one per record

    Returns:
        bool: ``True`` if the batch completed, ``False`` if a ``ClientError``
        was raised while writing
    """
    target = get_table(table_name)
    try:
        with target.batch_writer() as writer:
            for entry in scan_records:
                writer.put_item(Item=entry)
    except ClientError as err:
        logger.error(f"Batch write failed: {err}")
        return False
    return True

def query_loyalty(table_name: str, loyalty_id: str) -> List[Dict[str, Any]]:
    """
    Query loyalty information from DynamoDB.

    Runs a strongly consistent query on the ``customerLoyaltyId`` key.

    Args:
        table_name: Target table name
        loyalty_id: Loyalty ID to query

    Returns:
        List[Dict]: The matching items from the query response; an empty list
        when nothing matches (callers rely on its truthiness)

    Raises:
        DatabaseError: If the DynamoDB query fails. The original
            ``ClientError`` is chained as the cause.
    """
    try:
        table = get_table(table_name)
        response = table.query(
            KeyConditionExpression=Key('customerLoyaltyId').eq(loyalty_id),
            ConsistentRead=True
        )
    except ClientError as e:
        logger.error(f"DynamoDB query failed: {e}")
        raise DatabaseError(f"Loyalty query failed: {str(e)}") from e

    return response.get('Items', [])

def create_response(status_code: int, body: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap ``body`` in the response envelope returned by the handler.

    Args:
        status_code: HTTP status code to report
        body: JSON-serialisable payload

    Returns:
        Dict: ``statusCode``, a JSON ``Content-Type`` header, the payload
        serialised to a JSON string under ``body``, and
        ``isBase64Encoded`` set to ``False``
    """
    serialized = json.dumps(body)
    return dict(
        statusCode=status_code,
        headers={"Content-Type": "application/json"},
        body=serialized,
        isBase64Encoded=False,
    )

def validate_scan_request(scan_request: Dict[str, Any]) -> bool:
    """Validate scan request format.

    Only the structure is checked: the request must be a mapping containing
    ``requestId``, ``storeId`` and ``scanEvent``, and ``scanEvent`` must itself
    be a mapping containing ``id``, ``timestamp``, ``location``, ``value`` and
    ``channel``. Field values are not inspected.

    Args:
        scan_request: Parsed request body. Any JSON value is tolerated, so a
            body such as ``[]``, ``42`` or ``"text"`` is rejected instead of
            raising.

    Returns:
        bool: ``True`` when all required keys are present, otherwise ``False``
    """
    from collections.abc import Mapping

    required_fields = ('requestId', 'storeId', 'scanEvent')
    scan_event_fields = ('id', 'timestamp', 'location', 'value', 'channel')

    # A JSON body is not necessarily an object; ``in`` on a str would do a
    # substring test and on a number would raise TypeError.
    if not isinstance(scan_request, Mapping):
        return False

    if not all(field in scan_request for field in required_fields):
        return False

    # Same concern one level down: ``scanEvent`` may be null, a list, etc.
    scan_event = scan_request['scanEvent']
    if not isinstance(scan_event, Mapping):
        return False

    return all(field in scan_event for field in scan_event_fields)

def log_metrics(scan_details: Dict[str, Any], duration: float) -> None:
    """Emit one INFO log line describing a processed scan.

    Args:
        scan_details: Must contain ``storeId``, ``scanOutcome`` and
            ``scanEvent_channel``
        duration: Processing time, logged under ``duration_ms``
    """
    metric_fields = {
        "duration_ms": duration,
        "store_id": scan_details["storeId"],
        "outcome": scan_details["scanOutcome"],
        "channel": scan_details["scanEvent_channel"],
    }
    # The fields travel as ``extra`` attributes on the log record rather than
    # being formatted into the message text.
    logger.info("Scan processing metrics", extra=metric_fields)

def process_loyalty_scan(scan_request: ScanRequest) -> Dict[str, Any]:
    """
    Decode the scanned key, look it up, record the scan and build the reply.

    The scan is ``ACCEPT``ed when the shopper-loyalty table returns at least
    one item for the decoded key and ``REJECT``ed otherwise. In both cases a
    history record is written and a 200 response is returned.

    Args:
        scan_request: Validated scan request

    Returns:
        Dict: API response

    Raises:
        DecodingError: Propagated from ``decode_loyalty_key``
        DatabaseError: Propagated from ``query_loyalty`` or
            ``create_scan_record``
    """
    started_at = time.time()

    event_part = scan_request['scanEvent']
    decoded_value = decode_loyalty_key(event_part['value'])

    scan_details = {
        "requestId": scan_request['requestId'],
        "storeId": scan_request['storeId'],
        "scanEvent_id": event_part['id'],
        "scanEvent_timestamp": event_part['timestamp'],
        "scanEvent_location": event_part['location'],
        "scanEvent_value": decoded_value,
        "scanEvent_channel": event_part['channel'],
        "timeStamp": datetime.now(UTC).isoformat()
    }

    matches = query_loyalty(ENVIRONMENT_VARS['shopperLoyaltyTable'], decoded_value)

    # Any match accepts the scan; a rejected scan never echoes the decoded key.
    outcome = "ACCEPT" if matches else "REJECT"
    scan_details['scanOutcome'] = outcome
    response_data = {
        "id": decoded_value if matches else "no-loyalty-id",
        "type": "LOYALTY",
        "action": outcome
    }

    create_scan_record(ENVIRONMENT_VARS['loyaltyScanHistoryTable'], scan_details)

    elapsed_ms = (time.time() - started_at) * 1000  # seconds -> milliseconds
    log_metrics(scan_details, elapsed_ms)

    return create_response(200, response_data)

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Lambda entry point: validate the request, process it, map errors to HTTP.

    Status codes produced here:
        * 400 - body missing, body is not valid JSON, request fails
          ``validate_scan_request``, or the loyalty key cannot be decoded
        * 500 - a ``DatabaseError`` (its message is returned to the caller) or
          any other exception (a generic message is returned)

    Args:
        event: Lambda event; the request JSON is read from ``event['body']``
        context: Lambda context (unused)

    Returns:
        Dict: API response
    """
    try:
        logger.info(f"Processing event: {event}")
        validate_environment()

        raw_body = event.get('body')
        if not raw_body:
            raise ValidationError("Missing request body")

        payload = json.loads(raw_body)
        if validate_scan_request(payload):
            return process_loyalty_scan(payload)

        raise ValidationError("Invalid scan request format")

    # The first four exception types are unrelated to each other, so their
    # order is irrelevant; the catch-all must stay last.
    except DatabaseError as exc:
        logger.error(f"Database error: {exc}")
        return create_response(500, {"error": str(exc)})

    except DecodingError as exc:
        logger.error(f"Decoding error: {exc}")
        return create_response(400, {"error": str(exc)})

    except ValidationError as exc:
        logger.error(f"Validation error: {exc}")
        return create_response(400, {"error": str(exc)})

    except json.JSONDecodeError as exc:
        logger.error(f"Invalid JSON: {exc}")
        return create_response(400, {"error": "Invalid request format"})

    except Exception as exc:
        logger.error(f"Unexpected error: {exc}", exc_info=True)
        return create_response(500, {"error": "Internal server error"})


Unit Tests


import pytest
import json
from unittest.mock import patch, MagicMock
from botocore.exceptions import ClientError

# Import the functions from your Lambda function file
from your_lambda_file import (
    validate_environment, decode_loyalty_key, create_scan_record,
    query_loyalty, validate_scan_request, process_loyalty_scan,
    lambda_handler, LoyaltyScanError, ValidationError, DecodingError, DatabaseError
)

# Mock environment variables
@pytest.fixture(autouse=True)
def mock_env_vars(monkeypatch):
    """Give every test a fully configured set of table names.

    The Lambda module copies the environment into ``ENVIRONMENT_VARS`` when it
    is imported, which happens before any fixture runs. Setting the process
    environment alone therefore has no effect on the code under test, so the
    already-built dict is patched as well. ``monkeypatch`` undoes both after
    each test.
    """
    import your_lambda_file

    # ENVIRONMENT_VARS key -> (environment variable name, mock value)
    mock_tables = {
        'shopperInfoTable': ('shopperInfoTable', 'mock-shopper-info'),
        'identityInfoTable': ('identityInfoTable', 'mock-identity-info'),
        'loyaltyInfoTable': ('LoyaltyInfoTable', 'mock-loyalty-info'),
        'loyaltyScanHistoryTable': ('loyaltyScanHistoryTable', 'mock-loyalty-scan-history'),
        'shopperLoyaltyTable': ('shopperLoyaltyTable', 'mock-shopper-loyalty'),
    }
    for config_key, (env_name, table_name) in mock_tables.items():
        monkeypatch.setenv(env_name, table_name)
        monkeypatch.setitem(your_lambda_file.ENVIRONMENT_VARS, config_key, table_name)

def test_validate_environment():
    """Validation completes without raising when all table names are set."""
    outcome = validate_environment()
    assert outcome is None

def test_validate_environment_missing_var(monkeypatch):
    """A missing table name makes validation raise and name the variable.

    ``validate_environment`` reads the ``ENVIRONMENT_VARS`` dict captured at
    import time, not ``os.environ``, so the entry has to be cleared there for
    the test to exercise the failure path it is named after.
    """
    import your_lambda_file

    monkeypatch.delenv('shopperInfoTable', raising=False)
    monkeypatch.setitem(your_lambda_file.ENVIRONMENT_VARS, 'shopperInfoTable', None)

    with pytest.raises(ValueError, match='shopperInfoTable'):
        validate_environment()

@pytest.mark.parametrize("input_key,expected_output", [
    ("SGVsbG8gV29ybGQ=", "Hello World"),  # Base64
    ("48656C6C6F20576F726C64", "Hello World"),  # Hex
])
def test_decode_loyalty_key(input_key, expected_output):
    """Base64 and hex encodings of the same text decode to that text."""
    assert decode_loyalty_key(input_key) == expected_output


def test_decode_loyalty_key_rejects_plain_text():
    """Plain text is rejected, not passed through.

    ``decode_loyalty_key`` has no plain-text fallback: "plaintext" is neither
    well-formed base64 (its length is not a multiple of four) nor hex, so the
    documented ``DecodingError`` is raised.
    """
    with pytest.raises(DecodingError):
        decode_loyalty_key("plaintext")

def test_decode_loyalty_key_error():
    """A key that is neither base64 nor hex raises ``DecodingError``."""
    undecodable = "invalid_key_!@#"
    pytest.raises(DecodingError, decode_loyalty_key, undecodable)

@patch('your_lambda_file.get_table')
def test_create_scan_record(mock_get_table):
    """A complete record is written with a single ``put_item`` call."""
    # ``return_value`` of a MagicMock is itself a MagicMock, which stands in
    # for the DynamoDB table.
    table = mock_get_table.return_value

    scan_data = dict(
        requestId="test-request-id",
        scanEvent_value="test-loyalty-id",
        storeId="test-store-id",
        scanEvent_id="test-event-id",
        scanEvent_timestamp="2023-08-25T12:00:00Z",
        scanEvent_location="test-location",
        scanEvent_channel="test-channel",
        scanOutcome="ACCEPT",
    )

    created = create_scan_record("test-table", scan_data)

    assert created is True
    table.put_item.assert_called_once()

@patch('your_lambda_file.get_table')
def test_create_scan_record_error(mock_get_table):
    """A DynamoDB ``ClientError`` from ``put_item`` surfaces as ``DatabaseError``.

    A complete record is supplied so that the call actually reaches
    ``put_item``; with an empty dict the function fails while building the
    item and the mocked ``ClientError`` is never raised.
    """
    mock_table = MagicMock()
    mock_get_table.return_value = mock_table
    mock_table.put_item.side_effect = ClientError({"Error": {"Code": "ConditionalCheckFailedException"}}, "PutItem")

    scan_data = {
        "requestId": "test-request-id",
        "scanEvent_value": "test-loyalty-id",
        "storeId": "test-store-id",
        "scanEvent_id": "test-event-id",
        "scanEvent_timestamp": "2023-08-25T12:00:00Z",
        "scanEvent_location": "test-location",
        "scanEvent_channel": "test-channel",
        "scanOutcome": "ACCEPT"
    }

    with pytest.raises(DatabaseError):
        create_scan_record("test-table", scan_data)

    mock_table.put_item.assert_called_once()

@patch('your_lambda_file.get_table')
def test_query_loyalty(mock_get_table):
    """The ``Items`` list of the query response is returned unchanged."""
    stored_items = [{"key": "value"}]
    table = mock_get_table.return_value
    table.query.return_value = {"Items": stored_items}

    assert query_loyalty("test-table", "test-loyalty-id") == [{"key": "value"}]
    table.query.assert_called_once()

@patch('your_lambda_file.get_table')
def test_query_loyalty_error(mock_get_table):
    """A ``ClientError`` raised by the query is converted to ``DatabaseError``."""
    failure = ClientError({"Error": {"Code": "ResourceNotFoundException"}}, "Query")
    mock_get_table.return_value.query.side_effect = failure

    pytest.raises(DatabaseError, query_loyalty, "test-table", "test-loyalty-id")

def test_validate_scan_request():
    """A complete request validates; one missing scan-event fields does not."""
    complete_event = {
        "id": "test-event-id",
        "timestamp": "2023-08-25T12:00:00Z",
        "location": "test-location",
        "value": "test-value",
        "channel": "test-channel",
    }
    complete_request = {
        "requestId": "test-id",
        "storeId": "test-store",
        "scanEvent": complete_event,
    }
    assert validate_scan_request(complete_request) is True

    # Same request, but the scan event lacks 'value' and 'channel'.
    partial_event = {
        name: content
        for name, content in complete_event.items()
        if name not in ("value", "channel")
    }
    partial_request = {**complete_request, "scanEvent": partial_event}
    assert validate_scan_request(partial_request) is False

def test_process_loyalty_scan():
    """A key that is found in the loyalty table yields a 200 ACCEPT response."""
    scan_request = {
        "requestId": "test-id",
        "storeId": "test-store",
        "scanEvent": {
            "id": "test-event-id",
            "timestamp": "2023-08-25T12:00:00Z",
            "location": "test-location",
            "value": "encoded-value",
            "channel": "test-channel"
        }
    }

    # Decoding, the loyalty lookup and the history write are all stubbed, so
    # only the orchestration inside process_loyalty_scan is exercised.
    with patch('your_lambda_file.decode_loyalty_key', return_value="decoded-value"), \
            patch('your_lambda_file.query_loyalty', return_value=[{"loyalty": "data"}]), \
            patch('your_lambda_file.create_scan_record', return_value=True):
        result = process_loyalty_scan(scan_request)

    assert result['statusCode'] == 200
    body = json.loads(result['body'])
    assert body['action'] == 'ACCEPT'

def test_lambda_handler():
    """A well-formed event is passed on and the processor's response returned."""
    request_payload = {
        "requestId": "test-id",
        "storeId": "test-store",
        "scanEvent": {
            "id": "test-event-id",
            "timestamp": "2023-08-25T12:00:00Z",
            "location": "test-location",
            "value": "encoded-value",
            "channel": "test-channel"
        }
    }
    event = {"body": json.dumps(request_payload)}
    canned_response = {"statusCode": 200, "body": json.dumps({"action": "ACCEPT"})}

    with patch('your_lambda_file.process_loyalty_scan', return_value=canned_response):
        result = lambda_handler(event, None)

    assert result['statusCode'] == 200
    assert json.loads(result['body'])['action'] == 'ACCEPT'

def test_lambda_handler_invalid_json():
    """A body that is not valid JSON produces a 400 with a generic message."""
    response = lambda_handler({"body": "invalid json"}, None)

    assert response['statusCode'] == 400
    assert "Invalid request format" in response['body']

def test_lambda_handler_missing_body():
    """An event without a body produces a 400 that says so."""
    response = lambda_handler({}, None)

    assert response['statusCode'] == 400
    assert "Missing request body" in response['body']


JavaScript Code

// test_lambda_function.test.js

const AWS = require('aws-sdk');
const { handler } = require('./index');

// Mock AWS SDK
jest.mock('aws-sdk');

describe('Loyalty Scan Lambda Function', () => {
    // Table-name environment variables set for every test, with mock values.
    const MOCK_ENV = {
        shopperInfoTable: 'mock-shopper-info',
        identityInfoTable: 'mock-identity-info',
        LoyaltyInfoTable: 'mock-loyalty-info',
        loyaltyScanHistoryTable: 'mock-loyalty-scan-history',
        shopperLoyaltyTable: 'mock-shopper-loyalty'
    };

    // Values the variables above had before each test, so they can be restored.
    let savedEnv;

    /**
     * Queue a one-shot stub for a DocumentClient method.
     *
     * @param {string} method - DocumentClient method name ('query' or 'put').
     * @param {() => Promise<*>} settle - Called when the code under test
     *     invokes `.promise()`; creating the promise lazily avoids an
     *     unhandled rejection if the stub is never used.
     */
    const stubDynamoOnce = (method, settle) => {
        AWS.DynamoDB.DocumentClient.prototype[method].mockImplementationOnce(() => ({
            promise: settle
        }));
    };

    beforeEach(() => {
        // resetAllMocks (unlike clearAllMocks) also discards queued
        // mockImplementationOnce stubs, so a stub that one test set up but did
        // not consume cannot leak into the next test.
        jest.resetAllMocks();

        // Set environment variables, remembering what was there before.
        savedEnv = {};
        for (const [name, value] of Object.entries(MOCK_ENV)) {
            savedEnv[name] = process.env[name];
            process.env[name] = value;
        }
    });

    afterEach(() => {
        // Put the environment back exactly as it was found.
        for (const [name, value] of Object.entries(savedEnv)) {
            if (value === undefined) {
                delete process.env[name];
            } else {
                process.env[name] = value;
            }
        }
    });

    const validEvent = {
        body: JSON.stringify({
            requestId: 'test-id',
            storeId: 'test-store',
            scanEvent: {
                id: 'test-event-id',
                timestamp: '2023-08-25T12:00:00Z',
                location: 'test-location',
                value: 'SGVsbG8gV29ybGQ=', // "Hello World" in base64
                channel: 'test-channel'
            }
        })
    };

    test('successful loyalty scan processing', async () => {
        // Loyalty lookup finds a record; history write succeeds.
        stubDynamoOnce('query', () => Promise.resolve({ Items: [{ id: 'test-loyalty' }] }));
        stubDynamoOnce('put', () => Promise.resolve({}));

        const response = await handler(validEvent);

        expect(response.statusCode).toBe(200);
        expect(JSON.parse(response.body)).toEqual({
            id: 'Hello World',
            type: 'LOYALTY',
            action: 'ACCEPT'
        });
    });

    test('rejected loyalty scan processing', async () => {
        // Loyalty lookup returns nothing; history write still succeeds.
        stubDynamoOnce('query', () => Promise.resolve({ Items: [] }));
        stubDynamoOnce('put', () => Promise.resolve({}));

        const response = await handler(validEvent);

        expect(response.statusCode).toBe(200);
        expect(JSON.parse(response.body)).toEqual({
            id: 'no-loyalty-id',
            type: 'LOYALTY',
            action: 'REJECT'
        });
    });

    test('missing request body', async () => {
        const response = await handler({});

        expect(response.statusCode).toBe(400);
        expect(JSON.parse(response.body)).toEqual({
            error: 'Missing request body'
        });
    });

    test('invalid request format', async () => {
        const response = await handler({
            body: JSON.stringify({
                requestId: 'test-id'
                // Missing required fields
            })
        });

        expect(response.statusCode).toBe(400);
        expect(JSON.parse(response.body)).toEqual({
            error: 'Invalid scan request format'
        });
    });

    test('database error handling', async () => {
        // The loyalty lookup itself fails.
        stubDynamoOnce('query', () => Promise.reject(new Error('Database connection failed')));

        const response = await handler(validEvent);

        expect(response.statusCode).toBe(500);
        expect(JSON.parse(response.body)).toEqual({
            error: 'Loyalty query failed: Database connection failed'
        });
    });
});