Resolve Scan API
Note: Make sure to follow your organization's process to prepare the code for production release.
Python Code
"""
Enhanced Lambda function for processing loyalty scan events with decoding capabilities.
"""
import json
import os
import base64
from datetime import datetime, UTC
import logging
from typing import Dict, Any, Optional, List, TypedDict, Union
import time
import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class ScanEvent(TypedDict):
id: str
timestamp: str
location: str
value: str
channel: str
class ScanRequest(TypedDict):
requestId: str
storeId: str
scanEvent: ScanEvent
# Environment variables
ENVIRONMENT_VARS = {
'shopperInfoTable': os.environ.get('shopperInfoTable'),
'identityInfoTable': os.environ.get('identityInfoTable'),
'loyaltyInfoTable': os.environ.get('LoyaltyInfoTable'),
'loyaltyScanHistoryTable': os.environ.get('loyaltyScanHistoryTable'),
'shopperLoyaltyTable': os.environ.get('shopperLoyaltyTable')
}
# Initialize AWS session and clients
session = boto3.Session()
dynamo_client = session.resource('dynamodb')
# Custom Exceptions
class LoyaltyScanError(Exception):
"""Base exception for loyalty scan processing errors."""
pass
class ValidationError(LoyaltyScanError):
"""Validation related errors."""
pass
class DecodingError(LoyaltyScanError):
"""Decoding related errors."""
pass
class DatabaseError(LoyaltyScanError):
"""Database operation related errors."""
pass
def validate_environment() -> None:
"""Validate required environment variables."""
missing_vars = [k for k, v in ENVIRONMENT_VARS.items() if not v]
if missing_vars:
raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
def get_table(table_name: str) -> boto3.resource.Table:
"""Get DynamoDB table resource."""
return dynamo_client.Table(table_name)
def decode_loyalty_key(encoded_key: str) -> str:
"""
Decode loyalty key from base64 or hex format.
Args:
encoded_key: Encoded loyalty key
Returns:
str: Decoded loyalty key
Raises:
DecodingError: If decoding fails
"""
def is_valid_base64(s: str) -> bool:
return (len(s) % 4 == 0 and
all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=" for c in s))
logger.info("Attempting to decode loyalty key")
# Try base64 decoding
if is_valid_base64(encoded_key):
try:
decoded = base64.b64decode(encoded_key).decode('utf-8', errors='ignore')
logger.info("Successfully decoded base64 key")
return decoded
except Exception as e:
logger.warning(f"Base64 decode failed: {e}")
# Try hex decoding
try:
decoded = bytes.fromhex(encoded_key).decode('utf-8', errors='ignore')
logger.info("Successfully decoded hex key")
return decoded
except Exception as e:
logger.warning(f"Hex decode failed: {e}")
raise DecodingError(f"Failed to decode key: {str(e)}")
def create_scan_record(table_name: str, scan_data: Dict[str, Any]) -> bool:
"""
Create scan record in DynamoDB.
Args:
table_name: Target table name
scan_data: Scan record data
Returns:
bool: Success status
Raises:
DatabaseError: If database operation fails
"""
try:
table = get_table(table_name)
item = {
"scanId": scan_data["requestId"],
"customerLoyaltyId": scan_data["scanEvent_value"],
"storeId": scan_data["storeId"],
"scanEvent_id": scan_data["scanEvent_id"],
"scanEvent_timestamp": scan_data["scanEvent_timestamp"],
"scanEvent_location": scan_data["scanEvent_location"],
"scanEvent_channel": scan_data["scanEvent_channel"],
"scanOutcome": scan_data["scanOutcome"],
"created_at": datetime.now(UTC).isoformat()
}
table.put_item(
Item=item,
ConditionExpression='attribute_not_exists(scanId)'
)
return True
except ClientError as e:
logger.error(f"Failed to create scan record: {e}")
raise DatabaseError(f"Failed to create scan record: {str(e)}")
def batch_create_scan_records(table_name: str, scan_records: List[Dict[str, Any]]) -> bool:
"""
Batch create scan records in DynamoDB.
Args:
table_name: Target table name
scan_records: List of scan record data
Returns:
bool: Success status
"""
table = get_table(table_name)
try:
with table.batch_writer() as batch:
for record in scan_records:
batch.put_item(Item=record)
return True
except ClientError as e:
logger.error(f"Batch write failed: {e}")
return False
def query_loyalty(table_name: str, loyalty_id: str) -> Optional[Dict[str, Any]]:
"""
Query loyalty information from DynamoDB.
Args:
table_name: Target table name
loyalty_id: Loyalty ID to query
Returns:
Optional[Dict]: Loyalty information if found
Raises:
DatabaseError: If database operation fails
"""
try:
table = get_table(table_name)
response = table.query(
KeyConditionExpression=Key('customerLoyaltyId').eq(loyalty_id),
ConsistentRead=True
)
return response.get('Items', [])
except ClientError as e:
logger.error(f"DynamoDB query failed: {e}")
raise DatabaseError(f"Loyalty query failed: {str(e)}")
def create_response(status_code: int, body: Dict[str, Any]) -> Dict[str, Any]:
"""Create standardized API response."""
return {
"statusCode": status_code,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(body),
"isBase64Encoded": False
}
def validate_scan_request(scan_request: Dict[str, Any]) -> bool:
"""Validate scan request format."""
required_fields = ['requestId', 'storeId', 'scanEvent']
scan_event_fields = ['id', 'timestamp', 'location', 'value', 'channel']
if not all(field in scan_request for field in required_fields):
return False
if not all(field in scan_request['scanEvent'] for field in scan_event_fields):
return False
return True
def log_metrics(scan_details: Dict[str, Any], duration: float) -> None:
"""Log processing metrics."""
logger.info(
"Scan processing metrics",
extra={
"duration_ms": duration,
"store_id": scan_details["storeId"],
"outcome": scan_details["scanOutcome"],
"channel": scan_details["scanEvent_channel"]
}
)
def process_loyalty_scan(scan_request: ScanRequest) -> Dict[str, Any]:
"""
Process loyalty scan and prepare response.
Args:
scan_request: Validated scan request
Returns:
Dict: API response
"""
start_time = time.time()
scan_event = scan_request['scanEvent']
decoded_value = decode_loyalty_key(scan_event['value'])
scan_details = {
"requestId": scan_request['requestId'],
"storeId": scan_request['storeId'],
"scanEvent_id": scan_event['id'],
"scanEvent_timestamp": scan_event['timestamp'],
"scanEvent_location": scan_event['location'],
"scanEvent_value": decoded_value,
"scanEvent_channel": scan_event['channel'],
"timeStamp": datetime.now(UTC).isoformat()
}
loyalty_results = query_loyalty(
ENVIRONMENT_VARS['shopperLoyaltyTable'],
decoded_value
)
if loyalty_results:
scan_details['scanOutcome'] = "ACCEPT"
response_data = {
"id": decoded_value,
"type": "LOYALTY",
"action": "ACCEPT"
}
else:
scan_details['scanOutcome'] = "REJECT"
response_data = {
"id": "no-loyalty-id",
"type": "LOYALTY",
"action": "REJECT"
}
create_scan_record(
ENVIRONMENT_VARS['loyaltyScanHistoryTable'],
scan_details
)
duration = (time.time() - start_time) * 1000 # Convert to milliseconds
log_metrics(scan_details, duration)
return create_response(200, response_data)
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""
Main Lambda handler function.
Args:
event: Lambda event
context: Lambda context
Returns:
Dict: API response
"""
try:
logger.info(f"Processing event: {event}")
validate_environment()
if not event.get('body'):
raise ValidationError("Missing request body")
scan_request = json.loads(event['body'])
if not validate_scan_request(scan_request):
raise ValidationError("Invalid scan request format")
return process_loyalty_scan(scan_request)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON: {e}")
return create_response(400, {"error": "Invalid request format"})
except ValidationError as e:
logger.error(f"Validation error: {e}")
return create_response(400, {"error": str(e)})
except DecodingError as e:
logger.error(f"Decoding error: {e}")
return create_response(400, {"error": str(e)})
except DatabaseError as e:
logger.error(f"Database error: {e}")
return create_response(500, {"error": str(e)})
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
return create_response(500, {"error": "Internal server error"})
Unit Tests
import pytest
import json
from unittest.mock import patch, MagicMock
from botocore.exceptions import ClientError
# Import the functions from your Lambda function file
from your_lambda_file import (
validate_environment, decode_loyalty_key, create_scan_record,
query_loyalty, validate_scan_request, process_loyalty_scan,
lambda_handler, LoyaltyScanError, ValidationError, DecodingError, DatabaseError
)
# Mock environment variables
@pytest.fixture(autouse=True)
def mock_env_vars(monkeypatch):
monkeypatch.setenv('shopperInfoTable', 'mock-shopper-info')
monkeypatch.setenv('identityInfoTable', 'mock-identity-info')
monkeypatch.setenv('LoyaltyInfoTable', 'mock-loyalty-info')
monkeypatch.setenv('loyaltyScanHistoryTable', 'mock-loyalty-scan-history')
monkeypatch.setenv('shopperLoyaltyTable', 'mock-shopper-loyalty')
def test_validate_environment():
validate_environment() # Should not raise an exception
def test_validate_environment_missing_var(monkeypatch):
monkeypatch.delenv('shopperInfoTable')
with pytest.raises(ValueError):
validate_environment()
@pytest.mark.parametrize("input_key,expected_output", [
("SGVsbG8gV29ybGQ=", "Hello World"), # Base64
("48656C6C6F20576F726C64", "Hello World"), # Hex
("plaintext", "plaintext"), # Plain text (fallback)
])
def test_decode_loyalty_key(input_key, expected_output):
assert decode_loyalty_key(input_key) == expected_output
def test_decode_loyalty_key_error():
with pytest.raises(DecodingError):
decode_loyalty_key("invalid_key_!@#")
@patch('your_lambda_file.get_table')
def test_create_scan_record(mock_get_table):
mock_table = MagicMock()
mock_get_table.return_value = mock_table
scan_data = {
"requestId": "test-request-id",
"scanEvent_value": "test-loyalty-id",
"storeId": "test-store-id",
"scanEvent_id": "test-event-id",
"scanEvent_timestamp": "2023-08-25T12:00:00Z",
"scanEvent_location": "test-location",
"scanEvent_channel": "test-channel",
"scanOutcome": "ACCEPT"
}
assert create_scan_record("test-table", scan_data) == True
mock_table.put_item.assert_called_once()
@patch('your_lambda_file.get_table')
def test_create_scan_record_error(mock_get_table):
mock_table = MagicMock()
mock_get_table.return_value = mock_table
mock_table.put_item.side_effect = ClientError({"Error": {"Code": "ConditionalCheckFailedException"}}, "PutItem")
with pytest.raises(DatabaseError):
create_scan_record("test-table", {})
@patch('your_lambda_file.get_table')
def test_query_loyalty(mock_get_table):
mock_table = MagicMock()
mock_get_table.return_value = mock_table
mock_table.query.return_value = {"Items": [{"key": "value"}]}
result = query_loyalty("test-table", "test-loyalty-id")
assert result == [{"key": "value"}]
mock_table.query.assert_called_once()
@patch('your_lambda_file.get_table')
def test_query_loyalty_error(mock_get_table):
mock_table = MagicMock()
mock_get_table.return_value = mock_table
mock_table.query.side_effect = ClientError({"Error": {"Code": "ResourceNotFoundException"}}, "Query")
with pytest.raises(DatabaseError):
query_loyalty("test-table", "test-loyalty-id")
def test_validate_scan_request():
valid_request = {
"requestId": "test-id",
"storeId": "test-store",
"scanEvent": {
"id": "test-event-id",
"timestamp": "2023-08-25T12:00:00Z",
"location": "test-location",
"value": "test-value",
"channel": "test-channel"
}
}
assert validate_scan_request(valid_request) == True
invalid_request = {
"requestId": "test-id",
"storeId": "test-store",
"scanEvent": {
"id": "test-event-id",
"timestamp": "2023-08-25T12:00:00Z",
"location": "test-location",
# Missing 'value' and 'channel'
}
}
assert validate_scan_request(invalid_request) == False
@patch('your_lambda_file.decode_loyalty_key')
@patch('your_lambda_file.query_loyalty')
@patch('your_lambda_file.create_scan_record')
def test_process_loyalty_scan(mock_create_scan_record, mock_query_loyalty, mock_decode_loyalty_key):
mock_decode_loyalty_key.return_value = "decoded-value"
mock_query_loyalty.return_value = [{"loyalty": "data"}]
mock_create_scan_record.return_value = True
scan_request = {
"requestId": "test-id",
"storeId": "test-store",
"scanEvent": {
"id": "test-event-id",
"timestamp": "2023-08-25T12:00:00Z",
"location": "test-location",
"value": "encoded-value",
"channel": "test-channel"
}
}
result = process_loyalty_scan(scan_request)
assert result['statusCode'] == 200
assert json.loads(result['body'])['action'] == 'ACCEPT'
@patch('your_lambda_file.process_loyalty_scan')
def test_lambda_handler(mock_process_loyalty_scan):
mock_process_loyalty_scan.return_value = {"statusCode": 200, "body": json.dumps({"action": "ACCEPT"})}
event = {
"body": json.dumps({
"requestId": "test-id",
"storeId": "test-store",
"scanEvent": {
"id": "test-event-id",
"timestamp": "2023-08-25T12:00:00Z",
"location": "test-location",
"value": "encoded-value",
"channel": "test-channel"
}
})
}
result = lambda_handler(event, None)
assert result['statusCode'] == 200
assert json.loads(result['body'])['action'] == 'ACCEPT'
def test_lambda_handler_invalid_json():
event = {"body": "invalid json"}
result = lambda_handler(event, None)
assert result['statusCode'] == 400
assert "Invalid request format" in result['body']
def test_lambda_handler_missing_body():
event = {}
result = lambda_handler(event, None)
assert result['statusCode'] == 400
assert "Missing request body" in result['body']
JavaScript Code
// test_lambda_function.test.js
const AWS = require('aws-sdk');
const { handler } = require('./index');
// Mock AWS SDK
jest.mock('aws-sdk');
describe('Loyalty Scan Lambda Function', () => {
beforeEach(() => {
// Reset mocks
jest.clearAllMocks();
// Set environment variables
process.env.shopperInfoTable = 'mock-shopper-info';
process.env.identityInfoTable = 'mock-identity-info';
process.env.LoyaltyInfoTable = 'mock-loyalty-info';
process.env.loyaltyScanHistoryTable = 'mock-loyalty-scan-history';
process.env.shopperLoyaltyTable = 'mock-shopper-loyalty';
});
const validEvent = {
body: JSON.stringify({
requestId: 'test-id',
storeId: 'test-store',
scanEvent: {
id: 'test-event-id',
timestamp: '2023-08-25T12:00:00Z',
location: 'test-location',
value: 'SGVsbG8gV29ybGQ=', // "Hello World" in base64
channel: 'test-channel'
}
})
};
test('successful loyalty scan processing', async () => {
// Mock DynamoDB query response
AWS.DynamoDB.DocumentClient.prototype.query.mockImplementationOnce(() => ({
promise: () => Promise.resolve({ Items: [{ id: 'test-loyalty' }] })
}));
// Mock DynamoDB put response
AWS.DynamoDB.DocumentClient.prototype.put.mockImplementationOnce(() => ({
promise: () => Promise.resolve({})
}));
const response = await handler(validEvent);
expect(response.statusCode).toBe(200);
expect(JSON.parse(response.body)).toEqual({
id: 'Hello World',
type: 'LOYALTY',
action: 'ACCEPT'
});
});
test('rejected loyalty scan processing', async () => {
// Mock DynamoDB query response (empty result)
AWS.DynamoDB.DocumentClient.prototype.query.mockImplementationOnce(() => ({
promise: () => Promise.resolve({ Items: [] })
}));
// Mock DynamoDB put response
AWS.DynamoDB.DocumentClient.prototype.put.mockImplementationOnce(() => ({
promise: () => Promise.resolve({})
}));
const response = await handler(validEvent);
expect(response.statusCode).toBe(200);
expect(JSON.parse(response.body)).toEqual({
id: 'no-loyalty-id',
type: 'LOYALTY',
action: 'REJECT'
});
});
test('missing request body', async () => {
const response = await handler({});
expect(response.statusCode).toBe(400);
expect(JSON.parse(response.body)).toEqual({
error: 'Missing request body'
});
});
test('invalid request format', async () => {
const response = await handler({
body: JSON.stringify({
requestId: 'test-id'
// Missing required fields
})
});
expect(response.statusCode).toBe(400);
expect(JSON.parse(response.body)).toEqual({
error: 'Invalid scan request format'
});
});
test('database error handling', async () => {
AWS.DynamoDB.DocumentClient.prototype.query.mockImplementationOnce(() => ({
promise: () => Promise.reject(new Error('Database connection failed'))
}));
const response = await handler(validEvent);
expect(response.statusCode).toBe(500);
expect(JSON.parse(response.body)).toEqual({
error: 'Loyalty query failed: Database connection failed'
});
});
});