Catalog API Get Status
Summary
The Get Status API code is used to retrieve the status of a catalog upload.
Note: Make sure to follow your organization's process to prepare the code for production release.
Python Code
import json
import boto3
import os
import time
import urllib3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from urllib.parse import urlparse
# DynamoDB table handle; the table name comes from the TABLE_NAME environment
# variable (a missing variable raises KeyError when this module is imported).
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])
# URL that handler() posts to, and the region handed to the SigV4 signer.
API_ENDPOINT = os.environ['API_ENDPOINT']
API_REGION = os.environ['API_REGION']
# urllib3 connection pool created once at import time and used by handler().
http = urllib3.PoolManager()
def sign_request(url, method='POST', body=None):
    """Return SigV4-signed headers for a request to ``url``.

    Args:
        url: Full URL of the request being signed.
        method: HTTP method of the request (defaults to ``'POST'``).
        body: Request payload included in the signature, or ``None``.

    Returns:
        A plain ``dict`` copy of the signed request's headers, including the
        explicitly added ``host`` header.
    """
    # Credentials are resolved through a fresh boto3 session on every call.
    creds = boto3.Session().get_credentials()

    aws_request = AWSRequest(method=method, url=url, data=body)
    aws_request.headers.add_header('host', urlparse(url).netloc)

    # Sign for the 'execute-api' service in the configured region.
    signer = SigV4Auth(creds, 'execute-api', API_REGION)
    signer.add_auth(aws_request)
    return dict(aws_request.headers)
def handler(event, context):
    """POST the event's catalog items to the signed API and record pending work.

    Args:
        event: Invocation payload; must contain a ``'catalogItems'`` key.
        context: Invocation context (not used by this function).

    Returns:
        A dict with ``'statusCode'`` and a JSON-encoded ``'body'``:

        * 400 when ``'catalogItems'`` is missing from ``event``.
        * 200 when the API answers 201 with a JSON object; an item with
          status ``'PENDING'`` is written to the DynamoDB table first.
        * 500 when the API response is not valid JSON, when a 201 response
          is not a JSON object, or when any other exception is raised.
        * Otherwise the API's own status code, with
          ``'requiresStatusCheck': False``.
    """
    import traceback
    from decimal import Decimal

    try:
        print(f"Input event: {json.dumps(event)}")

        # Guard only this lookup: a KeyError raised anywhere else is an
        # internal failure and must not be reported as a client-side 400.
        try:
            catalog_items = event['catalogItems']
        except KeyError as e:
            print(f"KeyError: Missing required field - {str(e)}")
            return {
                'statusCode': 400,
                'body': json.dumps({
                    'error': f'Missing required field: {str(e)}',
                    'type': 'KeyError'
                })
            }

        body = json.dumps({"catalogItems": catalog_items})
        headers = sign_request(API_ENDPOINT, 'POST', body)
        headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })
        response = http.request(
            'POST',
            API_ENDPOINT,
            body=body,
            headers=headers
        )
        print(f"Response status: {response.status}")
        # Decode once and reuse the text for logging, parsing and error output.
        raw_response = response.data.decode('utf-8')
        print(f"Response data: {raw_response}")

        try:
            response_data = json.loads(raw_response)
        except json.JSONDecodeError as e:
            print(f"Failed to decode response JSON: {e}")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': 'Invalid JSON response from API',
                    'raw_response': raw_response
                })
            }

        if response.status != 201:
            return {
                'statusCode': response.status,
                'body': json.dumps({
                    'message': 'Process completed',
                    'response': response_data,
                    'requiresStatusCheck': False
                })
            }

        if not isinstance(response_data, dict):
            print(f"Unexpected response data type: {type(response_data)}")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': 'Invalid response format',
                    'response': response_data
                })
            }

        # Fall back to the current time when the API supplies no identifier.
        process_id = (
            response_data.get('id')
            or response_data.get('requestId')
            or str(time.time())
        )
        item = {
            'id': process_id,
            'status': 'PENDING',
            'payload': catalog_items,
            'response': response_data,
            'timestamp': int(time.time() * 1000),
            'ttl': int(time.time()) + (7 * 24 * 60 * 60),
            'requires_status_check': True
        }
        # The boto3 DynamoDB serializer rejects Python floats, so any float
        # in the payload or API response is stored as Decimal instead.
        table.put_item(
            Item=json.loads(json.dumps(item), parse_float=Decimal)
        )
        return {
            'statusCode': 200,
            'body': json.dumps({
                'id': process_id,
                'status': 'PENDING',
                'requiresStatusCheck': True,
                'response': response_data
            })
        }
    except Exception as e:
        print(f"Unexpected error: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': str(e),
                'type': str(type(e).__name__),
                'trace': traceback.format_exc()
            })
        }
Unit Tests
import pytest
import json
import time
from unittest.mock import Mock, patch, MagicMock
import sys
import os
# Add the src directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'src')))
# We'll patch the environment variables before importing the lambda function
# NOTE(review): 'us-west-*' is not a concrete AWS region name; it looks like a
# placeholder, so confirm the intended value.
with patch.dict('os.environ', {
    'TABLE_NAME': 'test-table',
    'API_ENDPOINT': 'https://api.test.com/endpoint',
    'API_REGION': 'us-west-*'
}):
    from asset_catalog_api_solution.lambda_functions.check_status_lambda_function import handler, sign_request
@pytest.fixture(autouse=True)
def mock_env_variables():
    """
    Autouse fixture: every test runs with the three environment variables
    below patched into os.environ; the patched mapping is yielded.
    """
    fake_env = {
        'TABLE_NAME': 'test-table',
        'API_ENDPOINT': 'https://api.test.com/endpoint',
        'API_REGION': 'us-west-*',
    }
    with patch.dict('os.environ', fake_env) as patched_env:
        yield patched_env
@pytest.fixture
def mock_boto3_session():
    """
    Replace boto3.Session with a factory returning a mock session whose
    get_credentials() yields a mock credentials object; yields that session.
    """
    session_double = MagicMock()
    session_double.get_credentials.return_value = MagicMock()
    with patch('boto3.Session', return_value=session_double):
        yield session_double
@pytest.fixture
def mock_dynamodb():
    """
    Replace the module-level ``table`` of the module under test with a mock
    and yield it.

    The table object is created when the lambda module is imported, so
    patching ``boto3.resource`` afterwards would never reach the handler;
    the already-bound module attribute has to be patched instead.
    """
    with patch(f'{handler.__module__}.table') as mock_table:
        yield mock_table
@pytest.fixture
def mock_http():
    """
    Replace the module-level ``http`` pool of the module under test and yield
    the mock response that its ``request()`` call returns.

    The pool is instantiated when the lambda module is imported, so patching
    ``urllib3.PoolManager`` afterwards would leave the real pool in place;
    the already-bound module attribute has to be patched instead.
    """
    with patch(f'{handler.__module__}.http') as mock_pool:
        mock_response = MagicMock()
        mock_pool.request.return_value = mock_response
        yield mock_response
def test_sign_request(mock_env_variables):
    """sign_request returns a plain dict of headers that contains 'host'."""
    with patch('boto3.Session') as session_cls:
        session_cls.return_value.get_credentials.return_value = MagicMock()
        signed = sign_request(
            'https://api.test.com/endpoint', 'POST', '{"test": "data"}'
        )
    assert isinstance(signed, dict)
    assert 'host' in signed
def test_successful_request(mock_env_variables, mock_dynamodb, mock_http, mock_boto3_session):
    """A 201 API response yields a 200 PENDING result and one table write."""
    mock_http.status = 201
    mock_http.data = json.dumps({'id': 'test-id'}).encode('utf-8')

    result = handler({'catalogItems': ['item1', 'item2']}, None)

    assert result['statusCode'] == 200
    payload = json.loads(result['body'])
    assert payload['id'] == 'test-id'
    assert payload['status'] == 'PENDING'
    assert payload['requiresStatusCheck'] is True
    mock_dynamodb.put_item.assert_called_once()
def test_failed_request(mock_env_variables, mock_dynamodb, mock_http, mock_boto3_session):
    """A non-201 API status is passed through with requiresStatusCheck False.

    ``mock_boto3_session`` is requested so request signing uses mocked
    credentials instead of whatever AWS credentials the test machine has.
    """
    mock_http.status = 400
    mock_http.data = json.dumps({'error': 'Bad Request'}).encode('utf-8')
    event = {
        'catalogItems': ['item1', 'item2']
    }
    response = handler(event, None)
    assert response['statusCode'] == 400
    response_body = json.loads(response['body'])
    assert 'response' in response_body
    assert response_body['requiresStatusCheck'] is False
def test_missing_required_field(mock_env_variables):
    """An event without 'catalogItems' is answered with a 400 error body."""
    result = handler({}, None)
    assert result['statusCode'] == 400
    payload = json.loads(result['body'])
    assert 'error' in payload
    assert 'Missing required field' in payload['error']
def test_invalid_json_response(mock_env_variables, mock_http, mock_boto3_session):
    """A response body that is not JSON produces a 500 with a clear error.

    ``mock_boto3_session`` is requested so request signing uses mocked
    credentials instead of whatever AWS credentials the test machine has.
    """
    mock_http.status = 201
    mock_http.data = 'Invalid JSON'.encode('utf-8')
    event = {
        'catalogItems': ['item1', 'item2']
    }
    response = handler(event, None)
    assert response['statusCode'] == 500
    response_body = json.loads(response['body'])
    assert 'error' in response_body
    assert 'Invalid JSON response from API' in response_body['error']
def test_unexpected_response_format(mock_env_variables, mock_http, mock_boto3_session):
    """A 201 response whose JSON is not an object produces a 500 error.

    ``mock_boto3_session`` is requested so request signing uses mocked
    credentials instead of whatever AWS credentials the test machine has.
    """
    mock_http.status = 201
    mock_http.data = json.dumps(['unexpected', 'format']).encode('utf-8')
    event = {
        'catalogItems': ['item1', 'item2']
    }
    response = handler(event, None)
    assert response['statusCode'] == 500
    response_body = json.loads(response['body'])
    assert 'error' in response_body
    assert 'Invalid response format' in response_body['error']
def test_unexpected_exception(mock_env_variables, mock_http, mock_boto3_session):
    """An exception raised by the HTTP call is reported as a 500 with a trace.

    The failure is injected on the pool's ``request`` method, which is what
    the handler calls. ``mock_http`` is the response object, so setting a
    side effect on its ``request`` attribute would never be triggered.
    """
    event = {
        'catalogItems': ['item1', 'item2']
    }
    with patch(f'{handler.__module__}.http') as mock_pool:
        mock_pool.request.side_effect = Exception('Unexpected error')
        response = handler(event, None)
    assert response['statusCode'] == 500
    response_body = json.loads(response['body'])
    assert 'error' in response_body
    assert 'type' in response_body
    assert 'trace' in response_body
    # Pin the failure to the injected one rather than an unrelated error.
    assert response_body['error'] == 'Unexpected error'