as

Settings
Sign out
Notifications
Alexa
Amazonアプリストア
Ring
AWS
ドキュメント
Support
Contact Us
My Cases
Docs
Resources
Ecommerce Plug-ins
Publish
Connect
アクセスいただきありがとうございます。こちらのページは現在英語のみのご用意となっております。順次日本語化を進めてまいりますので、ご理解のほどよろしくお願いいたします。

Catalog API Invoke API

Summary

The Invoke Catalog API is used to upload catalog items to the Amazon Just Walk Out API.

Python Code


import json
import boto3
import urllib3
import os
from datetime import datetime
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from urllib.parse import urlparse

# Module-level urllib3 connection pool: created once at import time and
# shared by every call to lambda_handler in this module.
http = urllib3.PoolManager()

def get_table():
    """Return the DynamoDB Table resource for the 'CatalogAPICalls' table."""
    return boto3.resource('dynamodb').Table('CatalogAPICalls')

def sign_request(url, method, body):
    """Sign an AWS request with SigV4 for the 'execute-api' service.

    Args:
        url: Full endpoint URL. The signing region is derived from its host.
        method: HTTP method to sign (e.g. 'POST').
        body: JSON-serializable request body. It is serialized with
            ``json.dumps`` before signing, so the caller must send exactly
            that same serialization for the signature to match.

    Returns:
        dict: The request headers after signing, i.e. 'Content-Type' plus
        whatever headers ``SigV4Auth.add_auth`` attaches.
    """
    session = boto3.Session()
    credentials = session.get_credentials()

    host_labels = urlparse(url).netloc.split('.')
    if 'execute-api' in host_labels[:-1]:
        # Standard API Gateway hosts look like
        # <api-id>.execute-api.<region>.amazonaws.com, so the region is the
        # label following 'execute-api', not the first label (the API id).
        region = host_labels[host_labels.index('execute-api') + 1]
    else:
        # Other host shapes keep the previous convention: first label.
        region = host_labels[0]

    request = AWSRequest(
        method=method,
        url=url,
        data=json.dumps(body)
    )

    request.headers.add_header('Content-Type', 'application/json')
    SigV4Auth(credentials, 'execute-api', region).add_auth(request)

    return dict(request.headers)

def validate_payload(payload):
    """Validate the catalog items payload structure.

    Args:
        payload: Candidate request body. Expected to be a dict holding a
            non-empty ``catalogItems`` list of item dicts.

    Returns:
        bool: True only when ``payload`` is a dict, ``catalogItems`` is a
        non-empty list, and every item is a dict containing all required
        fields. Only key presence is checked, not the values.
    """
    if not isinstance(payload, dict):
        return False

    items = payload.get('catalogItems')
    if not isinstance(items, list) or not items:
        return False

    required_fields = (
        'item_sku', 'external_product_id', 'external_product_id_type',
        'item_name', 'store_id', 'standard_price', 'brand_name',
        'product_category', 'product_subcategory',
    )

    # Each item must be a dict: on a str, `field in item` is a substring
    # test, and on values such as int or None it raises TypeError.
    return all(
        isinstance(item, dict)
        and all(field in item for field in required_fields)
        for item in items
    )

def lambda_handler(event, context):
    """Main Lambda handler function.

    Builds a sample catalog payload, signs it with SigV4, POSTs it to the
    endpoint named by the ``API_URL`` environment variable, and stores the
    API response in DynamoDB.

    Args:
        event: Invocation event; it is only logged.
        context: Lambda context object; not used.

    Returns:
        dict: ``statusCode`` plus a JSON-encoded ``body``. 201 on success,
        400 when the payload fails validation, and 500 when ``API_URL`` is
        unset, the API answers with a status other than 201, or any other
        error occurs.
    """
    # Local import so this block does not depend on the file-level imports.
    from decimal import Decimal

    print("Received event: " + json.dumps(event, indent=2))

    # Validate environment variables
    api_url = os.environ.get('API_URL')
    if not api_url:
        return {
            'statusCode': 500,
            'body': json.dumps('API_URL environment variable is not set')
        }

    # Define the payload
    payload = {
        "catalogItems": [
            {
                "item_sku": "96385074",
                "external_product_id": "96385074",
                "external_product_id_type": "EAN8",
                "item_name": "Life Water",
                "store_id": "<REPLACE_WITH_STORE>",
                "standard_price": "1.00",
                "brand_name": "Life Water",
                "product_category": "Drink",
                "product_subcategory": "Water"
            }
        ]
    }

    # Validate payload structure
    if not validate_payload(payload):
        return {
            'statusCode': 400,
            'body': json.dumps('Invalid payload structure')
        }

    # Stays None until the HTTP call returns, so the error handler can tell
    # whether there is any response content to log.
    response = None
    try:
        # Get DynamoDB table
        table = get_table()

        # Sign the request
        headers = sign_request(api_url, 'POST', payload)

        # Make API request
        response = http.request(
            'POST',
            api_url,
            body=json.dumps(payload),
            headers=headers
        )

        if response.status != 201:
            raise urllib3.exceptions.HTTPError(f"Request failed with status {response.status}")

        # Process response. Floats are parsed as Decimal because the boto3
        # DynamoDB resource refuses to serialize Python float values.
        api_response = json.loads(response.data.decode('utf-8'), parse_float=Decimal)
        print("API Response:", api_response)

        # Store in DynamoDB; one timestamp is used for both the key and the
        # timestamp attribute so the two always agree.
        recorded_at = datetime.utcnow().isoformat()
        table.put_item(Item={
            'ingestion_id': recorded_at,
            'api_response': api_response,
            'timestamp': recorded_at
        })

        return {
            'statusCode': 201,
            'body': json.dumps('Successfully processed catalog items')
        }
    except urllib3.exceptions.HTTPError as e:
        print(f"Error: {str(e)}")
        error_content = response.data.decode('utf-8') if response is not None else 'No response content'
        print(f"Response content: {error_content}")
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error processing catalog items: {str(e)}')
        }
    except Exception as e:
        # Top-level boundary: report any other failure as a 500 response.
        print(f"Unexpected error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps(f'Unexpected error processing catalog items: {str(e)}')
        }

Unit tests

import unittest
from unittest.mock import Mock, patch, MagicMock
import json
from datetime import datetime
import sys
import os

# Put the directory containing this test file at the front of sys.path so
# that the catalog_temp module can be imported from it
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from catalog_temp import validate_payload, sign_request, lambda_handler


class TestValidatePayload(unittest.TestCase):
    """Structural checks performed by validate_payload."""

    def test_valid_payload(self):
        complete_item = {
            "item_sku": "123",
            "external_product_id": "456",
            "external_product_id_type": "UPC",
            "item_name": "Test Item",
            "store_id": "STORE-001",
            "standard_price": "10.00",
            "brand_name": "Test Brand",
            "product_category": "Category",
            "product_subcategory": "Subcategory",
        }
        outcome = validate_payload({"catalogItems": [complete_item]})
        self.assertTrue(outcome)

    def test_invalid_payload_not_dict(self):
        outcome = validate_payload("not a dict")
        self.assertFalse(outcome)

    def test_invalid_payload_missing_catalog_items(self):
        outcome = validate_payload({"other_field": "value"})
        self.assertFalse(outcome)

    def test_invalid_payload_catalog_items_not_list(self):
        outcome = validate_payload({"catalogItems": "not a list"})
        self.assertFalse(outcome)

    def test_invalid_payload_empty_catalog_items(self):
        outcome = validate_payload({"catalogItems": []})
        self.assertFalse(outcome)

    def test_invalid_payload_missing_required_field(self):
        # Only two of the required fields are supplied.
        partial_item = {
            "item_sku": "123",
            "external_product_id": "456",
        }
        outcome = validate_payload({"catalogItems": [partial_item]})
        self.assertFalse(outcome)


class TestSignRequest(unittest.TestCase):
    """Checks that sign_request wires credentials, service and region into SigV4."""

    @patch('catalog_temp.boto3.Session')
    @patch('catalog_temp.SigV4Auth')
    def test_sign_request(self, mock_sigv4, mock_session):
        mock_credentials = Mock()
        mock_session.return_value.get_credentials.return_value = mock_credentials

        url = "https://us-east-1.example.com/api"
        method = "POST"
        body = {"test": "data"}

        headers = sign_request(url, method, body)

        self.assertIsInstance(headers, dict)
        self.assertEqual(headers.get('Content-Type'), 'application/json')
        mock_session.return_value.get_credentials.assert_called_once()

        # The signer must be built from the session credentials, the
        # 'execute-api' service name and the region taken from the URL host,
        # and must actually be applied to the request.
        mock_sigv4.assert_called_once_with(mock_credentials, 'execute-api', 'us-east-1')
        mock_sigv4.return_value.add_auth.assert_called_once()


class TestLambdaHandler(unittest.TestCase):
    """End-to-end checks of lambda_handler with all external collaborators mocked."""

    def setUp(self):
        self.context = Mock()
        self.context.aws_request_id = "test-request-id"

    def _invoke(self, env, clear=False):
        """Call lambda_handler with an empty event under a patched os.environ."""
        with patch.dict(os.environ, env, clear=clear):
            return lambda_handler({}, self.context)

    @patch('catalog_temp.get_table')
    @patch('catalog_temp.sign_request')
    @patch('catalog_temp.http')
    def test_lambda_handler_success(self, mock_http, mock_sign, mock_get_table):
        # Mock API response
        mock_response = Mock()
        mock_response.status = 201
        mock_response.data = json.dumps({"ingestionId": "test-123"}).encode('utf-8')
        mock_http.request.return_value = mock_response

        # Mock DynamoDB table
        mock_table = Mock()
        mock_get_table.return_value = mock_table

        # Mock sign_request
        mock_sign.return_value = {"Authorization": "test-auth"}

        result = self._invoke({'API_URL': 'https://test.example.com/api'})

        self.assertEqual(result['statusCode'], 201)
        self.assertIn('Successfully processed', result['body'])
        mock_table.put_item.assert_called_once()

    # The collaborators are patched so this test can never reach real boto3
    # or the network, whatever path the handler takes to its 500 response.
    @patch('catalog_temp.get_table')
    @patch('catalog_temp.sign_request')
    @patch('catalog_temp.http')
    def test_lambda_handler_missing_api_url(self, mock_http, mock_sign, mock_get_table):
        result = self._invoke({}, clear=True)

        self.assertEqual(result['statusCode'], 500)

    @patch('catalog_temp.validate_payload')
    def test_lambda_handler_invalid_payload(self, mock_validate):
        mock_validate.return_value = False

        result = self._invoke({'API_URL': 'https://test.example.com/api'})

        self.assertEqual(result['statusCode'], 400)
        self.assertIn('Invalid payload', result['body'])

    @patch('catalog_temp.get_table')
    @patch('catalog_temp.sign_request')
    @patch('catalog_temp.http')
    def test_lambda_handler_api_error(self, mock_http, mock_sign, mock_get_table):
        # Mock API error response
        mock_response = Mock()
        mock_response.status = 500
        mock_http.request.return_value = mock_response

        mock_sign.return_value = {"Authorization": "test-auth"}

        result = self._invoke({'API_URL': 'https://test.example.com/api'})

        self.assertEqual(result['statusCode'], 500)
        self.assertIn('Error processing', result['body'])

    @patch('catalog_temp.get_table')
    @patch('catalog_temp.sign_request')
    @patch('catalog_temp.http')
    def test_lambda_handler_exception(self, mock_http, mock_sign, mock_get_table):
        # Mock exception
        mock_http.request.side_effect = Exception("Network error")
        mock_sign.return_value = {"Authorization": "test-auth"}

        result = self._invoke({'API_URL': 'https://test.example.com/api'})

        self.assertEqual(result['statusCode'], 500)
        self.assertIn('Unexpected error', result['body'])


# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()