Developer Console
アクセスいただきありがとうございます。こちらのページは現在英語のみのご用意となっております。順次日本語化を進めてまいりますので、ご理解のほどよろしくお願いいたします。

Catalog API Get Status

Summary

The Get Status API is used to retrieve the status of a catalog upload.

Python Code

import json
import boto3
import os
import time
import urllib3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from urllib.parse import urlparse

# Module-level clients and configuration, created once when the module is
# imported. TABLE_NAME, API_ENDPOINT and API_REGION must be present in the
# environment; a missing variable raises KeyError at import time.
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])  # table that handler() writes submissions to
API_ENDPOINT = os.environ['API_ENDPOINT']  # URL that handler() signs and POSTs to
API_REGION = os.environ['API_REGION']  # region passed to SigV4Auth in sign_request()
http = urllib3.PoolManager()  # HTTP pool used by handler() for the API call

def sign_request(url, method='POST', body=None):
    """Build SigV4-signed headers for a request to *url*.

    Credentials are taken from a fresh ``boto3.Session``. The request is
    signed for the ``execute-api`` service in ``API_REGION``.

    Args:
        url: Full URL of the request being signed.
        method: HTTP method of the request.
        body: Request payload included in the signature, or ``None``.

    Returns:
        A plain ``dict`` of the request headers after signing, including the
        explicitly added ``host`` header.
    """
    creds = boto3.Session().get_credentials()

    # The host header is set explicitly from the URL's network location.
    target_host = urlparse(url).netloc
    aws_request = AWSRequest(method=method, url=url, data=body)
    aws_request.headers.add_header('host', target_host)

    signer = SigV4Auth(creds, 'execute-api', API_REGION)
    signer.add_auth(aws_request)
    return dict(aws_request.headers)

def _to_dynamodb_compatible(value):
    """Return a copy of JSON-like *value* with every float parsed as ``Decimal``.

    ``json.loads`` produces Python ``float`` for non-integer numbers, and the
    boto3 DynamoDB resource rejects ``float`` values. Round-tripping through
    JSON with ``parse_float=Decimal`` yields a structure it can store.
    """
    from decimal import Decimal

    return json.loads(json.dumps(value), parse_float=Decimal)


def handler(event, context):
    """Submit catalog items to the API and record accepted submissions.

    Signs and POSTs ``{"catalogItems": event['catalogItems']}`` to
    ``API_ENDPOINT``. When the API answers 201 with a JSON object, the
    submission is written to the DynamoDB table with status ``PENDING`` and a
    seven-day ``ttl``.

    Args:
        event: Invocation payload; must contain the key ``catalogItems``.
        context: Invocation context (unused).

    Returns:
        A dict with ``statusCode`` and a JSON-encoded ``body``:

        * 200 - the API returned 201; the body carries ``id``, ``status``,
          ``requiresStatusCheck`` and the API ``response``.
        * 400 - ``catalogItems`` is missing from ``event``.
        * 500 - the API response was not valid JSON, a 201 response was not
          a JSON object, or an unexpected exception occurred.
        * any other API status code is passed through unchanged with
          ``requiresStatusCheck`` set to ``False``.
    """
    try:
        print(f"Input event: {json.dumps(event)}")

        # Only the lookup of the required field is treated as a client error;
        # a KeyError raised anywhere else is an internal failure and falls
        # through to the generic 500 handler below.
        try:
            catalog_items = event['catalogItems']
        except KeyError as e:
            print(f"KeyError: Missing required field - {str(e)}")
            return {
                'statusCode': 400,
                'body': json.dumps({
                    'error': f'Missing required field: {str(e)}',
                    'type': 'KeyError'
                })
            }

        body = json.dumps({"catalogItems": catalog_items})
        headers = sign_request(API_ENDPOINT, 'POST', body)
        headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })

        response = http.request(
            'POST',
            API_ENDPOINT,
            body=body,
            headers=headers
        )

        raw_response = response.data.decode('utf-8')
        print(f"Response status: {response.status}")
        print(f"Response data: {raw_response}")

        try:
            response_data = json.loads(raw_response)
        except json.JSONDecodeError as e:
            print(f"Failed to decode response JSON: {e}")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': 'Invalid JSON response from API',
                    'raw_response': raw_response
                })
            }

        if response.status != 201:
            return {
                'statusCode': response.status,
                'body': json.dumps({
                    'message': 'Process completed',
                    'response': response_data,
                    'requiresStatusCheck': False
                })
            }

        if not isinstance(response_data, dict):
            print(f"Unexpected response data type: {type(response_data)}")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': 'Invalid response format',
                    'response': response_data
                })
            }

        # Fall back to a timestamp-based id when the API supplies neither key.
        process_id = response_data.get('id') or response_data.get('requestId') or str(time.time())
        item = {
            'id': process_id,
            'status': 'PENDING',
            # Floats must become Decimal before they reach DynamoDB.
            'payload': _to_dynamodb_compatible(catalog_items),
            'response': _to_dynamodb_compatible(response_data),
            'timestamp': int(time.time() * 1000),
            'ttl': int(time.time()) + (7 * 24 * 60 * 60),
            'requires_status_check': True
        }
        table.put_item(Item=item)

        return {
            'statusCode': 200,
            'body': json.dumps({
                'id': process_id,
                'status': 'PENDING',
                'requiresStatusCheck': True,
                'response': response_data
            })
        }

    except Exception as e:
        print(f"Unexpected error: {str(e)}")
        import traceback
        trace = traceback.format_exc()
        print(f"Traceback: {trace}")
        # NOTE(review): the traceback is returned to the caller as well as
        # logged; confirm this response never reaches untrusted clients.
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': str(e),
                'type': str(type(e).__name__),
                'trace': trace
            })
        }

Unit Tests

import pytest
import json
import time
from unittest.mock import Mock, patch, MagicMock
import sys
import os

# Add the src directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'src')))

# The lambda module reads its environment variables and creates its DynamoDB
# table handle and HTTP pool at import time. The environment is therefore
# patched before the import, and boto3.resource / urllib3.PoolManager are
# stubbed so that importing needs neither a configured AWS region nor
# network access.
with patch.dict('os.environ', {
    'TABLE_NAME': 'test-table',
    'API_ENDPOINT': 'https://api.test.com/endpoint',
    'API_REGION': 'us-west-*'
}):
    with patch('boto3.resource'), patch('urllib3.PoolManager'):
        from asset_catalog_api_solution.lambda_functions.check_status_lambda_function import handler, sign_request

@pytest.fixture(autouse=True)
def mock_env_variables():
    """
    Autouse fixture: patch ``os.environ`` with the test configuration for the
    duration of every test and yield the patched mapping.
    """
    test_environment = {
        'TABLE_NAME': 'test-table',
        'API_ENDPOINT': 'https://api.test.com/endpoint',
        'API_REGION': 'us-west-*'
    }
    with patch.dict('os.environ', test_environment) as patched_environ:
        yield patched_environ

@pytest.fixture(autouse=True)
def mock_boto3_session():
    """
    Replace ``boto3.Session`` so request signing never uses real credentials.

    The fixture is autouse so that tests which do not request it by name
    cannot fall back to whatever AWS credentials exist on the machine. The
    credentials expose plain-string ``access_key`` / ``secret_key`` and no
    session ``token``, because the signer uses them as strings.

    Yields the mocked session object.
    """
    mock_credentials = MagicMock(
        access_key='test-access-key',
        secret_key='test-secret-key',
        token=None,
    )
    mock_session = MagicMock()
    mock_session.get_credentials.return_value = mock_credentials
    with patch('boto3.Session', return_value=mock_session):
        yield mock_session

@pytest.fixture
def mock_dynamodb():
    """
    Yield a mock DynamoDB table that the handler actually writes to.

    The module under test binds its ``table`` global at import time, so
    patching ``boto3.resource`` afterwards does not reach it. The module
    attribute itself is replaced here. ``boto3.resource`` is still patched
    so any resource created during the test resolves to the same mock table.
    """
    lambda_module = sys.modules[handler.__module__]
    mock_table = MagicMock()
    with patch('boto3.resource') as mock_resource:
        mock_resource.return_value.Table.return_value = mock_table
        with patch.object(lambda_module, 'table', mock_table):
            yield mock_table

@pytest.fixture
def mock_http():
    """
    Yield the mock HTTP response that the handler receives from its pool.

    The module under test binds its ``http`` pool at import time, so patching
    ``urllib3.PoolManager`` afterwards does not reach it. The module
    attribute itself is replaced with a mock pool here.

    Tests set ``status`` and ``data`` on the yielded response. The pool's
    ``request`` mock is also exposed as ``mock_http.request`` so that a test
    can set ``mock_http.request.side_effect`` to make the HTTP call itself
    raise.
    """
    lambda_module = sys.modules[handler.__module__]
    mock_response = MagicMock()
    with patch('urllib3.PoolManager') as mock_pool_manager:
        mock_pool = mock_pool_manager.return_value
        mock_pool.request.return_value = mock_response
        mock_response.request = mock_pool.request
        with patch.object(lambda_module, 'http', mock_pool):
            yield mock_response

def test_sign_request(mock_env_variables):
    """sign_request returns a header dict containing the host and a SigV4 signature."""
    # The signer uses the credential fields as strings, so they are given
    # concrete values instead of auto-created MagicMock attributes.
    mock_credentials = MagicMock(
        access_key='test-access-key',
        secret_key='test-secret-key',
        token=None,
    )
    with patch('boto3.Session') as mock_session:
        mock_session.return_value.get_credentials.return_value = mock_credentials

        url = 'https://api.test.com/endpoint'
        headers = sign_request(url, 'POST', '{"test": "data"}')

        assert isinstance(headers, dict)
        assert 'host' in headers
        assert headers['host'] == 'api.test.com'
        # A signed request must carry the SigV4 Authorization header.
        assert 'Authorization' in headers
        assert headers['Authorization'].startswith('AWS4-HMAC-SHA256')

def test_successful_request(mock_env_variables, mock_dynamodb, mock_http, mock_boto3_session):
    """A 201 from the API yields a 200 PENDING result and a single table write."""
    api_payload = {'id': 'test-id'}
    mock_http.data = json.dumps(api_payload).encode('utf-8')
    mock_http.status = 201

    result = handler({'catalogItems': ['item1', 'item2']}, None)

    assert result['statusCode'] == 200
    parsed_body = json.loads(result['body'])
    assert parsed_body['id'] == 'test-id'
    assert parsed_body['status'] == 'PENDING'
    assert parsed_body['requiresStatusCheck'] is True

    # The accepted submission must be stored exactly once.
    mock_dynamodb.put_item.assert_called_once()

def test_failed_request(mock_env_variables, mock_dynamodb, mock_http):
    """A non-201 API status is passed through and no status check is requested."""
    error_payload = {'error': 'Bad Request'}
    mock_http.data = json.dumps(error_payload).encode('utf-8')
    mock_http.status = 400

    result = handler({'catalogItems': ['item1', 'item2']}, None)

    assert result['statusCode'] == 400
    parsed_body = json.loads(result['body'])
    assert 'response' in parsed_body
    assert parsed_body['requiresStatusCheck'] is False

def test_missing_required_field(mock_env_variables):
    """An event without ``catalogItems`` is rejected with a 400 error body."""
    result = handler({}, None)

    assert result['statusCode'] == 400
    parsed_body = json.loads(result['body'])
    assert 'error' in parsed_body
    assert 'Missing required field' in parsed_body['error']

def test_invalid_json_response(mock_env_variables, mock_http):
    """A response body that is not JSON produces a 500 with a descriptive error."""
    mock_http.data = 'Invalid JSON'.encode('utf-8')
    mock_http.status = 201

    result = handler({'catalogItems': ['item1', 'item2']}, None)

    assert result['statusCode'] == 500
    parsed_body = json.loads(result['body'])
    assert 'error' in parsed_body
    assert 'Invalid JSON response from API' in parsed_body['error']

def test_unexpected_response_format(mock_env_variables, mock_http):
    """A 201 whose JSON body is not an object produces a 500 format error."""
    non_object_payload = ['unexpected', 'format']
    mock_http.data = json.dumps(non_object_payload).encode('utf-8')
    mock_http.status = 201

    result = handler({'catalogItems': ['item1', 'item2']}, None)

    assert result['statusCode'] == 500
    parsed_body = json.loads(result['body'])
    assert 'error' in parsed_body
    assert 'Invalid response format' in parsed_body['error']

def test_unexpected_exception(mock_env_variables, mock_http):
    """An unexpected failure is reported as a 500 with error, type and trace."""
    mock_http.request.side_effect = Exception('Unexpected error')

    result = handler({'catalogItems': ['item1', 'item2']}, None)

    assert result['statusCode'] == 500
    parsed_body = json.loads(result['body'])
    for expected_key in ('error', 'type', 'trace'):
        assert expected_key in parsed_body