AWS Lambda Error: Data transformation in Firehose. The Lambda function .. it returned an error result
While using Lambda to transform data, you could come across this error. You will find it in the destination error logs. The error is "The Lambda function was successfully invoked but it returned an error result". If you check the related Lambda logs, you will most probably find a key error such as "lambda_handler for record in event['Records']: KeyError: 'Records'".
Solution:
It basically boils down to how you have set up the Firehose, Kinesis and Lambda pipeline. If your Kinesis stream triggers a Lambda that delivers the data to Firehose, then you'll be interested in the Kinesis record event. Check out Using AWS Lambda with Amazon Kinesis. Sample event below:
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
"data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
"approximateArrivalTimestamp": 1545084711.166
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
]
}
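For this first setup, the handler reads the uppercase 'Records' key and base64-decodes the 'data' field nested under 'kinesis'. Here is a minimal sketch, assuming the payloads are UTF-8 text. The function name decode_kinesis_records is hypothetical, and the event is trimmed down from the sample above to the fields the code touches.

```python
import base64


def decode_kinesis_records(event):
    """Return the decoded payload of every record in a Kinesis trigger event."""
    payloads = []
    # A Kinesis trigger delivers records under the uppercase 'Records' key
    for record in event['Records']:
        # The payload is base64-encoded and nested under the 'kinesis' key
        raw = base64.b64decode(record['kinesis']['data'])
        # Assumption: the producer wrote UTF-8 text
        payloads.append(raw.decode('utf-8'))
    return payloads


# Trimmed copy of the sample event (only the fields used here)
sample_event = {
    'Records': [
        {'kinesis': {'partitionKey': '1', 'data': 'SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=='}},
        {'kinesis': {'partitionKey': '1', 'data': 'VGhpcyBpcyBvbmx5IGEgdGVzdC4='}},
    ]
}

for text in decode_kinesis_records(sample_event):
    print(text)
```

Passing a Firehose transformation event to this function would raise KeyError: 'Records', which is exactly the error described at the top of this post.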
Another setup is to have Firehose poll the Kinesis stream. This also gives us the flexibility to set up a transformation Lambda for Firehose (Amazon Kinesis Data Firehose Data Transformation). In this setup, the sample event will be as follows (Using AWS Lambda with Amazon Kinesis Data Firehose):
{
"invocationId": "invoked123",
"deliveryStreamArn": "aws:lambda:events",
"region": "us-west-2",
"records": [
{
"data": "SGVsbG8gV29ybGQ=",
"recordId": "record1",
"approximateArrivalTimestamp": 1510772160000,
"kinesisRecordMetadata": {
"shardId": "shardId-000000000000",
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
"approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z",
"sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
"subsequenceNumber": ""
}
},
{
"data": "SGVsbG8gV29ybGQ=",
"recordId": "record2",
"approximateArrivalTimestamp": 151077216000,
"kinesisRecordMetadata": {
"shardId": "shardId-000000000001",
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
"approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z",
"sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
"subsequenceNumber": ""
}
}
]
}
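The two events differ in their top-level keys: the Kinesis trigger uses 'Records' (uppercase R), while the Firehose transformation event uses 'records' (lowercase r) together with 'invocationId' and 'deliveryStreamArn'. A small check like the one below can help while debugging which setup is actually invoking your function. This is only a sketch; the function name detect_event_source and the labels it returns are hypothetical.

```python
def detect_event_source(event):
    """Guess which pipeline setup produced the event, based on its top-level keys."""
    # Firehose transformation events carry lowercase 'records' plus stream metadata
    if 'records' in event and 'deliveryStreamArn' in event:
        return 'firehose-transformation'
    # Kinesis trigger events carry uppercase 'Records'
    if 'Records' in event:
        return 'kinesis-trigger'
    return 'unknown'


firehose_event = {
    'invocationId': 'invoked123',
    'deliveryStreamArn': 'aws:lambda:events',
    'region': 'us-west-2',
    'records': [{'recordId': 'record1', 'data': 'SGVsbG8gV29ybGQ='}],
}
kinesis_event = {
    'Records': [{'kinesis': {'data': 'SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=='}}],
}

print(detect_event_source(firehose_event))
print(detect_event_source(kinesis_event))
```

Logging this value at the start of the handler makes it easier to see which of the two event shapes the function is really receiving.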
In my use case, I had set up the second type. But I was still getting the KeyErrors. I was running Python 3.9 and not decoding the data properly. I have given working code below, which you can refer to. I'll explain the code in a later post.
import base64
import json
import calendar
import time

print('Loading function')


def lambda_handler(event, context):
    output = []
    now = calendar.timegm(time.gmtime())

    # Firehose passes the records under the lowercase 'records' key
    for record in event['records']:
        print(record['recordId'])
        # 'data' is base64-encoded: decode to bytes first, then to a UTF-8 string
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Do custom processing on the payload here
        message = json.loads(payload)
        change = float(message['CHANGE'])
        if change < 0:
            trend = 'DOWN'
        elif change > 0:
            trend = 'POSITIVE'
        else:
            # Added default (the label is my choice): a CHANGE of exactly 0
            # would otherwise leave 'trend' unset
            trend = 'FLAT'

        # Construct output
        data_field = {
            'timestamp': now,
            'trend': trend,
            'SECTOR': message['SECTOR'],
            'CHANGE': change,
            'TICKER_SYMBOL': message['TICKER_SYMBOL'],
            'PRICE': float(message['PRICE'])
        }
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            # 'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
            # Firehose expects 'data' to be a base64-encoded string again
            'data': base64.b64encode(json.dumps(data_field).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}
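Firehose expects every record in the response to carry the same 'recordId' it sent, a 'result' of 'Ok', 'Dropped' or 'ProcessingFailed', and a base64-encoded 'data' string. Before deploying, it can help to check a handler's response locally against that contract. The sketch below is one way to do it. The function name check_firehose_response is hypothetical, and checking 'data' only for 'Ok' records is my own simplification.

```python
import base64

VALID_RESULTS = {'Ok', 'Dropped', 'ProcessingFailed'}


def check_firehose_response(event, response):
    """Return a list of problems found in a transformation response (empty if none)."""
    problems = []
    expected_ids = {r['recordId'] for r in event['records']}
    returned = response.get('records', [])
    returned_ids = {r.get('recordId') for r in returned}

    # Every incoming recordId must come back in the response
    for missing in sorted(expected_ids - returned_ids):
        problems.append('missing recordId: ' + missing)

    for rec in returned:
        rec_id = str(rec.get('recordId'))
        if rec.get('result') not in VALID_RESULTS:
            problems.append('invalid result for ' + rec_id)
        # Simplification: only inspect 'data' for records marked 'Ok'
        if rec.get('result') == 'Ok':
            data = rec.get('data')
            if not isinstance(data, str):
                problems.append('data is not a string for ' + rec_id)
                continue
            try:
                base64.b64decode(data, validate=True)
            except ValueError:
                problems.append('data is not valid base64 for ' + rec_id)
    return problems


test_event = {'records': [{'recordId': 'record1', 'data': 'SGVsbG8gV29ybGQ='},
                          {'recordId': 'record2', 'data': 'SGVsbG8gV29ybGQ='}]}
good_response = {'records': [
    {'recordId': 'record1', 'result': 'Ok', 'data': 'SGVsbG8gV29ybGQ='},
    {'recordId': 'record2', 'result': 'Dropped', 'data': 'SGVsbG8gV29ybGQ='},
]}
print(check_firehose_response(test_event, good_response))
```

You could call it as check_firehose_response(event, lambda_handler(event, None)) with a test event of your own and look at the list it returns.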