From 3a1a1b4d7be1aae268540ce80bc609c94680792f Mon Sep 17 00:00:00 2001 From: Artur Barseghyan Date: Wed, 12 Oct 2022 15:32:15 +0200 Subject: [PATCH] Blackify and isortify --- src/asyncproc.py | 178 ++++++------ src/datastore.py | 107 ++++--- src/docproc.py | 82 +++--- src/events.py | 130 ++++----- src/helper.py | 86 +++--- src/jobresultsproc.py | 81 +++--- src/og.py | 35 ++- src/s3batchproc.py | 65 +++-- src/s3proc.py | 26 +- src/syncproc.py | 107 +++---- src/test.py | 152 +++++----- src/trp.py | 260 ++++++++++-------- .../lambda/asyncprocessor/lambda_function.py | 178 ++++++------ .../documentprocessor/lambda_function.py | 82 +++--- .../lambda/helper/python/datastore.py | 107 ++++--- .../lambda/helper/python/helper.py | 86 +++--- .../jobresultprocessor/lambda_function.py | 81 +++--- .../s3batchprocessor/lambda_function.py | 65 +++-- .../lambda/s3processor/lambda_function.py | 26 +- .../lambda/syncprocessor/lambda_function.py | 107 +++---- .../lambda/textractor/python/og.py | 35 ++- .../lambda/textractor/python/trp.py | 260 ++++++++++-------- 22 files changed, 1235 insertions(+), 1101 deletions(-) diff --git a/src/asyncproc.py b/src/asyncproc.py index 623d9c61..f4173e39 100644 --- a/src/asyncproc.py +++ b/src/asyncproc.py @@ -1,113 +1,116 @@ import json -import boto3 import os -from helper import AwsHelper import time -def startJob(bucketName, objectName, documentId, snsTopic, snsRole, detectForms, detectTables): +import boto3 + +from helper import AwsHelper - print("Starting job with documentId: {}, bucketName: {}, objectName: {}".format(documentId, bucketName, objectName)) + +def startJob( + bucketName, objectName, documentId, snsTopic, snsRole, detectForms, detectTables +): + + print( + "Starting job with documentId: {}, bucketName: {}, objectName: {}".format( + documentId, bucketName, objectName + ) + ) response = None - client = AwsHelper().getClient('textract') - if(not detectForms and not detectTables): + client = AwsHelper().getClient("textract") + if not detectForms and not detectTables: response = client.start_document_text_detection( - ClientRequestToken = documentId, - DocumentLocation={ - 'S3Object': { - 'Bucket': bucketName, - 'Name': objectName - } - }, - NotificationChannel= { - "RoleArn": snsRole, - "SNSTopicArn": snsTopic - }, - JobTag = documentId) + ClientRequestToken=documentId, + DocumentLocation={"S3Object": {"Bucket": bucketName, "Name": objectName}}, + NotificationChannel={"RoleArn": snsRole, "SNSTopicArn": snsTopic}, + JobTag=documentId, + ) else: - features = [] - if(detectTables): + features = [] + if detectTables: features.append("TABLES") - if(detectForms): + if detectForms: features.append("FORMS") response = client.start_document_analysis( - ClientRequestToken = documentId, - DocumentLocation={ - 'S3Object': { - 'Bucket': bucketName, - 'Name': objectName - } - }, + ClientRequestToken=documentId, + DocumentLocation={"S3Object": {"Bucket": bucketName, "Name": objectName}}, FeatureTypes=features, - NotificationChannel= { - "RoleArn": snsRole, - "SNSTopicArn": snsTopic - }, - JobTag = documentId) + NotificationChannel={"RoleArn": snsRole, "SNSTopicArn": snsTopic}, + JobTag=documentId, + ) return response["JobId"] def processItem(message, snsTopic, snsRole): - print('message:') + print("message:") print(message) - messageBody = json.loads(message['Body']) + messageBody = json.loads(message["Body"]) - bucketName = messageBody['bucketName'] - objectName = messageBody['objectName'] - documentId = messageBody['documentId'] - features = messageBody['features'] + bucketName = messageBody["bucketName"] + objectName = messageBody["objectName"] + documentId = messageBody["documentId"] + features = messageBody["features"] - print('Bucket Name: ' + bucketName) - print('Object Name: ' + objectName) - print('Task ID: ' + documentId) + print("Bucket Name: " + bucketName) + print("Object Name: " + objectName) + print("Task ID: " + documentId) print("API: {}".format(features)) - print('starting Textract job...') + print("starting Textract job...") - detectForms = 'Forms' in features - detectTables = 'Tables' in features + detectForms = "Forms" in features + detectTables = "Tables" in features - jobId = startJob(bucketName, objectName, documentId, snsTopic, snsRole, detectForms, detectTables) + jobId = startJob( + bucketName, objectName, documentId, snsTopic, snsRole, detectForms, detectTables + ) - if(jobId): + if jobId: print("Started Job with Id: {}".format(jobId)) return jobId + def changeVisibility(sqs, qUrl, receipt_handle): try: sqs.change_message_visibility( - QueueUrl=qUrl, - ReceiptHandle=receipt_handle, - VisibilityTimeout=0 - ) + QueueUrl=qUrl, ReceiptHandle=receipt_handle, VisibilityTimeout=0 + ) except Exception as e: - print("Failed to change visibility for {} with error: {}".format(receipt_handle, e)) + print( + "Failed to change visibility for {} with error: {}".format( + receipt_handle, e + ) + ) + -def getMessagesFromQueue(sqs, qUrl,): +def getMessagesFromQueue( + sqs, + qUrl, +): # Receive message from SQS queue response = sqs.receive_message( - QueueUrl=qUrl, - MaxNumberOfMessages=1, - VisibilityTimeout=60 #14400 + QueueUrl=qUrl, MaxNumberOfMessages=1, VisibilityTimeout=60 # 14400 ) - print('SQS Response Recieved:') + print("SQS Response Recieved:") print(response) - if('Messages' in response): - return response['Messages'] + if "Messages" in response: + return response["Messages"] else: print("No messages in queue.") return None + def processItems(qUrl, snsTopic, snsRole): - sqs = AwsHelper().getClient('sqs') + sqs = AwsHelper().getClient("sqs") messages = getMessagesFromQueue(sqs, qUrl) jc = 0 @@ -115,48 +118,47 @@ def processItems(qUrl, snsTopic, snsRole): hitLimit = False limitException = None - if(messages): - + if messages: totalMessages = len(messages) print("Total messages: {}".format(totalMessages)) for message in messages: - receipt_handle = message['ReceiptHandle'] + receipt_handle = message["ReceiptHandle"] try: - if(hitLimit): + if hitLimit: changeVisibility(sqs, qUrl, receipt_handle) else: print("starting job...") processItem(message, snsTopic, snsRole) print("started job...") - print('Deleting item from queue...') + print("Deleting item from queue...") # Delete received message from queue - sqs.delete_message( - QueueUrl=qUrl, - ReceiptHandle=receipt_handle - ) - print('Deleted item from queue...') + sqs.delete_message(QueueUrl=qUrl, ReceiptHandle=receipt_handle) + print("Deleted item from queue...") jc += 1 except Exception as e: print("Error while starting job or deleting from queue: {}".format(e)) changeVisibility(sqs, qUrl, receipt_handle) - if(e.__class__.__name__ == 'LimitExceededException' - or e.__class__.__name__ == "ProvisionedThroughputExceededException"): + if ( + e.__class__.__name__ == "LimitExceededException" + or e.__class__.__name__ == "ProvisionedThroughputExceededException" + ): hitLimit = True limitException = e - if(hitLimit): + if hitLimit: raise limitException return totalMessages, jc + def processRequest(request): - qUrl = request['qUrl'] - snsTopic = request['snsTopic'] - snsRole = request['snsRole'] + qUrl = request["qUrl"] + snsTopic = request["snsTopic"] + snsRole = request["snsRole"] i = 0 max = 100 @@ -166,24 +168,24 @@ def processRequest(request): hitLimit = False provisionedThroughputExceededCount = 0 - while(i < max): + while i < max: try: tc, jc = processItems(qUrl, snsTopic, snsRole) totalJobsScheduled += jc - if(tc == 0): + if tc == 0: i = max except Exception as e: - if(e.__class__.__name__ == 'LimitExceededException'): + if e.__class__.__name__ == "LimitExceededException": print("Exception: Hit limit.") hitLimit = True i = max - elif(e.__class__.__name__ == "ProvisionedThroughputExceededException"): + elif e.__class__.__name__ == "ProvisionedThroughputExceededException": print("ProvisionedThroughputExceededException.") provisionedThroughputExceededCount += 1 - if(provisionedThroughputExceededCount > 5): + if provisionedThroughputExceededCount > 5: i = max else: print("Waiting for few seconds...") @@ -193,15 +195,13 @@ def processRequest(request): i += 1 output = "Started {} jobs.".format(totalJobsScheduled) - if(hitLimit): + if hitLimit: output += " Hit limit." print(output) - return { - 'statusCode': 200, - 'body': output - } + return {"statusCode": 200, "body": output} + def lambda_handler(event, context): @@ -209,8 +209,8 @@ def lambda_handler(event, context): request = {} - request["qUrl"] = os.environ['ASYNC_QUEUE_URL'] - request["snsTopic"] = os.environ['SNS_TOPIC_ARN'] - request["snsRole"] = os.environ['SNS_ROLE_ARN'] + request["qUrl"] = os.environ["ASYNC_QUEUE_URL"] + request["snsTopic"] = os.environ["SNS_TOPIC_ARN"] + request["snsRole"] = os.environ["SNS_ROLE_ARN"] return processRequest(request) diff --git a/src/datastore.py b/src/datastore.py index d4ff570c..1733aca4 100644 --- a/src/datastore.py +++ b/src/datastore.py @@ -1,10 +1,12 @@ +import datetime + import boto3 from botocore.exceptions import ClientError + from helper import AwsHelper -import datetime -class DocumentStore: +class DocumentStore: def __init__(self, documentsTableName, outputTableName): self._documentsTableName = documentsTableName self._outputTableName = outputTableName @@ -18,21 +20,21 @@ def createDocument(self, documentId, bucketName, objectName): try: table.update_item( - Key = { "documentId": documentId }, - UpdateExpression = 'SET bucketName = :bucketNameValue, objectName = :objectNameValue, documentStatus = :documentstatusValue, documentCreatedOn = :documentCreatedOnValue', - ConditionExpression = 'attribute_not_exists(documentId)', - ExpressionAttributeValues = { - ':bucketNameValue': bucketName, - ':objectNameValue': objectName, - ':documentstatusValue': 'IN_PROGRESS', - ':documentCreatedOnValue': str(datetime.datetime.utcnow()) - } + Key={"documentId": documentId}, + UpdateExpression="SET bucketName = :bucketNameValue, objectName = :objectNameValue, documentStatus = :documentstatusValue, documentCreatedOn = :documentCreatedOnValue", + ConditionExpression="attribute_not_exists(documentId)", + ExpressionAttributeValues={ + ":bucketNameValue": bucketName, + ":objectNameValue": objectName, + ":documentstatusValue": "IN_PROGRESS", + ":documentCreatedOnValue": str(datetime.datetime.utcnow()), + }, ) except ClientError as e: print(e) - if e.response['Error']['Code'] == "ConditionalCheckFailedException": - print(e.response['Error']['Message']) - err = {'Error' : 'Document already exist.'} + if e.response["Error"]["Code"] == "ConditionalCheckFailedException": + print(e.response["Error"]["Message"]) + err = {"Error": "Document already exist."} else: raise @@ -47,17 +49,15 @@ def updateDocumentStatus(self, documentId, documentStatus): try: table.update_item( - Key = { 'documentId': documentId }, - UpdateExpression = 'SET documentStatus= :documentstatusValue', - ConditionExpression = 'attribute_exists(documentId)', - ExpressionAttributeValues = { - ':documentstatusValue': documentStatus - } + Key={"documentId": documentId}, + UpdateExpression="SET documentStatus= :documentstatusValue", + ConditionExpression="attribute_exists(documentId)", + ExpressionAttributeValues={":documentstatusValue": documentStatus}, ) except ClientError as e: - if e.response['Error']['Code'] == "ConditionalCheckFailedException": - print(e.response['Error']['Message']) - err = {'Error' : 'Document does not exist.'} + if e.response["Error"]["Code"] == "ConditionalCheckFailedException": + print(e.response["Error"]["Message"]) + err = {"Error": "Document does not exist."} else: raise @@ -72,18 +72,18 @@ def markDocumentComplete(self, documentId): try: table.update_item( - Key = { 'documentId': documentId }, - UpdateExpression = 'SET documentStatus= :documentstatusValue, documentCompletedOn = :documentCompletedOnValue', - ConditionExpression = 'attribute_exists(documentId)', - ExpressionAttributeValues = { - ':documentstatusValue': "SUCCEEDED", - ':documentCompletedOnValue': str(datetime.datetime.utcnow()) - } + Key={"documentId": documentId}, + UpdateExpression="SET documentStatus= :documentstatusValue, documentCompletedOn = :documentCompletedOnValue", + ConditionExpression="attribute_exists(documentId)", + ExpressionAttributeValues={ + ":documentstatusValue": "SUCCEEDED", + ":documentCompletedOnValue": str(datetime.datetime.utcnow()), + }, ) except ClientError as e: - if e.response['Error']['Code'] == "ConditionalCheckFailedException": - print(e.response['Error']['Message']) - err = {'Error' : 'Document does not exist.'} + if e.response["Error"]["Code"] == "ConditionalCheckFailedException": + print(e.response["Error"]["Message"]) + err = {"Error": "Document does not exist."} else: raise @@ -94,17 +94,18 @@ def getDocument(self, documentId): dynamodb = AwsHelper().getClient("dynamodb") ddbGetItemResponse = dynamodb.get_item( - Key={'documentId': {'S': documentId} }, - TableName=self._documentsTableName + Key={"documentId": {"S": documentId}}, TableName=self._documentsTableName ) itemToReturn = None - if('Item' in ddbGetItemResponse): - itemToReturn = { 'documentId' : ddbGetItemResponse['Item']['documentId']['S'], - 'bucketName' : ddbGetItemResponse['Item']['bucketName']['S'], - 'objectName' : ddbGetItemResponse['Item']['objectName']['S'], - 'documentStatus' : ddbGetItemResponse['Item']['documentStatus']['S'] } + if "Item" in ddbGetItemResponse: + itemToReturn = { + "documentId": ddbGetItemResponse["Item"]["documentId"]["S"], + "bucketName": ddbGetItemResponse["Item"]["bucketName"]["S"], + "objectName": ddbGetItemResponse["Item"]["objectName"]["S"], + "documentStatus": ddbGetItemResponse["Item"]["documentStatus"]["S"], + } return itemToReturn @@ -113,11 +114,7 @@ def deleteDocument(self, documentId): dynamodb = AwsHelper().getResource("dynamodb") table = dynamodb.Table(self._documentsTableName) - table.delete_item( - Key={ - 'documentId': documentId - } - ) + table.delete_item(Key={"documentId": documentId}) def getDocuments(self, nextToken=None): @@ -126,8 +123,10 @@ def getDocuments(self, nextToken=None): pageSize = 25 - if(nextToken): - response = table.scan(ExclusiveStartKey={ "documentId" : nextToken}, Limit=pageSize) + if nextToken: + response = table.scan( + ExclusiveStartKey={"documentId": nextToken}, Limit=pageSize + ) else: response = table.scan(Limit=pageSize) @@ -135,16 +134,14 @@ def getDocuments(self, nextToken=None): data = [] - if('Items' in response): - data = response['Items'] + if "Items" in response: + data = response["Items"] - documents = { - "documents" : data - } + documents = {"documents": data} - if 'LastEvaluatedKey' in response: - nextToken = response['LastEvaluatedKey']['documentId'] + if "LastEvaluatedKey" in response: + nextToken = response["LastEvaluatedKey"]["documentId"] print("nexToken: {}".format(nextToken)) documents["nextToken"] = nextToken - return documents \ No newline at end of file + return documents diff --git a/src/docproc.py b/src/docproc.py index f0d58964..cd858018 100644 --- a/src/docproc.py +++ b/src/docproc.py @@ -1,18 +1,18 @@ import json import os -from helper import FileHelper, AwsHelper + +from helper import AwsHelper, FileHelper + def postMessage(client, qUrl, jsonMessage): message = json.dumps(jsonMessage) - client.send_message( - QueueUrl=qUrl, - MessageBody=message - ) + client.send_message(QueueUrl=qUrl, MessageBody=message) print("Submitted message to queue: {}".format(message)) + def processRequest(request): output = "" @@ -28,76 +28,90 @@ def processRequest(request): ext = FileHelper.getFileExtenstion(objectName.lower()) print("Extension: {}".format(ext)) - if(ext and ext in ["jpg", "jpeg", "png"]): - qUrl = request['syncQueueUrl'] - elif (ext in ["pdf"]): - qUrl = request['asyncQueueUrl'] + if ext and ext in ["jpg", "jpeg", "png"]: + qUrl = request["syncQueueUrl"] + elif ext in ["pdf"]: + qUrl = request["asyncQueueUrl"] - if(qUrl): + if qUrl: features = ["Text", "Forms", "Tables"] - jsonMessage = { 'documentId' : documentId, - "features" : features, - 'bucketName': bucketName, - 'objectName' : objectName } + jsonMessage = { + "documentId": documentId, + "features": features, + "bucketName": bucketName, + "objectName": objectName, + } - client = AwsHelper().getClient('sqs') + client = AwsHelper().getClient("sqs") postMessage(client, qUrl, jsonMessage) - output = "Completed routing for documentId: {}, object: {}/{}".format(documentId, bucketName, objectName) + output = "Completed routing for documentId: {}, object: {}/{}".format( + documentId, bucketName, objectName + ) print(output) + def processRecord(record, syncQueueUrl, asyncQueueUrl): - + newImage = record["dynamodb"]["NewImage"] - + documentId = None bucketName = None objectName = None documentStatus = None - - if("documentId" in newImage and "S" in newImage["documentId"]): + + if "documentId" in newImage and "S" in newImage["documentId"]: documentId = newImage["documentId"]["S"] - if("bucketName" in newImage and "S" in newImage["bucketName"]): + if "bucketName" in newImage and "S" in newImage["bucketName"]: bucketName = newImage["bucketName"]["S"] - if("objectName" in newImage and "S" in newImage["objectName"]): + if "objectName" in newImage and "S" in newImage["objectName"]: objectName = newImage["objectName"]["S"] - if("documentStatus" in newImage and "S" in newImage["documentStatus"]): + if "documentStatus" in newImage and "S" in newImage["documentStatus"]: documentStatus = newImage["documentStatus"]["S"] - print("DocumentId: {}, BucketName: {}, ObjectName: {}, DocumentStatus: {}".format(documentId, bucketName, objectName, documentStatus)) + print( + "DocumentId: {}, BucketName: {}, ObjectName: {}, DocumentStatus: {}".format( + documentId, bucketName, objectName, documentStatus + ) + ) - if(documentId and bucketName and objectName and documentStatus): + if documentId and bucketName and objectName and documentStatus: request = {} request["documentId"] = documentId request["bucketName"] = bucketName request["objectName"] = objectName - request['syncQueueUrl'] = syncQueueUrl - request['asyncQueueUrl'] = asyncQueueUrl + request["syncQueueUrl"] = syncQueueUrl + request["asyncQueueUrl"] = asyncQueueUrl processRequest(request) + def lambda_handler(event, context): try: - + print("event: {}".format(event)) - syncQueueUrl = os.environ['SYNC_QUEUE_URL'] - asyncQueueUrl = os.environ['ASYNC_QUEUE_URL'] + syncQueueUrl = os.environ["SYNC_QUEUE_URL"] + asyncQueueUrl = os.environ["ASYNC_QUEUE_URL"] - if("Records" in event and event["Records"]): + if "Records" in event and event["Records"]: for record in event["Records"]: try: print("Processing record: {}".format(record)) - if("eventName" in record and record["eventName"] == "INSERT"): - if("dynamodb" in record and record["dynamodb"] and "NewImage" in record["dynamodb"]): + if "eventName" in record and record["eventName"] == "INSERT": + if ( + "dynamodb" in record + and record["dynamodb"] + and "NewImage" in record["dynamodb"] + ): processRecord(record, syncQueueUrl, asyncQueueUrl) except Exception as e: print("Faild to process record. Exception: {}".format(e)) except Exception as e: - print("Failed to process records. Exception: {}".format(e)) \ No newline at end of file + print("Failed to process records. Exception: {}".format(e)) diff --git a/src/events.py b/src/events.py index bc9fbb79..06712684 100644 --- a/src/events.py +++ b/src/events.py @@ -1,119 +1,89 @@ import json + def S3BatchOperationsEvent(bucketArn, objectName): return { - "job" : { - "id" : "1" - }, - "invocationId" : "1", - "invocationSchemaVersion" : "1", - "tasks" : [ - { - "taskId" : "1", - "s3Key" : objectName, - "s3BucketArn" : bucketArn - } - ] + "job": {"id": "1"}, + "invocationId": "1", + "invocationSchemaVersion": "1", + "tasks": [{"taskId": "1", "s3Key": objectName, "s3BucketArn": bucketArn}], } + def s3Event(bucketName, objectName): return { - "Records" : [{ - "s3" : { - "bucket" : { - "name" : bucketName - }, - "object" : { - "key" : objectName - } - } - }] + "Records": [ + {"s3": {"bucket": {"name": bucketName}, "object": {"key": objectName}}} + ] } + def documentEvent(documentId, bucketName, objectName): return { - "Records" : [{ - "eventName" : "INSERT", - "dynamodb" : { - 'NewImage': { - 'documentId': {'S': documentId}, - 'bucketName': {'S': bucketName}, - 'objectName': {'S': objectName}, - 'documentStatus': {'S': "IN_PROGRESS"} - } + "Records": [ + { + "eventName": "INSERT", + "dynamodb": { + "NewImage": { + "documentId": {"S": documentId}, + "bucketName": {"S": bucketName}, + "objectName": {"S": objectName}, + "documentStatus": {"S": "IN_PROGRESS"}, + } + }, } - }] + ] } + def syncQueueDocument(documentId, bucketName, objectName): - + body = { - "documentId": documentId, - "bucketName": bucketName, - "objectName": objectName, - "features" : ["Text", "Forms", "Tables"] + "documentId": documentId, + "bucketName": bucketName, + "objectName": objectName, + "features": ["Text", "Forms", "Tables"], } - return { - "Records" : [{ - "body" : json.dumps(body) - }] - } + return {"Records": [{"body": json.dumps(body)}]} + def jobResultsEvent(jobId, jobTag, jobStatus, jobAPI, bucketName, objectName): - + message = { - "JobId" : jobId, - "JobTag" : jobTag, - "Status" : jobStatus, - "API" : jobAPI, - "DocumentLocation" : { - "S3Bucket" : bucketName, - "S3ObjectName" : objectName - } + "JobId": jobId, + "JobTag": jobTag, + "Status": jobStatus, + "API": jobAPI, + "DocumentLocation": {"S3Bucket": bucketName, "S3ObjectName": objectName}, } - body = { - "Message" : json.dumps(message) - } + body = {"Message": json.dumps(message)} + + return {"Records": [{"body": json.dumps(body)}]} - return { - "Records" : [{ - "body" : json.dumps(body) - }] - } def searchEvent(keyword): - return { - "resource" : "/search", - "queryStringParameters" : { - "k" : keyword - } - } + return {"resource": "/search", "queryStringParameters": {"k": keyword}} + def createDocumentEvent(bucketName, objectName): - + return { - "resource" : "/document", - "queryStringParameters" : { - "bucketname" : bucketName, - "objectName" : objectName - } + "resource": "/document", + "queryStringParameters": {"bucketname": bucketName, "objectName": objectName}, } - + + def getDocumentEvent(documentId): return { - "resource" : "/document", - "queryStringParameters" : { - "documentid" : documentId - } + "resource": "/document", + "queryStringParameters": {"documentid": documentId}, } + def getDocumentsEvent(): - - return { - "resource" : "/documents" - } + return {"resource": "/documents"} diff --git a/src/helper.py b/src/helper.py index 77c55dda..0c29b40d 100644 --- a/src/helper.py +++ b/src/helper.py @@ -1,12 +1,13 @@ -import boto3 -from botocore.client import Config -import os import csv import io +import os + +import boto3 from boto3.dynamodb.conditions import Key +from botocore.client import Config -class DynamoDBHelper: +class DynamoDBHelper: @staticmethod def getItems(tableName, key, value): items = None @@ -17,7 +18,7 @@ def getItems(tableName, key, value): if key is not None and value is not None: filter = Key(key).eq(value) queryResult = table.query(KeyConditionExpression=filter) - if(queryResult and "Items" in queryResult): + if queryResult and "Items" in queryResult: items = queryResult["Items"] return items @@ -35,50 +36,40 @@ def insertItem(tableName, itemData): @staticmethod def deleteItems(tableName, key, value, sk): items = DynamoDBHelper.getItems(tableName, key, value) - if(items): + if items: ddb = AwsHelper().getResource("dynamodb") table = ddb.Table(tableName) for item in items: print("Deleting...") print("{} : {}".format(key, item[key])) print("{} : {}".format(sk, item[sk])) - table.delete_item( - Key={ - key: value, - sk : item[sk] - }) + table.delete_item(Key={key: value, sk: item[sk]}) print("Deleted...") + class AwsHelper: def getClient(self, name, awsRegion=None): - config = Config( - retries = dict( - max_attempts = 30 - ) - ) - if(awsRegion): + config = Config(retries=dict(max_attempts=30)) + if awsRegion: return boto3.client(name, region_name=awsRegion, config=config) else: return boto3.client(name, config=config) def getResource(self, name, awsRegion=None): - config = Config( - retries = dict( - max_attempts = 30 - ) - ) + config = Config(retries=dict(max_attempts=30)) - if(awsRegion): + if awsRegion: return boto3.resource(name, region_name=awsRegion, config=config) else: return boto3.resource(name, config=config) + class S3Helper: @staticmethod def getS3BucketRegion(bucketName): - client = boto3.client('s3') + client = boto3.client("s3") response = client.get_bucket_location(Bucket=bucketName) - awsRegion = response['LocationConstraint'] + awsRegion = response["LocationConstraint"] return awsRegion @staticmethod @@ -90,49 +81,50 @@ def getFileNames(bucketName, prefix, maxPages, allowedFileTypes, awsRegion=None) hasMoreContent = True continuationToken = None - s3client = AwsHelper().getClient('s3', awsRegion) + s3client = AwsHelper().getClient("s3", awsRegion) - while(hasMoreContent and currentPage <= maxPages): - if(continuationToken): + while hasMoreContent and currentPage <= maxPages: + if continuationToken: listObjectsResponse = s3client.list_objects_v2( Bucket=bucketName, Prefix=prefix, - ContinuationToken=continuationToken) + ContinuationToken=continuationToken, + ) else: listObjectsResponse = s3client.list_objects_v2( - Bucket=bucketName, - Prefix=prefix) + Bucket=bucketName, Prefix=prefix + ) - if(listObjectsResponse['IsTruncated']): - continuationToken = listObjectsResponse['NextContinuationToken'] + if listObjectsResponse["IsTruncated"]: + continuationToken = listObjectsResponse["NextContinuationToken"] else: hasMoreContent = False - for doc in listObjectsResponse['Contents']: - docName = doc['Key'] + for doc in listObjectsResponse["Contents"]: + docName = doc["Key"] docExt = FileHelper.getFileExtenstion(docName) docExtLower = docExt.lower() - if(docExtLower in allowedFileTypes): + if docExtLower in allowedFileTypes: files.append(docName) return files @staticmethod def writeToS3(content, bucketName, s3FileName, awsRegion=None): - s3 = AwsHelper().getResource('s3', awsRegion) + s3 = AwsHelper().getResource("s3", awsRegion) object = s3.Object(bucketName, s3FileName) object.put(Body=content) @staticmethod def readFromS3(bucketName, s3FileName, awsRegion=None): - s3 = AwsHelper().getResource('s3', awsRegion) + s3 = AwsHelper().getResource("s3", awsRegion) obj = s3.Object(bucketName, s3FileName) - return obj.get()['Body'].read().decode('utf-8') + return obj.get()["Body"].read().decode("utf-8") @staticmethod def writeCSV(fieldNames, csvData, bucketName, s3FileName, awsRegion=None): csv_file = io.StringIO() - #with open(fileName, 'w') as csv_file: + # with open(fileName, 'w') as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldNames) writer.writeheader() @@ -148,7 +140,7 @@ def writeCSV(fieldNames, csvData, bucketName, s3FileName, awsRegion=None): @staticmethod def writeCSVRaw(csvData, bucketName, s3FileName): csv_file = io.StringIO() - #with open(fileName, 'w') as csv_file: + # with open(fileName, 'w') as csv_file: writer = csv.writer(csv_file) for item in csvData: writer.writerow(item) @@ -174,27 +166,27 @@ def getFileExtenstion(fileName): dn, dext = os.path.splitext(basename) return dext[1:] - @staticmethod def readFile(fileName): - with open(fileName, 'r') as document: + with open(fileName, "r") as document: return document.read() @staticmethod def writeToFile(fileName, content): - with open(fileName, 'w') as document: + with open(fileName, "w") as document: document.write(content) @staticmethod def writeToFileWithMode(fileName, content, mode): with open(fileName, mode) as document: document.write(content) + @staticmethod def getFilesInFolder(path, fileTypes): for file in os.listdir(path): if os.path.isfile(os.path.join(path, file)): ext = FileHelper.getFileExtenstion(file) - if(ext.lower() in fileTypes): + if ext.lower() in fileTypes: yield file @staticmethod @@ -208,7 +200,7 @@ def getFileNames(path, allowedLocalFileTypes): @staticmethod def writeCSV(fileName, fieldNames, csvData): - with open(fileName, 'w') as csv_file: + with open(fileName, "w") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldNames) writer.writeheader() @@ -222,7 +214,7 @@ def writeCSV(fileName, fieldNames, csvData): @staticmethod def writeCSVRaw(fileName, csvData): - with open(fileName, 'w') as csv_file: + with open(fileName, "w") as csv_file: writer = csv.writer(csv_file) for item in csvData: writer.writerow(item) diff --git a/src/jobresultsproc.py b/src/jobresultsproc.py index cdc8c260..48212c5b 100644 --- a/src/jobresultsproc.py +++ b/src/jobresultsproc.py @@ -1,10 +1,13 @@ import json import os -import boto3 import time + +import boto3 + +import datastore from helper import AwsHelper from og import OutputGenerator -import datastore + def getJobResults(api, jobId): @@ -12,47 +15,50 @@ def getJobResults(api, jobId): time.sleep(5) - client = AwsHelper().getClient('textract') - if(api == "StartDocumentTextDetection"): + client = AwsHelper().getClient("textract") + if api == "StartDocumentTextDetection": response = client.get_document_text_detection(JobId=jobId) else: response = client.get_document_analysis(JobId=jobId) pages.append(response) print("Resultset page recieved: {}".format(len(pages))) nextToken = None - if('NextToken' in response): - nextToken = response['NextToken'] + if "NextToken" in response: + nextToken = response["NextToken"] print("Next token: {}".format(nextToken)) - while(nextToken): + while nextToken: time.sleep(5) - if(api == "StartDocumentTextDetection"): - response = client.get_document_text_detection(JobId=jobId, NextToken=nextToken) + if api == "StartDocumentTextDetection": + response = client.get_document_text_detection( + JobId=jobId, NextToken=nextToken + ) else: response = client.get_document_analysis(JobId=jobId, NextToken=nextToken) pages.append(response) print("Resultset page recieved: {}".format(len(pages))) nextToken = None - if('NextToken' in response): - nextToken = response['NextToken'] + if "NextToken" in response: + nextToken = response["NextToken"] print("Next token: {}".format(nextToken)) return pages + def processRequest(request): output = "" print(request) - jobId = request['jobId'] - jobTag = request['jobTag'] - jobStatus = request['jobStatus'] - jobAPI = request['jobAPI'] - bucketName = request['bucketName'] - objectName = request['objectName'] + jobId = request["jobId"] + jobTag = request["jobTag"] + jobStatus = request["jobStatus"] + jobAPI = request["jobAPI"] + bucketName = request["bucketName"] + objectName = request["objectName"] outputTable = request["outputTable"] documentsTable = request["documentsTable"] @@ -65,14 +71,16 @@ def processRequest(request): detectForms = False detectTables = False - if(jobAPI == "StartDocumentAnalysis"): + if jobAPI == "StartDocumentAnalysis": detectForms = True detectTables = True - dynamodb = AwsHelper().getResource('dynamodb') + dynamodb = AwsHelper().getResource("dynamodb") ddb = dynamodb.Table(outputTable) - opg = OutputGenerator(jobTag, pages, bucketName, objectName, detectForms, detectTables, ddb) + opg = OutputGenerator( + jobTag, pages, bucketName, objectName, detectForms, detectTables, ddb + ) opg.run() print("DocumentId: {}".format(jobTag)) @@ -80,38 +88,39 @@ def processRequest(request): ds = datastore.DocumentStore(documentsTable, outputTable) ds.markDocumentComplete(jobTag) - output = "Processed -> Document: {}, Object: {}/{} processed.".format(jobTag, bucketName, objectName) + output = "Processed -> Document: {}, Object: {}/{} processed.".format( + jobTag, bucketName, objectName + ) print(output) - return { - 'statusCode': 200, - 'body': output - } + return {"statusCode": 200, "body": output} + def lambda_handler(event, context): print("event: {}".format(event)) - body = json.loads(event['Records'][0]['body']) - message = json.loads(body['Message']) + body = json.loads(event["Records"][0]["body"]) + message = json.loads(body["Message"]) print("Message: {}".format(message)) request = {} - request["jobId"] = message['JobId'] - request["jobTag"] = message['JobTag'] - request["jobStatus"] = message['Status'] - request["jobAPI"] = message['API'] - request["bucketName"] = message['DocumentLocation']['S3Bucket'] - request["objectName"] = message['DocumentLocation']['S3ObjectName'] - - request["outputTable"] = os.environ['OUTPUT_TABLE'] - request["documentsTable"] = os.environ['DOCUMENTS_TABLE'] + request["jobId"] = message["JobId"] + request["jobTag"] = message["JobTag"] + request["jobStatus"] = message["Status"] + request["jobAPI"] = message["API"] + request["bucketName"] = message["DocumentLocation"]["S3Bucket"] + request["objectName"] = message["DocumentLocation"]["S3ObjectName"] + + request["outputTable"] = os.environ["OUTPUT_TABLE"] + request["documentsTable"] = os.environ["DOCUMENTS_TABLE"] return processRequest(request) + def lambda_handler_local(event, context): print("event: {}".format(event)) return processRequest(event) diff --git a/src/og.py b/src/og.py index 7e3638f6..d7ce3296 100644 --- a/src/og.py +++ b/src/og.py @@ -1,10 +1,15 @@ import json + +import boto3 + from helper import FileHelper, S3Helper from trp import Document -import boto3 + class OutputGenerator: - def __init__(self, documentId, response, bucketName, objectName, forms, tables, ddb): + def __init__( + self, documentId, response, bucketName, objectName, forms, tables, ddb + ): self.documentId = documentId self.response = response self.bucketName = bucketName @@ -20,9 +25,9 @@ def __init__(self, documentId, response, bucketName, objectName, forms, tables, def saveItem(self, pk, sk, output): jsonItem = {} - jsonItem['documentId'] = pk - jsonItem['outputType'] = sk - jsonItem['outputPath'] = output + jsonItem["documentId"] = pk + jsonItem["outputType"] = sk + jsonItem["outputPath"] = output self.ddb.put_item(Item=jsonItem) @@ -40,17 +45,17 @@ def _outputText(self, page, p): def _outputForm(self, page, p): csvData = [] for field in page.form.fields: - csvItem = [] - if(field.key): + csvItem = [] + if field.key: csvItem.append(field.key.text) else: csvItem.append("") - if(field.value): + if field.value: csvItem.append(field.value.text) else: csvItem.append("") csvData.append(csvItem) - csvFieldNames = ['Key', 'Value'] + csvFieldNames = ["Key", "Value"] opath = "{}page-{}-forms.csv".format(self.outputPath, p) S3Helper.writeCSV(csvFieldNames, csvData, self.bucketName, opath) self.saveItem(self.documentId, "page-{}-Forms".format(p), opath) @@ -63,7 +68,7 @@ def _outputTable(self, page, p): csvRow.append("Table") csvData.append(csvRow) for row in table.rows: - csvRow = [] + csvRow = [] for cell in row.cells: csvRow.append(cell.text) csvData.append(csvRow) @@ -76,12 +81,12 @@ def _outputTable(self, page, p): def run(self): - if(not self.document.pages): + if not self.document.pages: return opath = "{}response.json".format(self.outputPath) S3Helper.writeToS3(json.dumps(self.response), self.bucketName, opath) - self.saveItem(self.documentId, 'Response', opath) + self.saveItem(self.documentId, "Response", opath) print("Total Pages in Document: {}".format(len(self.document.pages))) @@ -98,10 +103,10 @@ def run(self): docText = docText + page.text + "\n" - if(self.forms): + if self.forms: self._outputForm(page, p) - if(self.tables): + if self.tables: self._outputTable(page, p) - p = p + 1 \ No newline at end of file + p = p + 1 diff --git a/src/s3batchproc.py b/src/s3batchproc.py index 8c95703a..daf4749b 100644 --- a/src/s3batchproc.py +++ b/src/s3batchproc.py @@ -1,10 +1,12 @@ import json import os -import uuid import urllib +import uuid + import datastore from helper import FileHelper + def processRequest(request): output = "" @@ -17,37 +19,44 @@ def processRequest(request): outputTable = request["outputTable"] jobId = request["jobId"] - invocationId = request['invocationId'] - invocationSchemaVersion = request['invocationSchemaVersion'] - taskId = request['taskId'] + invocationId = request["invocationId"] + invocationSchemaVersion = request["invocationSchemaVersion"] + taskId = request["taskId"] print("Input Object: {}/{}".format(bucketName, objectName)) ext = FileHelper.getFileExtenstion(objectName.lower()) print("Extension: {}".format(ext)) - if(ext and ext in ["jpg", "jpeg", "png", "pdf"]): + if ext and ext in ["jpg", "jpeg", "png", "pdf"]: documentId = str(uuid.uuid1()) ds = datastore.DocumentStore(documentsTable, outputTable) ds.createDocument(documentId, bucketName, objectName) - output = "Saved document {} for {}/{}".format(documentId, bucketName, objectName) + output = "Saved document {} for {}/{}".format( + documentId, bucketName, objectName + ) print(output) - results = [{ - 'taskId': taskId, - 'resultCode': 'Succeeded', - 'resultString': "Document submitted for processing with Id: {}".format(documentId) - }] - + results = [ + { + "taskId": taskId, + "resultCode": "Succeeded", + "resultString": "Document submitted for processing with Id: {}".format( + documentId + ), + } + ] + return { - 'invocationSchemaVersion': invocationSchemaVersion, - 'treatMissingKeysAs': 'PermanentFailure', - 'invocationId': invocationId, - 'results': results + "invocationSchemaVersion": invocationSchemaVersion, + "treatMissingKeysAs": "PermanentFailure", + "invocationId": invocationId, + "results": results, } + def lambda_handler(event, context): print("event: {}".format(event)) @@ -55,19 +64,19 @@ def lambda_handler(event, context): request = {} # Parse job parameters - request["jobId"] = event['job']['id'] - request["invocationId"] = event['invocationId'] - request["invocationSchemaVersion"] = event['invocationSchemaVersion'] + request["jobId"] = event["job"]["id"] + request["invocationId"] = event["invocationId"] + request["invocationSchemaVersion"] = event["invocationSchemaVersion"] # Task - request["task"] = event['tasks'][0] - request["taskId"] = event['tasks'][0]['taskId'] - request["objectName"] = urllib.parse.unquote_plus(event['tasks'][0]['s3Key']) - request["s3VersionId"] = event['tasks'][0]['s3VersionId'] - request["s3BucketArn"] = event['tasks'][0]['s3BucketArn'] - request["bucketName"] = request["s3BucketArn"].split(':')[-1] - - request["documentsTable"] = os.environ['DOCUMENTS_TABLE'] - request["outputTable"] = os.environ['OUTPUT_TABLE'] + request["task"] = event["tasks"][0] + request["taskId"] = event["tasks"][0]["taskId"] + request["objectName"] = urllib.parse.unquote_plus(event["tasks"][0]["s3Key"]) + request["s3VersionId"] = event["tasks"][0]["s3VersionId"] + request["s3BucketArn"] = event["tasks"][0]["s3BucketArn"] + request["bucketName"] = request["s3BucketArn"].split(":")[-1] + + request["documentsTable"] = os.environ["DOCUMENTS_TABLE"] + request["outputTable"] = os.environ["OUTPUT_TABLE"] return processRequest(request) diff --git a/src/s3proc.py b/src/s3proc.py index 06dc812a..aa220f8a 100644 --- a/src/s3proc.py +++ b/src/s3proc.py @@ -1,10 +1,12 @@ import json import os -import uuid import urllib +import uuid + import datastore from helper import FileHelper + def processRequest(request): output = "" @@ -21,28 +23,30 @@ def processRequest(request): ext = FileHelper.getFileExtenstion(objectName.lower()) print("Extension: {}".format(ext)) - if(ext and ext in ["jpg", "jpeg", "png", "pdf"]): + if ext and ext in ["jpg", "jpeg", "png", "pdf"]: documentId = str(uuid.uuid1()) ds = datastore.DocumentStore(documentsTable, outputTable) ds.createDocument(documentId, bucketName, objectName) - output = "Saved document {} for {}/{}".format(documentId, bucketName, objectName) + output = "Saved document {} for {}/{}".format( + documentId, bucketName, objectName + ) print(output) - return { - 'statusCode': 200, - 'body': json.dumps(output) - } + return {"statusCode": 200, "body": json.dumps(output)} + def lambda_handler(event, context): print("event: {}".format(event)) request = {} - request["bucketName"] = event['Records'][0]['s3']['bucket']['name'] - request["objectName"] = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key']) - request["documentsTable"] = os.environ['DOCUMENTS_TABLE'] - request["outputTable"] = os.environ['OUTPUT_TABLE'] + request["bucketName"] = event["Records"][0]["s3"]["bucket"]["name"] + request["objectName"] = urllib.parse.unquote_plus( + event["Records"][0]["s3"]["object"]["key"] + ) + request["documentsTable"] = os.environ["DOCUMENTS_TABLE"] + request["outputTable"] = os.environ["OUTPUT_TABLE"] return processRequest(request) diff --git a/src/syncproc.py b/src/syncproc.py index 5756d765..c7f648f4 100644 --- a/src/syncproc.py +++ b/src/syncproc.py @@ -1,56 +1,55 @@ -import boto3 -from decimal import Decimal import json import os -from helper import AwsHelper, S3Helper, DynamoDBHelper -from og import OutputGenerator +from decimal import Decimal + +import boto3 + import datastore +from helper import AwsHelper, DynamoDBHelper, S3Helper +from og import OutputGenerator + def callTextract(bucketName, objectName, detectText, detectForms, detectTables): - textract = AwsHelper().getClient('textract') - if(not detectForms and not detectTables): + textract = AwsHelper().getClient("textract") + if not detectForms and not detectTables: response = textract.detect_document_text( - Document={ - 'S3Object': { - 'Bucket': bucketName, - 'Name': objectName - } - } + Document={"S3Object": {"Bucket": bucketName, "Name": objectName}} ) else: - features = [] - if(detectTables): + features = [] + if detectTables: features.append("TABLES") - if(detectForms): + if detectForms: features.append("FORMS") - + response = textract.analyze_document( - Document={ - 'S3Object': { - 'Bucket': bucketName, - 'Name': objectName - } - }, - FeatureTypes=features + Document={"S3Object": {"Bucket": bucketName, "Name": objectName}}, + FeatureTypes=features, ) return response -def processImage(documentId, features, bucketName, objectName, outputTableName, documentsTableName): +def processImage( + documentId, features, bucketName, objectName, outputTableName, documentsTableName +): detectText = "Text" in features detectForms = "Forms" in features detectTables = "Tables" in features - response = callTextract(bucketName, objectName, detectText, detectForms, detectTables) + response = callTextract( + bucketName, objectName, detectText, detectForms, detectTables + ) dynamodb = AwsHelper().getResource("dynamodb") ddb = dynamodb.Table(outputTableName) print("Generating output for DocumentId: {}".format(documentId)) - opg = OutputGenerator(documentId, response, bucketName, objectName, detectForms, detectTables, ddb) + opg = OutputGenerator( + documentId, response, bucketName, objectName, detectForms, detectTables, ddb + ) opg.run() print("DocumentId: {}".format(documentId)) @@ -58,47 +57,55 @@ def processImage(documentId, features, bucketName, objectName, outputTableName, ds = datastore.DocumentStore(documentsTableName, outputTableName) ds.markDocumentComplete(documentId) + # --------------- Main handler ------------------ + def processRequest(request): output = "" print("request: {}".format(request)) - bucketName = request['bucketName'] - objectName = request['objectName'] - features = request['features'] - documentId = request['documentId'] - outputTable = request['outputTable'] - documentsTable = request['documentsTable'] + bucketName = request["bucketName"] + objectName = request["objectName"] + features = request["features"] + documentId = request["documentId"] + outputTable = request["outputTable"] + documentsTable = request["documentsTable"] documentsTable = request["documentsTable"] - - if(documentId and bucketName and objectName and features): - print("DocumentId: {}, features: {}, Object: {}/{}".format(documentId, features, bucketName, objectName)) - processImage(documentId, features, bucketName, objectName, outputTable, documentsTable) + if documentId and bucketName and objectName and features: + print( + "DocumentId: {}, features: {}, Object: {}/{}".format( + documentId, features, bucketName, objectName + ) + ) - output = "Document: {}, features: {}, Object: {}/{} processed.".format(documentId, features, bucketName, objectName) + processImage( + documentId, features, bucketName, objectName, outputTable, documentsTable + ) + + output = "Document: {}, features: {}, Object: {}/{} processed.".format( + documentId, features, bucketName, objectName + ) print(output) - return { - 'statusCode': 200, - 'body': output - } + return {"statusCode": 200, "body": output} + def lambda_handler(event, context): print("event: {}".format(event)) - message = json.loads(event['Records'][0]['body']) + message = json.loads(event["Records"][0]["body"]) print("Message: {}".format(message)) request = {} - request["documentId"] = message['documentId'] - request["bucketName"] = message['bucketName'] - request["objectName"] = message['objectName'] - request["features"] = message['features'] - request["outputTable"] = os.environ['OUTPUT_TABLE'] - request["documentsTable"] = os.environ['DOCUMENTS_TABLE'] - - return processRequest(request) \ No newline at end of file + request["documentId"] = message["documentId"] + request["bucketName"] = message["bucketName"] + request["objectName"] = message["objectName"] + request["features"] = message["features"] + request["outputTable"] = os.environ["OUTPUT_TABLE"] + request["documentsTable"] = os.environ["DOCUMENTS_TABLE"] + + return processRequest(request) diff --git a/src/test.py b/src/test.py index 647e1ac3..44500201 100644 --- a/src/test.py +++ b/src/test.py @@ -1,14 +1,15 @@ +import json import os +import uuid + +import asyncproc +import datastore +import docproc import events +import helper +import jobresultsproc import s3proc -import docproc import syncproc -import asyncproc -import jobresultsproc -import helper -import uuid -import json -import datastore # Update variables below according to your infrastructure # You only need this if you want to test lambda code locally @@ -26,120 +27,133 @@ s3Pdf = "pdfdoc.pdf" s3LargePdf = "pdfdoc.pdf" + def clearEnvironment(): - os.environ['SYNC_QUEUE_URL'] = "" - os.environ['ASYNC_QUEUE_URL'] = "" - os.environ['DOCUMENTS_TABLE'] = "" - os.environ['OUTPUT_TABLE'] = "" - os.environ['SNS_TOPIC_ARN'] = "" - os.environ['SNS_ROLE_ARN'] = "" + os.environ["SYNC_QUEUE_URL"] = "" + os.environ["ASYNC_QUEUE_URL"] = "" + os.environ["DOCUMENTS_TABLE"] = "" + os.environ["OUTPUT_TABLE"] = "" + os.environ["SNS_TOPIC_ARN"] = "" + os.environ["SNS_ROLE_ARN"] = "" + def createImageDocument(documentCount=1): - + event = events.s3Event(bucketName, s3Image) - + clearEnvironment() - os.environ['DOCUMENTS_TABLE'] = documentsTableName - os.environ['OUTPUT_TABLE'] = outputTableName + os.environ["DOCUMENTS_TABLE"] = documentsTableName + os.environ["OUTPUT_TABLE"] = outputTableName i = 0 - while(i < documentCount): + while i < documentCount: s3proc.lambda_handler(event, None) i += 1 -def processImageDocument(documentId=str(uuid.uuid1()), documentCount = 1): - + +def processImageDocument(documentId=str(uuid.uuid1()), documentCount=1): + clearEnvironment() - os.environ['SYNC_QUEUE_URL'] = syncQueueUrl - os.environ['ASYNC_QUEUE_URL'] = asyncQueueUrl + os.environ["SYNC_QUEUE_URL"] = syncQueueUrl + os.environ["ASYNC_QUEUE_URL"] = asyncQueueUrl i = 0 - while(i < documentCount): + while i < documentCount: event = events.documentEvent(documentId, bucketName, s3Image) docproc.lambda_handler(event, None) i += 1 + def createPdfDocument(documentCount=1): - + event = events.s3Event(bucketName, s3Pdf) - + clearEnvironment() - os.environ['DOCUMENTS_TABLE'] = documentsTableName - os.environ['OUTPUT_TABLE'] = outputTableName + os.environ["DOCUMENTS_TABLE"] = documentsTableName + os.environ["OUTPUT_TABLE"] = outputTableName i = 0 - while(i < documentCount): + while i < documentCount: s3proc.lambda_handler(event, None) i += 1 -def processPdfDocument(documentId=str(uuid.uuid1()), documentCount = 1): - + +def processPdfDocument(documentId=str(uuid.uuid1()), documentCount=1): + clearEnvironment() - os.environ['SYNC_QUEUE_URL'] = syncQueueUrl - os.environ['ASYNC_QUEUE_URL'] = asyncQueueUrl + os.environ["SYNC_QUEUE_URL"] = syncQueueUrl + os.environ["ASYNC_QUEUE_URL"] = asyncQueueUrl i = 0 - while(i < documentCount): + while i < documentCount: event = events.documentEvent(documentId, bucketName, s3Pdf) docproc.lambda_handler(event, None) i += 1 + def processSyncJob(documentId="e5ea2b4a-7162-11e9-958a-c4b301c10017"): event = events.syncQueueDocument(documentId, bucketName, s3Image) clearEnvironment() - os.environ['OUTPUT_TABLE'] = outputTableName - os.environ['DOCUMENTS_TABLE'] = documentsTableName + os.environ["OUTPUT_TABLE"] = outputTableName + os.environ["DOCUMENTS_TABLE"] = documentsTableName syncproc.lambda_handler(event, None) + def processAsyncJobs(): event = {} clearEnvironment() - os.environ['SNS_TOPIC_ARN'] = snsTopic - os.environ['SNS_ROLE_ARN'] = snsRole - os.environ['ASYNC_QUEUE_URL'] = asyncQueueUrl + os.environ["SNS_TOPIC_ARN"] = snsTopic + os.environ["SNS_ROLE_ARN"] = snsRole + os.environ["ASYNC_QUEUE_URL"] = asyncQueueUrl asyncproc.lambda_handler(event, None) + def processJobResults(): - - event = events.jobResultsEvent("2e8462d30cb50e66e67d2709b3cce90f01118594016c0df328534185000ae32f", - "12917fdc-6357-11e9-b05d-42237b865595", - "SUCCESS", - "['Text', 'FORMS', 'TABLES']", - bucketName, s3Pdf) - clearEnvironment() - os.environ['OUTPUT_TABLE'] = outputTableName - os.environ['DOCUMENTS_TABLE'] = documentsTableName + event = events.jobResultsEvent( + "2e8462d30cb50e66e67d2709b3cce90f01118594016c0df328534185000ae32f", + "12917fdc-6357-11e9-b05d-42237b865595", + "SUCCESS", + "['Text', 'FORMS', 'TABLES']", + bucketName, + s3Pdf, + ) + + clearEnvironment() + os.environ["OUTPUT_TABLE"] = outputTableName + os.environ["DOCUMENTS_TABLE"] = documentsTableName jobresultsproc.lambda_handler(event, None) + def dataStore_getDocuments(): - - #Document - print("*******************") - dstore = datastore.DocumentStore(documentsTableName, outputTableName) - docs = dstore.getDocuments() + + # Document + print("*******************") + dstore = datastore.DocumentStore(documentsTableName, outputTableName) + docs = dstore.getDocuments() + print(docs) + print("------------") + while "nextToken" in docs: + print(docs["nextToken"]) + docs = dstore.getDocuments(docs["nextToken"]) print(docs) - print("------------") - while("nextToken" in docs): - print(docs["nextToken"]) - docs = dstore.getDocuments(docs["nextToken"]) - print(docs) - print("------------") - -#Sync Pipeline -#createImageDocument() -#processImageDocument("822927b4-7798-11e9-8495-4a0007597ab0") -#processSyncJob("822927b4-7798-11e9-8495-4a0007597ab0") - -#Async Pipeline -#createPdfDocument(1) -#processPdfDocument() -#processAsyncJobs() -#processJobResults() + print("------------") + + +# Sync Pipeline +# createImageDocument() +# processImageDocument("822927b4-7798-11e9-8495-4a0007597ab0") +# processSyncJob("822927b4-7798-11e9-8495-4a0007597ab0") + +# Async Pipeline +# createPdfDocument(1) +# processPdfDocument() +# processAsyncJobs() +# processJobResults() diff --git a/src/trp.py b/src/trp.py index 2f707ce6..693e94aa 100644 --- a/src/trp.py +++ b/src/trp.py @@ -1,5 +1,6 @@ import json + class BoundingBox: def __init__(self, width, height, left, top): self._width = width @@ -8,7 +9,9 @@ def __init__(self, width, height, left, top): self._top = top def __str__(self): - return "width: {}, height: {}, left: {}, top: {}".format(self._width, self._height, self._left, self._top) + return "width: {}, height: {}, left: {}, top: {}".format( + self._width, self._height, self._left, self._top + ) @property def width(self): @@ -26,6 +29,7 @@ def left(self): def top(self): return self._top + class Polygon: def __init__(self, x, y): self._x = x @@ -42,11 +46,17 @@ def x(self): def y(self): return self._y + class Geometry: def __init__(self, geometry): boundingBox = geometry["BoundingBox"] polygon = geometry["Polygon"] - bb = BoundingBox(boundingBox["Width"], boundingBox["Height"], boundingBox["Left"], boundingBox["Top"]) + bb = BoundingBox( + boundingBox["Width"], + boundingBox["Height"], + boundingBox["Left"], + boundingBox["Top"], + ) pgs = [] for pg in polygon: pgs.append(Polygon(pg["X"], pg["Y"])) @@ -66,15 +76,16 @@ def boundingBox(self): def polygon(self): return self._polygon + class Word: def __init__(self, block, blockMap): self._block = block - self._confidence = block['Confidence'] - self._geometry = Geometry(block['Geometry']) - self._id = block['Id'] + self._confidence = block["Confidence"] + self._geometry = Geometry(block["Geometry"]) + self._id = block["Id"] self._text = "" - if(block['Text']): - self._text = block['Text'] + if block["Text"]: + self._text = block["Text"] def __str__(self): return self._text @@ -99,25 +110,27 @@ def text(self): def block(self): return self._block + class Line: def __init__(self, block, blockMap): self._block = block - self._confidence = block['Confidence'] - self._geometry = Geometry(block['Geometry']) - self._id = block['Id'] + self._confidence = block["Confidence"] + self._geometry = Geometry(block["Geometry"]) + self._id = block["Id"] self._text = "" - if(block['Text']): - self._text = block['Text'] + if block["Text"]: + self._text = block["Text"] self._words = [] - if('Relationships' in block and block['Relationships']): - for rs in block['Relationships']: - if(rs['Type'] == 'CHILD'): - for cid in rs['Ids']: - if(blockMap[cid]["BlockType"] == "WORD"): + if "Relationships" in block and block["Relationships"]: + for rs in block["Relationships"]: + if rs["Type"] == "CHILD": + for cid in rs["Ids"]: + if blockMap[cid]["BlockType"] == "WORD": self._words.append(Word(blockMap[cid], blockMap)) + def __str__(self): s = "Line\n==========\n" s = s + self._text + "\n" @@ -150,12 +163,13 @@ def text(self): def block(self): return self._block + class SelectionElement: def __init__(self, block, blockMap): - self._confidence = block['Confidence'] - self._geometry = Geometry(block['Geometry']) - self._id = block['Id'] - self._selectionStatus = block['SelectionStatus'] + self._confidence = block["Confidence"] + self._geometry = Geometry(block["Geometry"]) + self._id = block["Id"] + self._selectionStatus = block["SelectionStatus"] @property def confidence(self): @@ -173,12 +187,13 @@ def id(self): def selectionStatus(self): return self._selectionStatus + class FieldKey: def __init__(self, block, children, blockMap): self._block = block - self._confidence = block['Confidence'] - self._geometry = Geometry(block['Geometry']) - self._id = block['Id'] + self._confidence = block["Confidence"] + self._geometry = Geometry(block["Geometry"]) + self._id = block["Id"] self._text = "" self._content = [] @@ -186,13 +201,13 @@ def __init__(self, block, children, blockMap): for eid in children: wb = blockMap[eid] - if(wb['BlockType'] == "WORD"): + if wb["BlockType"] == "WORD": w = Word(wb, blockMap) self._content.append(w) t.append(w.text) - if(t): - self._text = ' '.join(t) + if t: + self._text = " ".join(t) def __str__(self): return self._text @@ -221,12 +236,13 @@ def text(self): def block(self): return self._block + class FieldValue: def __init__(self, block, children, blockMap): self._block = block - self._confidence = block['Confidence'] - self._geometry = Geometry(block['Geometry']) - self._id = block['Id'] + self._confidence = block["Confidence"] + self._geometry = Geometry(block["Geometry"]) + self._id = block["Id"] self._text = "" self._content = [] @@ -234,17 +250,17 @@ def __init__(self, block, children, blockMap): for eid in children: wb = blockMap[eid] - if(wb['BlockType'] == "WORD"): + if wb["BlockType"] == "WORD": w = Word(wb, blockMap) self._content.append(w) t.append(w.text) - elif(wb['BlockType'] == "SELECTION_ELEMENT"): + elif wb["BlockType"] == "SELECTION_ELEMENT": se = SelectionElement(wb, blockMap) self._content.append(se) self._text = se.selectionStatus - if(t): - self._text = ' '.join(t) + if t: + self._text = " ".join(t) def __str__(self): return self._text @@ -268,34 +284,38 @@ def content(self): @property def text(self): return self._text - + @property def block(self): return self._block + class Field: def __init__(self, block, blockMap): self._key = None self._value = None - for item in block['Relationships']: - if(item["Type"] == "CHILD"): - self._key = FieldKey(block, item['Ids'], blockMap) - elif(item["Type"] == "VALUE"): - for eid in item['Ids']: + for item in block["Relationships"]: + if item["Type"] == "CHILD": + self._key = FieldKey(block, item["Ids"], blockMap) + elif item["Type"] == "VALUE": + for eid in item["Ids"]: vkvs = blockMap[eid] - if 'VALUE' in vkvs['EntityTypes']: - if('Relationships' in vkvs): - for vitem in vkvs['Relationships']: - if(vitem["Type"] == "CHILD"): - self._value = FieldValue(vkvs, vitem['Ids'], blockMap) + if "VALUE" in vkvs["EntityTypes"]: + if "Relationships" in vkvs: + for vitem in vkvs["Relationships"]: + if vitem["Type"] == "CHILD": + self._value = FieldValue( + vkvs, vitem["Ids"], blockMap + ) + def __str__(self): s = "\nField\n==========\n" k = "" v = "" - if(self._key): + if self._key: k = str(self._key) - if(self._value): + if self._value: v = str(self._value) s = s + "Key: {}\nValue: {}".format(k, v) return s @@ -308,6 +328,7 @@ def key(self): def value(self): return self._value + class Form: def __init__(self): self._fields = [] @@ -329,44 +350,44 @@ def fields(self): def getFieldByKey(self, key): field = None - if(key in self._fieldsMap): + if key in self._fieldsMap: field = self._fieldsMap[key] return field - + def searchFieldsByKey(self, key): searchKey = key.lower() results = [] for field in self._fields: - if(field.key and searchKey in field.key.text.lower()): + if field.key and searchKey in field.key.text.lower(): results.append(field) return results -class Cell: +class Cell: def __init__(self, block, blockMap): self._block = block - self._confidence = block['Confidence'] - self._rowIndex = block['RowIndex'] - self._columnIndex = block['ColumnIndex'] - self._rowSpan = block['RowSpan'] - self._columnSpan = block['ColumnSpan'] - self._geometry = Geometry(block['Geometry']) - self._id = block['Id'] + self._confidence = block["Confidence"] + self._rowIndex = block["RowIndex"] + self._columnIndex = block["ColumnIndex"] + self._rowSpan = block["RowSpan"] + self._columnSpan = block["ColumnSpan"] + self._geometry = Geometry(block["Geometry"]) + self._id = block["Id"] self._content = [] self._text = "" - if('Relationships' in block and block['Relationships']): - for rs in block['Relationships']: - if(rs['Type'] == 'CHILD'): - for cid in rs['Ids']: + if "Relationships" in block and block["Relationships"]: + for rs in block["Relationships"]: + if rs["Type"] == "CHILD": + for cid in rs["Ids"]: blockType = blockMap[cid]["BlockType"] - if(blockType == "WORD"): + if blockType == "WORD": w = Word(blockMap[cid], blockMap) self._content.append(w) - self._text = self._text + w.text + ' ' - elif(blockType == "SELECTION_ELEMENT"): + self._text = self._text + w.text + " " + elif blockType == "SELECTION_ELEMENT": se = SelectionElement(blockMap[cid], blockMap) self._content.append(se) - self._text = self._text + se.selectionStatus + ', ' + self._text = self._text + se.selectionStatus + ", " def __str__(self): return self._text @@ -411,6 +432,7 @@ def text(self): def block(self): return self._block + class Row: def __init__(self): self._cells = [] @@ -425,32 +447,32 @@ def __str__(self): def cells(self): return self._cells -class Table: +class Table: def __init__(self, block, blockMap): self._block = block - self._confidence = block['Confidence'] - self._geometry = Geometry(block['Geometry']) + self._confidence = block["Confidence"] + self._geometry = Geometry(block["Geometry"]) - self._id = block['Id'] + self._id = block["Id"] self._rows = [] ri = 1 row = Row() cell = None - if('Relationships' in block and block['Relationships']): - for rs in block['Relationships']: - if(rs['Type'] == 'CHILD'): - for cid in rs['Ids']: + if "Relationships" in block and block["Relationships"]: + for rs in block["Relationships"]: + if rs["Type"] == "CHILD": + for cid in rs["Ids"]: cell = Cell(blockMap[cid], blockMap) - if(cell.rowIndex > ri): + if cell.rowIndex > ri: self._rows.append(row) row = Row() ri = cell.rowIndex row.cells.append(cell) - if(row and row.cells): + if row and row.cells: self._rows.append(row) def __str__(self): @@ -480,8 +502,8 @@ def rows(self): def block(self): return self._block -class Page: +class Page: def __init__(self, blocks, blockMap): self._blocks = blocks self._text = "" @@ -501,25 +523,27 @@ def __str__(self): def _parse(self, blockMap): for item in self._blocks: if item["BlockType"] == "PAGE": - self._geometry = Geometry(item['Geometry']) - self._id = item['Id'] + self._geometry = Geometry(item["Geometry"]) + self._id = item["Id"] elif item["BlockType"] == "LINE": l = Line(item, blockMap) self._lines.append(l) self._content.append(l) - self._text = self._text + l.text + '\n' + self._text = self._text + l.text + "\n" elif item["BlockType"] == "TABLE": t = Table(item, blockMap) self._tables.append(t) self._content.append(t) elif item["BlockType"] == "KEY_VALUE_SET": - if 'KEY' in item['EntityTypes']: + if "KEY" in item["EntityTypes"]: f = Field(item, blockMap) - if(f.key): + if f.key: self._form.addField(f) self._content.append(f) else: - print("WARNING: Detected K/V where key does not have content. Excluding key from output.") + print( + "WARNING: Detected K/V where key does not have content. Excluding key from output." + ) print(f) print(item) @@ -527,20 +551,32 @@ def getLinesInReadingOrder(self): columns = [] lines = [] for item in self._lines: - column_found=False - for index, column in enumerate(columns): - bbox_left = item.geometry.boundingBox.left - bbox_right = item.geometry.boundingBox.left + item.geometry.boundingBox.width - bbox_centre = item.geometry.boundingBox.left + item.geometry.boundingBox.width/2 - column_centre = column['left'] + column['right']/2 - if (bbox_centre > column['left'] and bbox_centre < column['right']) or (column_centre > bbox_left and column_centre < bbox_right): - #Bbox appears inside the column - lines.append([index, item.text]) - column_found=True - break - if not column_found: - columns.append({'left':item.geometry.boundingBox.left, 'right':item.geometry.boundingBox.left + item.geometry.boundingBox.width}) - lines.append([len(columns)-1, item.text]) + column_found = False + for index, column in enumerate(columns): + bbox_left = item.geometry.boundingBox.left + bbox_right = ( + item.geometry.boundingBox.left + item.geometry.boundingBox.width + ) + bbox_centre = ( + item.geometry.boundingBox.left + item.geometry.boundingBox.width / 2 + ) + column_centre = column["left"] + column["right"] / 2 + if (bbox_centre > column["left"] and bbox_centre < column["right"]) or ( + column_centre > bbox_left and column_centre < bbox_right + ): + # Bbox appears inside the column + lines.append([index, item.text]) + column_found = True + break + if not column_found: + columns.append( + { + "left": item.geometry.boundingBox.left, + "right": item.geometry.boundingBox.left + + item.geometry.boundingBox.width, + } + ) + lines.append([len(columns) - 1, item.text]) lines.sort(key=lambda x: x[0]) return lines @@ -549,7 +585,7 @@ def getTextInReadingOrder(self): lines = self.getLinesInReadingOrder() text = "" for line in lines: - text = text + line[1] + '\n' + text = text + line[1] + "\n" return text @property @@ -584,11 +620,11 @@ def geometry(self): def id(self): return self._id -class Document: +class Document: def __init__(self, responsePages): - if(not isinstance(responsePages, list)): + if not isinstance(responsePages, list): rps = [] rps.append(responsePages) responsePages = rps @@ -611,24 +647,27 @@ def _parseDocumentPagesAndBlockMap(self): documentPages = [] documentPage = None for page in self._responsePages: - for block in page['Blocks']: - if('BlockType' in block and 'Id' in block): - blockMap[block['Id']] = block + for block in page["Blocks"]: + if "BlockType" in block and "Id" in block: + blockMap[block["Id"]] = block - if(block['BlockType'] == 'PAGE'): - if(documentPage): - documentPages.append({"Blocks" : documentPage}) + if block["BlockType"] == "PAGE": + if documentPage: + documentPages.append({"Blocks": documentPage}) documentPage = [] documentPage.append(block) else: documentPage.append(block) - if(documentPage): - documentPages.append({"Blocks" : documentPage}) + if documentPage: + documentPages.append({"Blocks": documentPage}) return documentPages, blockMap def _parse(self): - self._responseDocumentPages, self._blockMap = self._parseDocumentPagesAndBlockMap() + ( + self._responseDocumentPages, + self._blockMap, + ) = self._parseDocumentPagesAndBlockMap() for documentPage in self._responseDocumentPages: page = Page(documentPage["Blocks"], self._blockMap) self._pages.append(page) @@ -647,7 +686,6 @@ def pages(self): def getBlockById(self, blockId): block = None - if(self._blockMap and blockId in self._blockMap): + if self._blockMap and blockId in self._blockMap: block = self._blockMap[blockId] return block - diff --git a/textract-pipeline/lambda/asyncprocessor/lambda_function.py b/textract-pipeline/lambda/asyncprocessor/lambda_function.py index 623d9c61..f4173e39 100644 --- a/textract-pipeline/lambda/asyncprocessor/lambda_function.py +++ b/textract-pipeline/lambda/asyncprocessor/lambda_function.py @@ -1,113 +1,116 @@ import json -import boto3 import os -from helper import AwsHelper import time -def startJob(bucketName, objectName, documentId, snsTopic, snsRole, detectForms, detectTables): +import boto3 + +from helper import AwsHelper - print("Starting job with documentId: {}, bucketName: {}, objectName: {}".format(documentId, bucketName, objectName)) + +def startJob( + bucketName, objectName, documentId, snsTopic, snsRole, detectForms, detectTables +): + + print( + "Starting job with documentId: {}, bucketName: {}, objectName: {}".format( + documentId, bucketName, objectName + ) + ) response = None - client = AwsHelper().getClient('textract') - if(not detectForms and not detectTables): + client = AwsHelper().getClient("textract") + if not detectForms and not detectTables: response = client.start_document_text_detection( - ClientRequestToken = documentId, - DocumentLocation={ - 'S3Object': { - 'Bucket': bucketName, - 'Name': objectName - } - }, - NotificationChannel= { - "RoleArn": snsRole, - "SNSTopicArn": snsTopic - }, - JobTag = documentId) + ClientRequestToken=documentId, + DocumentLocation={"S3Object": {"Bucket": bucketName, "Name": objectName}}, + NotificationChannel={"RoleArn": snsRole, "SNSTopicArn": snsTopic}, + JobTag=documentId, + ) else: - features = [] - if(detectTables): + features = [] + if detectTables: features.append("TABLES") - if(detectForms): + if detectForms: features.append("FORMS") response = client.start_document_analysis( - ClientRequestToken = documentId, - DocumentLocation={ - 'S3Object': { - 'Bucket': bucketName, - 'Name': objectName - } - }, + ClientRequestToken=documentId, + DocumentLocation={"S3Object": {"Bucket": bucketName, "Name": objectName}}, FeatureTypes=features, - NotificationChannel= { - "RoleArn": snsRole, - "SNSTopicArn": snsTopic - }, - JobTag = documentId) + NotificationChannel={"RoleArn": snsRole, "SNSTopicArn": snsTopic}, + JobTag=documentId, + ) return response["JobId"] def processItem(message, snsTopic, snsRole): - print('message:') + print("message:") print(message) - messageBody = json.loads(message['Body']) + messageBody = json.loads(message["Body"]) - bucketName = messageBody['bucketName'] - objectName = messageBody['objectName'] - documentId = messageBody['documentId'] - features = messageBody['features'] + bucketName = messageBody["bucketName"] + objectName = messageBody["objectName"] + documentId = messageBody["documentId"] + features = messageBody["features"] - print('Bucket Name: ' + bucketName) - print('Object Name: ' + objectName) - print('Task ID: ' + documentId) + print("Bucket Name: " + bucketName) + print("Object Name: " + objectName) + print("Task ID: " + documentId) print("API: {}".format(features)) - print('starting Textract job...') + print("starting Textract job...") - detectForms = 'Forms' in features - detectTables = 'Tables' in features + detectForms = "Forms" in features + detectTables = "Tables" in features - jobId = startJob(bucketName, objectName, documentId, snsTopic, snsRole, detectForms, detectTables) + jobId = startJob( + bucketName, objectName, documentId, snsTopic, snsRole, detectForms, detectTables + ) - if(jobId): + if jobId: print("Started Job with Id: {}".format(jobId)) return jobId + def changeVisibility(sqs, qUrl, receipt_handle): try: sqs.change_message_visibility( - QueueUrl=qUrl, - ReceiptHandle=receipt_handle, - VisibilityTimeout=0 - ) + QueueUrl=qUrl, ReceiptHandle=receipt_handle, VisibilityTimeout=0 + ) except Exception as e: - print("Failed to change visibility for {} with error: {}".format(receipt_handle, e)) + print( + "Failed to change visibility for {} with error: {}".format( + receipt_handle, e + ) + ) + -def getMessagesFromQueue(sqs, qUrl,): +def getMessagesFromQueue( + sqs, + qUrl, +): # Receive message from SQS queue response = sqs.receive_message( - QueueUrl=qUrl, - MaxNumberOfMessages=1, - VisibilityTimeout=60 #14400 + QueueUrl=qUrl, MaxNumberOfMessages=1, VisibilityTimeout=60 # 14400 ) - print('SQS Response Recieved:') + print("SQS Response Recieved:") print(response) - if('Messages' in response): - return response['Messages'] + if "Messages" in response: + return response["Messages"] else: print("No messages in queue.") return None + def processItems(qUrl, snsTopic, snsRole): - sqs = AwsHelper().getClient('sqs') + sqs = AwsHelper().getClient("sqs") messages = getMessagesFromQueue(sqs, qUrl) jc = 0 @@ -115,48 +118,47 @@ def processItems(qUrl, snsTopic, snsRole): hitLimit = False limitException = None - if(messages): - + if messages: totalMessages = len(messages) print("Total messages: {}".format(totalMessages)) for message in messages: - receipt_handle = message['ReceiptHandle'] + receipt_handle = message["ReceiptHandle"] try: - if(hitLimit): + if hitLimit: changeVisibility(sqs, qUrl, receipt_handle) else: print("starting job...") processItem(message, snsTopic, snsRole) print("started job...") - print('Deleting item from queue...') + print("Deleting item from queue...") # Delete received message from queue - sqs.delete_message( - QueueUrl=qUrl, - ReceiptHandle=receipt_handle - ) - print('Deleted item from queue...') + sqs.delete_message(QueueUrl=qUrl, ReceiptHandle=receipt_handle) + print("Deleted item from queue...") jc += 1 except Exception as e: print("Error while starting job or deleting from queue: {}".format(e)) changeVisibility(sqs, qUrl, receipt_handle) - if(e.__class__.__name__ == 'LimitExceededException' - or e.__class__.__name__ == "ProvisionedThroughputExceededException"): + if ( + e.__class__.__name__ == "LimitExceededException" + or e.__class__.__name__ == "ProvisionedThroughputExceededException" + ): hitLimit = True limitException = e - if(hitLimit): + if hitLimit: raise limitException return totalMessages, jc + def processRequest(request): - qUrl = request['qUrl'] - snsTopic = request['snsTopic'] - snsRole = request['snsRole'] + qUrl = request["qUrl"] + snsTopic = request["snsTopic"] + snsRole = request["snsRole"] i = 0 max = 100 @@ -166,24 +168,24 @@ def processRequest(request): hitLimit = False provisionedThroughputExceededCount = 0 - while(i < max): + while i < max: try: tc, jc = processItems(qUrl, snsTopic, snsRole) totalJobsScheduled += jc - if(tc == 0): + if tc == 0: i = max except Exception as e: - if(e.__class__.__name__ == 'LimitExceededException'): + if e.__class__.__name__ == "LimitExceededException": print("Exception: Hit limit.") hitLimit = True i = max - elif(e.__class__.__name__ == "ProvisionedThroughputExceededException"): + elif e.__class__.__name__ == "ProvisionedThroughputExceededException": print("ProvisionedThroughputExceededException.") provisionedThroughputExceededCount += 1 - if(provisionedThroughputExceededCount > 5): + if provisionedThroughputExceededCount > 5: i = max else: print("Waiting for few seconds...") @@ -193,15 +195,13 @@ def processRequest(request): i += 1 output = "Started {} jobs.".format(totalJobsScheduled) - if(hitLimit): + if hitLimit: output += " Hit limit." print(output) - return { - 'statusCode': 200, - 'body': output - } + return {"statusCode": 200, "body": output} + def lambda_handler(event, context): @@ -209,8 +209,8 @@ def lambda_handler(event, context): request = {} - request["qUrl"] = os.environ['ASYNC_QUEUE_URL'] - request["snsTopic"] = os.environ['SNS_TOPIC_ARN'] - request["snsRole"] = os.environ['SNS_ROLE_ARN'] + request["qUrl"] = os.environ["ASYNC_QUEUE_URL"] + request["snsTopic"] = os.environ["SNS_TOPIC_ARN"] + request["snsRole"] = os.environ["SNS_ROLE_ARN"] return processRequest(request) diff --git a/textract-pipeline/lambda/documentprocessor/lambda_function.py b/textract-pipeline/lambda/documentprocessor/lambda_function.py index f0d58964..cd858018 100644 --- a/textract-pipeline/lambda/documentprocessor/lambda_function.py +++ b/textract-pipeline/lambda/documentprocessor/lambda_function.py @@ -1,18 +1,18 @@ import json import os -from helper import FileHelper, AwsHelper + +from helper import AwsHelper, FileHelper + def postMessage(client, qUrl, jsonMessage): message = json.dumps(jsonMessage) - client.send_message( - QueueUrl=qUrl, - MessageBody=message - ) + client.send_message(QueueUrl=qUrl, MessageBody=message) print("Submitted message to queue: {}".format(message)) + def processRequest(request): output = "" @@ -28,76 +28,90 @@ def processRequest(request): ext = FileHelper.getFileExtenstion(objectName.lower()) print("Extension: {}".format(ext)) - if(ext and ext in ["jpg", "jpeg", "png"]): - qUrl = request['syncQueueUrl'] - elif (ext in ["pdf"]): - qUrl = request['asyncQueueUrl'] + if ext and ext in ["jpg", "jpeg", "png"]: + qUrl = request["syncQueueUrl"] + elif ext in ["pdf"]: + qUrl = request["asyncQueueUrl"] - if(qUrl): + if qUrl: features = ["Text", "Forms", "Tables"] - jsonMessage = { 'documentId' : documentId, - "features" : features, - 'bucketName': bucketName, - 'objectName' : objectName } + jsonMessage = { + "documentId": documentId, + "features": features, + "bucketName": bucketName, + "objectName": objectName, + } - client = AwsHelper().getClient('sqs') + client = AwsHelper().getClient("sqs") postMessage(client, qUrl, jsonMessage) - output = "Completed routing for documentId: {}, object: {}/{}".format(documentId, bucketName, objectName) + output = "Completed routing for documentId: {}, object: {}/{}".format( + documentId, bucketName, objectName + ) print(output) + def processRecord(record, syncQueueUrl, asyncQueueUrl): - + newImage = record["dynamodb"]["NewImage"] - + documentId = None bucketName = None objectName = None documentStatus = None - - if("documentId" in newImage and "S" in newImage["documentId"]): + + if "documentId" in newImage and "S" in newImage["documentId"]: documentId = newImage["documentId"]["S"] - if("bucketName" in newImage and "S" in newImage["bucketName"]): + if "bucketName" in newImage and "S" in newImage["bucketName"]: bucketName = newImage["bucketName"]["S"] - if("objectName" in newImage and "S" in newImage["objectName"]): + if "objectName" in newImage and "S" in newImage["objectName"]: objectName = newImage["objectName"]["S"] - if("documentStatus" in newImage and "S" in newImage["documentStatus"]): + if "documentStatus" in newImage and "S" in newImage["documentStatus"]: documentStatus = newImage["documentStatus"]["S"] - print("DocumentId: {}, BucketName: {}, ObjectName: {}, DocumentStatus: {}".format(documentId, bucketName, objectName, documentStatus)) + print( + "DocumentId: {}, BucketName: {}, ObjectName: {}, DocumentStatus: {}".format( + documentId, bucketName, objectName, documentStatus + ) + ) - if(documentId and bucketName and objectName and documentStatus): + if documentId and bucketName and objectName and documentStatus: request = {} request["documentId"] = documentId request["bucketName"] = bucketName request["objectName"] = objectName - request['syncQueueUrl'] = syncQueueUrl - request['asyncQueueUrl'] = asyncQueueUrl + request["syncQueueUrl"] = syncQueueUrl + request["asyncQueueUrl"] = asyncQueueUrl processRequest(request) + def lambda_handler(event, context): try: - + print("event: {}".format(event)) - syncQueueUrl = os.environ['SYNC_QUEUE_URL'] - asyncQueueUrl = os.environ['ASYNC_QUEUE_URL'] + syncQueueUrl = os.environ["SYNC_QUEUE_URL"] + asyncQueueUrl = os.environ["ASYNC_QUEUE_URL"] - if("Records" in event and event["Records"]): + if "Records" in event and event["Records"]: for record in event["Records"]: try: print("Processing record: {}".format(record)) - if("eventName" in record and record["eventName"] == "INSERT"): - if("dynamodb" in record and record["dynamodb"] and "NewImage" in record["dynamodb"]): + if "eventName" in record and record["eventName"] == "INSERT": + if ( + "dynamodb" in record + and record["dynamodb"] + and "NewImage" in record["dynamodb"] + ): processRecord(record, syncQueueUrl, asyncQueueUrl) except Exception as e: print("Faild to process record. Exception: {}".format(e)) except Exception as e: - print("Failed to process records. Exception: {}".format(e)) \ No newline at end of file + print("Failed to process records. Exception: {}".format(e)) diff --git a/textract-pipeline/lambda/helper/python/datastore.py b/textract-pipeline/lambda/helper/python/datastore.py index d4ff570c..1733aca4 100644 --- a/textract-pipeline/lambda/helper/python/datastore.py +++ b/textract-pipeline/lambda/helper/python/datastore.py @@ -1,10 +1,12 @@ +import datetime + import boto3 from botocore.exceptions import ClientError + from helper import AwsHelper -import datetime -class DocumentStore: +class DocumentStore: def __init__(self, documentsTableName, outputTableName): self._documentsTableName = documentsTableName self._outputTableName = outputTableName @@ -18,21 +20,21 @@ def createDocument(self, documentId, bucketName, objectName): try: table.update_item( - Key = { "documentId": documentId }, - UpdateExpression = 'SET bucketName = :bucketNameValue, objectName = :objectNameValue, documentStatus = :documentstatusValue, documentCreatedOn = :documentCreatedOnValue', - ConditionExpression = 'attribute_not_exists(documentId)', - ExpressionAttributeValues = { - ':bucketNameValue': bucketName, - ':objectNameValue': objectName, - ':documentstatusValue': 'IN_PROGRESS', - ':documentCreatedOnValue': str(datetime.datetime.utcnow()) - } + Key={"documentId": documentId}, + UpdateExpression="SET bucketName = :bucketNameValue, objectName = :objectNameValue, documentStatus = :documentstatusValue, documentCreatedOn = :documentCreatedOnValue", + ConditionExpression="attribute_not_exists(documentId)", + ExpressionAttributeValues={ + ":bucketNameValue": bucketName, + ":objectNameValue": objectName, + ":documentstatusValue": "IN_PROGRESS", + ":documentCreatedOnValue": str(datetime.datetime.utcnow()), + }, ) except ClientError as e: print(e) - if e.response['Error']['Code'] == "ConditionalCheckFailedException": - print(e.response['Error']['Message']) - err = {'Error' : 'Document already exist.'} + if e.response["Error"]["Code"] == "ConditionalCheckFailedException": + print(e.response["Error"]["Message"]) + err = {"Error": "Document already exist."} else: raise @@ -47,17 +49,15 @@ def updateDocumentStatus(self, documentId, documentStatus): try: table.update_item( - Key = { 'documentId': documentId }, - UpdateExpression = 'SET documentStatus= :documentstatusValue', - ConditionExpression = 'attribute_exists(documentId)', - ExpressionAttributeValues = { - ':documentstatusValue': documentStatus - } + Key={"documentId": documentId}, + UpdateExpression="SET documentStatus= :documentstatusValue", + ConditionExpression="attribute_exists(documentId)", + ExpressionAttributeValues={":documentstatusValue": documentStatus}, ) except ClientError as e: - if e.response['Error']['Code'] == "ConditionalCheckFailedException": - print(e.response['Error']['Message']) - err = {'Error' : 'Document does not exist.'} + if e.response["Error"]["Code"] == "ConditionalCheckFailedException": + print(e.response["Error"]["Message"]) + err = {"Error": "Document does not exist."} else: raise @@ -72,18 +72,18 @@ def markDocumentComplete(self, documentId): try: table.update_item( - Key = { 'documentId': documentId }, - UpdateExpression = 'SET documentStatus= :documentstatusValue, documentCompletedOn = :documentCompletedOnValue', - ConditionExpression = 'attribute_exists(documentId)', - ExpressionAttributeValues = { - ':documentstatusValue': "SUCCEEDED", - ':documentCompletedOnValue': str(datetime.datetime.utcnow()) - } + Key={"documentId": documentId}, + UpdateExpression="SET documentStatus= :documentstatusValue, documentCompletedOn = :documentCompletedOnValue", + ConditionExpression="attribute_exists(documentId)", + ExpressionAttributeValues={ + ":documentstatusValue": "SUCCEEDED", + ":documentCompletedOnValue": str(datetime.datetime.utcnow()), + }, ) except ClientError as e: - if e.response['Error']['Code'] == "ConditionalCheckFailedException": - print(e.response['Error']['Message']) - err = {'Error' : 'Document does not exist.'} + if e.response["Error"]["Code"] == "ConditionalCheckFailedException": + print(e.response["Error"]["Message"]) + err = {"Error": "Document does not exist."} else: raise @@ -94,17 +94,18 @@ def getDocument(self, documentId): dynamodb = AwsHelper().getClient("dynamodb") ddbGetItemResponse = dynamodb.get_item( - Key={'documentId': {'S': documentId} }, - TableName=self._documentsTableName + Key={"documentId": {"S": documentId}}, TableName=self._documentsTableName ) itemToReturn = None - if('Item' in ddbGetItemResponse): - itemToReturn = { 'documentId' : ddbGetItemResponse['Item']['documentId']['S'], - 'bucketName' : ddbGetItemResponse['Item']['bucketName']['S'], - 'objectName' : ddbGetItemResponse['Item']['objectName']['S'], - 'documentStatus' : ddbGetItemResponse['Item']['documentStatus']['S'] } + if "Item" in ddbGetItemResponse: + itemToReturn = { + "documentId": ddbGetItemResponse["Item"]["documentId"]["S"], + "bucketName": ddbGetItemResponse["Item"]["bucketName"]["S"], + "objectName": ddbGetItemResponse["Item"]["objectName"]["S"], + "documentStatus": ddbGetItemResponse["Item"]["documentStatus"]["S"], + } return itemToReturn @@ -113,11 +114,7 @@ def deleteDocument(self, documentId): dynamodb = AwsHelper().getResource("dynamodb") table = dynamodb.Table(self._documentsTableName) - table.delete_item( - Key={ - 'documentId': documentId - } - ) + table.delete_item(Key={"documentId": documentId}) def getDocuments(self, nextToken=None): @@ -126,8 +123,10 @@ def getDocuments(self, nextToken=None): pageSize = 25 - if(nextToken): - response = table.scan(ExclusiveStartKey={ "documentId" : nextToken}, Limit=pageSize) + if nextToken: + response = table.scan( + ExclusiveStartKey={"documentId": nextToken}, Limit=pageSize + ) else: response = table.scan(Limit=pageSize) @@ -135,16 +134,14 @@ def getDocuments(self, nextToken=None): data = [] - if('Items' in response): - data = response['Items'] + if "Items" in response: + data = response["Items"] - documents = { - "documents" : data - } + documents = {"documents": data} - if 'LastEvaluatedKey' in response: - nextToken = response['LastEvaluatedKey']['documentId'] + if "LastEvaluatedKey" in response: + nextToken = response["LastEvaluatedKey"]["documentId"] print("nexToken: {}".format(nextToken)) documents["nextToken"] = nextToken - return documents \ No newline at end of file + return documents diff --git a/textract-pipeline/lambda/helper/python/helper.py b/textract-pipeline/lambda/helper/python/helper.py index 77c55dda..0c29b40d 100644 --- a/textract-pipeline/lambda/helper/python/helper.py +++ b/textract-pipeline/lambda/helper/python/helper.py @@ -1,12 +1,13 @@ -import boto3 -from botocore.client import Config -import os import csv import io +import os + +import boto3 from boto3.dynamodb.conditions import Key +from botocore.client import Config -class DynamoDBHelper: +class DynamoDBHelper: @staticmethod def getItems(tableName, key, value): items = None @@ -17,7 +18,7 @@ def getItems(tableName, key, value): if key is not None and value is not None: filter = Key(key).eq(value) queryResult = table.query(KeyConditionExpression=filter) - if(queryResult and "Items" in queryResult): + if queryResult and "Items" in queryResult: items = queryResult["Items"] return items @@ -35,50 +36,40 @@ def insertItem(tableName, itemData): @staticmethod def deleteItems(tableName, key, value, sk): items = DynamoDBHelper.getItems(tableName, key, value) - if(items): + if items: ddb = AwsHelper().getResource("dynamodb") table = ddb.Table(tableName) for item in items: print("Deleting...") print("{} : {}".format(key, item[key])) print("{} : {}".format(sk, item[sk])) - table.delete_item( - Key={ - key: value, - sk : item[sk] - }) + table.delete_item(Key={key: value, sk: item[sk]}) print("Deleted...") + class AwsHelper: def getClient(self, name, awsRegion=None): - config = Config( - retries = dict( - max_attempts = 30 - ) - ) - if(awsRegion): + config = Config(retries=dict(max_attempts=30)) + if awsRegion: return boto3.client(name, region_name=awsRegion, config=config) else: return boto3.client(name, config=config) def getResource(self, name, awsRegion=None): - config = Config( - retries = dict( - max_attempts = 30 - ) - ) + config = Config(retries=dict(max_attempts=30)) - if(awsRegion): + if awsRegion: return boto3.resource(name, region_name=awsRegion, config=config) else: return boto3.resource(name, config=config) + class S3Helper: @staticmethod def getS3BucketRegion(bucketName): - client = boto3.client('s3') + client = boto3.client("s3") response = client.get_bucket_location(Bucket=bucketName) - awsRegion = response['LocationConstraint'] + awsRegion = response["LocationConstraint"] return awsRegion @staticmethod @@ -90,49 +81,50 @@ def getFileNames(bucketName, prefix, maxPages, allowedFileTypes, awsRegion=None) hasMoreContent = True continuationToken = None - s3client = AwsHelper().getClient('s3', awsRegion) + s3client = AwsHelper().getClient("s3", awsRegion) - while(hasMoreContent and currentPage <= maxPages): - if(continuationToken): + while hasMoreContent and currentPage <= maxPages: + if continuationToken: listObjectsResponse = s3client.list_objects_v2( Bucket=bucketName, Prefix=prefix, - ContinuationToken=continuationToken) + ContinuationToken=continuationToken, + ) else: listObjectsResponse = s3client.list_objects_v2( - Bucket=bucketName, - Prefix=prefix) + Bucket=bucketName, Prefix=prefix + ) - if(listObjectsResponse['IsTruncated']): - continuationToken = listObjectsResponse['NextContinuationToken'] + if listObjectsResponse["IsTruncated"]: + continuationToken = listObjectsResponse["NextContinuationToken"] else: hasMoreContent = False - for doc in listObjectsResponse['Contents']: - docName = doc['Key'] + for doc in listObjectsResponse["Contents"]: + docName = doc["Key"] docExt = FileHelper.getFileExtenstion(docName) docExtLower = docExt.lower() - if(docExtLower in allowedFileTypes): + if docExtLower in allowedFileTypes: files.append(docName) return files @staticmethod def writeToS3(content, bucketName, s3FileName, awsRegion=None): - s3 = AwsHelper().getResource('s3', awsRegion) + s3 = AwsHelper().getResource("s3", awsRegion) object = s3.Object(bucketName, s3FileName) object.put(Body=content) @staticmethod def readFromS3(bucketName, s3FileName, awsRegion=None): - s3 = AwsHelper().getResource('s3', awsRegion) + s3 = AwsHelper().getResource("s3", awsRegion) obj = s3.Object(bucketName, s3FileName) - return obj.get()['Body'].read().decode('utf-8') + return obj.get()["Body"].read().decode("utf-8") @staticmethod def writeCSV(fieldNames, csvData, bucketName, s3FileName, awsRegion=None): csv_file = io.StringIO() - #with open(fileName, 'w') as csv_file: + # with open(fileName, 'w') as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldNames) writer.writeheader() @@ -148,7 +140,7 @@ def writeCSV(fieldNames, csvData, bucketName, s3FileName, awsRegion=None): @staticmethod def writeCSVRaw(csvData, bucketName, s3FileName): csv_file = io.StringIO() - #with open(fileName, 'w') as csv_file: + # with open(fileName, 'w') as csv_file: writer = csv.writer(csv_file) for item in csvData: writer.writerow(item) @@ -174,27 +166,27 @@ def getFileExtenstion(fileName): dn, dext = os.path.splitext(basename) return dext[1:] - @staticmethod def readFile(fileName): - with open(fileName, 'r') as document: + with open(fileName, "r") as document: return document.read() @staticmethod def writeToFile(fileName, content): - with open(fileName, 'w') as document: + with open(fileName, "w") as document: document.write(content) @staticmethod def writeToFileWithMode(fileName, content, mode): with open(fileName, mode) as document: document.write(content) + @staticmethod def getFilesInFolder(path, fileTypes): for file in os.listdir(path): if os.path.isfile(os.path.join(path, file)): ext = FileHelper.getFileExtenstion(file) - if(ext.lower() in fileTypes): + if ext.lower() in fileTypes: yield file @staticmethod @@ -208,7 +200,7 @@ def getFileNames(path, allowedLocalFileTypes): @staticmethod def writeCSV(fileName, fieldNames, csvData): - with open(fileName, 'w') as csv_file: + with open(fileName, "w") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldNames) writer.writeheader() @@ -222,7 +214,7 @@ def writeCSV(fileName, fieldNames, csvData): @staticmethod def writeCSVRaw(fileName, csvData): - with open(fileName, 'w') as csv_file: + with open(fileName, "w") as csv_file: writer = csv.writer(csv_file) for item in csvData: writer.writerow(item) diff --git a/textract-pipeline/lambda/jobresultprocessor/lambda_function.py b/textract-pipeline/lambda/jobresultprocessor/lambda_function.py index cdc8c260..48212c5b 100644 --- a/textract-pipeline/lambda/jobresultprocessor/lambda_function.py +++ b/textract-pipeline/lambda/jobresultprocessor/lambda_function.py @@ -1,10 +1,13 @@ import json import os -import boto3 import time + +import boto3 + +import datastore from helper import AwsHelper from og import OutputGenerator -import datastore + def getJobResults(api, jobId): @@ -12,47 +15,50 @@ def getJobResults(api, jobId): time.sleep(5) - client = AwsHelper().getClient('textract') - if(api == "StartDocumentTextDetection"): + client = AwsHelper().getClient("textract") + if api == "StartDocumentTextDetection": response = client.get_document_text_detection(JobId=jobId) else: response = client.get_document_analysis(JobId=jobId) pages.append(response) print("Resultset page recieved: {}".format(len(pages))) nextToken = None - if('NextToken' in response): - nextToken = response['NextToken'] + if "NextToken" in response: + nextToken = response["NextToken"] print("Next token: {}".format(nextToken)) - while(nextToken): + while nextToken: time.sleep(5) - if(api == "StartDocumentTextDetection"): - response = client.get_document_text_detection(JobId=jobId, NextToken=nextToken) + if api == "StartDocumentTextDetection": + response = client.get_document_text_detection( + JobId=jobId, NextToken=nextToken + ) else: response = client.get_document_analysis(JobId=jobId, NextToken=nextToken) pages.append(response) print("Resultset page recieved: {}".format(len(pages))) nextToken = None - if('NextToken' in response): - nextToken = response['NextToken'] + if "NextToken" in response: + nextToken = response["NextToken"] print("Next token: {}".format(nextToken)) return pages + def processRequest(request): output = "" print(request) - jobId = request['jobId'] - jobTag = request['jobTag'] - jobStatus = request['jobStatus'] - jobAPI = request['jobAPI'] - bucketName = request['bucketName'] - objectName = request['objectName'] + jobId = request["jobId"] + jobTag = request["jobTag"] + jobStatus = request["jobStatus"] + jobAPI = request["jobAPI"] + bucketName = request["bucketName"] + objectName = request["objectName"] outputTable = request["outputTable"] documentsTable = request["documentsTable"] @@ -65,14 +71,16 @@ def processRequest(request): detectForms = False detectTables = False - if(jobAPI == "StartDocumentAnalysis"): + if jobAPI == "StartDocumentAnalysis": detectForms = True detectTables = True - dynamodb = AwsHelper().getResource('dynamodb') + dynamodb = AwsHelper().getResource("dynamodb") ddb = dynamodb.Table(outputTable) - opg = OutputGenerator(jobTag, pages, bucketName, objectName, detectForms, detectTables, ddb) + opg = OutputGenerator( + jobTag, pages, bucketName, objectName, detectForms, detectTables, ddb + ) opg.run() print("DocumentId: {}".format(jobTag)) @@ -80,38 +88,39 @@ def processRequest(request): ds = datastore.DocumentStore(documentsTable, outputTable) ds.markDocumentComplete(jobTag) - output = "Processed -> Document: {}, Object: {}/{} processed.".format(jobTag, bucketName, objectName) + output = "Processed -> Document: {}, Object: {}/{} processed.".format( + jobTag, bucketName, objectName + ) print(output) - return { - 'statusCode': 200, - 'body': output - } + return {"statusCode": 200, "body": output} + def lambda_handler(event, context): print("event: {}".format(event)) - body = json.loads(event['Records'][0]['body']) - message = json.loads(body['Message']) + body = json.loads(event["Records"][0]["body"]) + message = json.loads(body["Message"]) print("Message: {}".format(message)) request = {} - request["jobId"] = message['JobId'] - request["jobTag"] = message['JobTag'] - request["jobStatus"] = message['Status'] - request["jobAPI"] = message['API'] - request["bucketName"] = message['DocumentLocation']['S3Bucket'] - request["objectName"] = message['DocumentLocation']['S3ObjectName'] - - request["outputTable"] = os.environ['OUTPUT_TABLE'] - request["documentsTable"] = os.environ['DOCUMENTS_TABLE'] + request["jobId"] = message["JobId"] + request["jobTag"] = message["JobTag"] + request["jobStatus"] = message["Status"] + request["jobAPI"] = message["API"] + request["bucketName"] = message["DocumentLocation"]["S3Bucket"] + request["objectName"] = message["DocumentLocation"]["S3ObjectName"] + + request["outputTable"] = os.environ["OUTPUT_TABLE"] + request["documentsTable"] = os.environ["DOCUMENTS_TABLE"] return processRequest(request) + def lambda_handler_local(event, context): print("event: {}".format(event)) return processRequest(event) diff --git a/textract-pipeline/lambda/s3batchprocessor/lambda_function.py b/textract-pipeline/lambda/s3batchprocessor/lambda_function.py index 8c95703a..daf4749b 100644 --- a/textract-pipeline/lambda/s3batchprocessor/lambda_function.py +++ b/textract-pipeline/lambda/s3batchprocessor/lambda_function.py @@ -1,10 +1,12 @@ import json import os -import uuid import urllib +import uuid + import datastore from helper import FileHelper + def processRequest(request): output = "" @@ -17,37 +19,44 @@ def processRequest(request): outputTable = request["outputTable"] jobId = request["jobId"] - invocationId = request['invocationId'] - invocationSchemaVersion = request['invocationSchemaVersion'] - taskId = request['taskId'] + invocationId = request["invocationId"] + invocationSchemaVersion = request["invocationSchemaVersion"] + taskId = request["taskId"] print("Input Object: {}/{}".format(bucketName, objectName)) ext = FileHelper.getFileExtenstion(objectName.lower()) print("Extension: {}".format(ext)) - if(ext and ext in ["jpg", "jpeg", "png", "pdf"]): + if ext and ext in ["jpg", "jpeg", "png", "pdf"]: documentId = str(uuid.uuid1()) ds = datastore.DocumentStore(documentsTable, outputTable) ds.createDocument(documentId, bucketName, objectName) - output = "Saved document {} for {}/{}".format(documentId, bucketName, objectName) + output = "Saved document {} for {}/{}".format( + documentId, bucketName, objectName + ) print(output) - results = [{ - 'taskId': taskId, - 'resultCode': 'Succeeded', - 'resultString': "Document submitted for processing with Id: {}".format(documentId) - }] - + results = [ + { + "taskId": taskId, + "resultCode": "Succeeded", + "resultString": "Document submitted for processing with Id: {}".format( + documentId + ), + } + ] + return { - 'invocationSchemaVersion': invocationSchemaVersion, - 'treatMissingKeysAs': 'PermanentFailure', - 'invocationId': invocationId, - 'results': results + "invocationSchemaVersion": invocationSchemaVersion, + "treatMissingKeysAs": "PermanentFailure", + "invocationId": invocationId, + "results": results, } + def lambda_handler(event, context): print("event: {}".format(event)) @@ -55,19 +64,19 @@ def lambda_handler(event, context): request = {} # Parse job parameters - request["jobId"] = event['job']['id'] - request["invocationId"] = event['invocationId'] - request["invocationSchemaVersion"] = event['invocationSchemaVersion'] + request["jobId"] = event["job"]["id"] + request["invocationId"] = event["invocationId"] + request["invocationSchemaVersion"] = event["invocationSchemaVersion"] # Task - request["task"] = event['tasks'][0] - request["taskId"] = event['tasks'][0]['taskId'] - request["objectName"] = urllib.parse.unquote_plus(event['tasks'][0]['s3Key']) - request["s3VersionId"] = event['tasks'][0]['s3VersionId'] - request["s3BucketArn"] = event['tasks'][0]['s3BucketArn'] - request["bucketName"] = request["s3BucketArn"].split(':')[-1] - - request["documentsTable"] = os.environ['DOCUMENTS_TABLE'] - request["outputTable"] = os.environ['OUTPUT_TABLE'] + request["task"] = event["tasks"][0] + request["taskId"] = event["tasks"][0]["taskId"] + request["objectName"] = urllib.parse.unquote_plus(event["tasks"][0]["s3Key"]) + request["s3VersionId"] = event["tasks"][0]["s3VersionId"] + request["s3BucketArn"] = event["tasks"][0]["s3BucketArn"] + request["bucketName"] = request["s3BucketArn"].split(":")[-1] + + request["documentsTable"] = os.environ["DOCUMENTS_TABLE"] + request["outputTable"] = os.environ["OUTPUT_TABLE"] return processRequest(request) diff --git a/textract-pipeline/lambda/s3processor/lambda_function.py b/textract-pipeline/lambda/s3processor/lambda_function.py index 06dc812a..aa220f8a 100644 --- a/textract-pipeline/lambda/s3processor/lambda_function.py +++ b/textract-pipeline/lambda/s3processor/lambda_function.py @@ -1,10 +1,12 @@ import json import os -import uuid import urllib +import uuid + import datastore from helper import FileHelper + def processRequest(request): output = "" @@ -21,28 +23,30 @@ def processRequest(request): ext = FileHelper.getFileExtenstion(objectName.lower()) print("Extension: {}".format(ext)) - if(ext and ext in ["jpg", "jpeg", "png", "pdf"]): + if ext and ext in ["jpg", "jpeg", "png", "pdf"]: documentId = str(uuid.uuid1()) ds = datastore.DocumentStore(documentsTable, outputTable) ds.createDocument(documentId, bucketName, objectName) - output = "Saved document {} for {}/{}".format(documentId, bucketName, objectName) + output = "Saved document {} for {}/{}".format( + documentId, bucketName, objectName + ) print(output) - return { - 'statusCode': 200, - 'body': json.dumps(output) - } + return {"statusCode": 200, "body": json.dumps(output)} + def lambda_handler(event, context): print("event: {}".format(event)) request = {} - request["bucketName"] = event['Records'][0]['s3']['bucket']['name'] - request["objectName"] = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key']) - request["documentsTable"] = os.environ['DOCUMENTS_TABLE'] - request["outputTable"] = os.environ['OUTPUT_TABLE'] + request["bucketName"] = event["Records"][0]["s3"]["bucket"]["name"] + request["objectName"] = urllib.parse.unquote_plus( + event["Records"][0]["s3"]["object"]["key"] + ) + request["documentsTable"] = os.environ["DOCUMENTS_TABLE"] + request["outputTable"] = os.environ["OUTPUT_TABLE"] return processRequest(request) diff --git a/textract-pipeline/lambda/syncprocessor/lambda_function.py b/textract-pipeline/lambda/syncprocessor/lambda_function.py index 5756d765..c7f648f4 100644 --- a/textract-pipeline/lambda/syncprocessor/lambda_function.py +++ b/textract-pipeline/lambda/syncprocessor/lambda_function.py @@ -1,56 +1,55 @@ -import boto3 -from decimal import Decimal import json import os -from helper import AwsHelper, S3Helper, DynamoDBHelper -from og import OutputGenerator +from decimal import Decimal + +import boto3 + import datastore +from helper import AwsHelper, DynamoDBHelper, S3Helper +from og import OutputGenerator + def callTextract(bucketName, objectName, detectText, detectForms, detectTables): - textract = AwsHelper().getClient('textract') - if(not detectForms and not detectTables): + textract = AwsHelper().getClient("textract") + if not detectForms and not detectTables: response = textract.detect_document_text( - Document={ - 'S3Object': { - 'Bucket': bucketName, - 'Name': objectName - } - } + Document={"S3Object": {"Bucket": bucketName, "Name": objectName}} ) else: - features = [] - if(detectTables): + features = [] + if detectTables: features.append("TABLES") - if(detectForms): + if detectForms: features.append("FORMS") - + response = textract.analyze_document( - Document={ - 'S3Object': { - 'Bucket': bucketName, - 'Name': objectName - } - }, - FeatureTypes=features + Document={"S3Object": {"Bucket": bucketName, "Name": objectName}}, + FeatureTypes=features, ) return response -def processImage(documentId, features, bucketName, objectName, outputTableName, documentsTableName): +def processImage( + documentId, features, bucketName, objectName, outputTableName, documentsTableName +): detectText = "Text" in features detectForms = "Forms" in features detectTables = "Tables" in features - response = callTextract(bucketName, objectName, detectText, detectForms, detectTables) + response = callTextract( + bucketName, objectName, detectText, detectForms, detectTables + ) dynamodb = AwsHelper().getResource("dynamodb") ddb = dynamodb.Table(outputTableName) print("Generating output for DocumentId: {}".format(documentId)) - opg = OutputGenerator(documentId, response, bucketName, objectName, detectForms, detectTables, ddb) + opg = OutputGenerator( + documentId, response, bucketName, objectName, detectForms, detectTables, ddb + ) opg.run() print("DocumentId: {}".format(documentId)) @@ -58,47 +57,55 @@ def processImage(documentId, features, bucketName, objectName, outputTableName, ds = datastore.DocumentStore(documentsTableName, outputTableName) ds.markDocumentComplete(documentId) + # --------------- Main handler ------------------ + def processRequest(request): output = "" print("request: {}".format(request)) - bucketName = request['bucketName'] - objectName = request['objectName'] - features = request['features'] - documentId = request['documentId'] - outputTable = request['outputTable'] - documentsTable = request['documentsTable'] + bucketName = request["bucketName"] + objectName = request["objectName"] + features = request["features"] + documentId = request["documentId"] + outputTable = request["outputTable"] + documentsTable = request["documentsTable"] documentsTable = request["documentsTable"] - - if(documentId and bucketName and objectName and features): - print("DocumentId: {}, features: {}, Object: {}/{}".format(documentId, features, bucketName, objectName)) - processImage(documentId, features, bucketName, objectName, outputTable, documentsTable) + if documentId and bucketName and objectName and features: + print( + "DocumentId: {}, features: {}, Object: {}/{}".format( + documentId, features, bucketName, objectName + ) + ) - output = "Document: {}, features: {}, Object: {}/{} processed.".format(documentId, features, bucketName, objectName) + processImage( + documentId, features, bucketName, objectName, outputTable, documentsTable + ) + + output = "Document: {}, features: {}, Object: {}/{} processed.".format( + documentId, features, bucketName, objectName + ) print(output) - return { - 'statusCode': 200, - 'body': output - } + return {"statusCode": 200, "body": output} + def lambda_handler(event, context): print("event: {}".format(event)) - message = json.loads(event['Records'][0]['body']) + message = json.loads(event["Records"][0]["body"]) print("Message: {}".format(message)) request = {} - request["documentId"] = message['documentId'] - request["bucketName"] = message['bucketName'] - request["objectName"] = message['objectName'] - request["features"] = message['features'] - request["outputTable"] = os.environ['OUTPUT_TABLE'] - request["documentsTable"] = os.environ['DOCUMENTS_TABLE'] - - return processRequest(request) \ No newline at end of file + request["documentId"] = message["documentId"] + request["bucketName"] = message["bucketName"] + request["objectName"] = message["objectName"] + request["features"] = message["features"] + request["outputTable"] = os.environ["OUTPUT_TABLE"] + request["documentsTable"] = os.environ["DOCUMENTS_TABLE"] + + return processRequest(request) diff --git a/textract-pipeline/lambda/textractor/python/og.py b/textract-pipeline/lambda/textractor/python/og.py index 7e3638f6..d7ce3296 100644 --- a/textract-pipeline/lambda/textractor/python/og.py +++ b/textract-pipeline/lambda/textractor/python/og.py @@ -1,10 +1,15 @@ import json + +import boto3 + from helper import FileHelper, S3Helper from trp import Document -import boto3 + class OutputGenerator: - def __init__(self, documentId, response, bucketName, objectName, forms, tables, ddb): + def __init__( + self, documentId, response, bucketName, objectName, forms, tables, ddb + ): self.documentId = documentId self.response = response self.bucketName = bucketName @@ -20,9 +25,9 @@ def __init__(self, documentId, response, bucketName, objectName, forms, tables, def saveItem(self, pk, sk, output): jsonItem = {} - jsonItem['documentId'] = pk - jsonItem['outputType'] = sk - jsonItem['outputPath'] = output + jsonItem["documentId"] = pk + jsonItem["outputType"] = sk + jsonItem["outputPath"] = output self.ddb.put_item(Item=jsonItem) @@ -40,17 +45,17 @@ def _outputText(self, page, p): def _outputForm(self, page, p): csvData = [] for field in page.form.fields: - csvItem = [] - if(field.key): + csvItem = [] + if field.key: csvItem.append(field.key.text) else: csvItem.append("") - if(field.value): + if field.value: csvItem.append(field.value.text) else: csvItem.append("") csvData.append(csvItem) - csvFieldNames = ['Key', 'Value'] + csvFieldNames = ["Key", "Value"] opath = "{}page-{}-forms.csv".format(self.outputPath, p) S3Helper.writeCSV(csvFieldNames, csvData, self.bucketName, opath) self.saveItem(self.documentId, "page-{}-Forms".format(p), opath) @@ -63,7 +68,7 @@ def _outputTable(self, page, p): csvRow.append("Table") csvData.append(csvRow) for row in table.rows: - csvRow = [] + csvRow = [] for cell in row.cells: csvRow.append(cell.text) csvData.append(csvRow) @@ -76,12 +81,12 @@ def _outputTable(self, page, p): def run(self): - if(not self.document.pages): + if not self.document.pages: return opath = "{}response.json".format(self.outputPath) S3Helper.writeToS3(json.dumps(self.response), self.bucketName, opath) - self.saveItem(self.documentId, 'Response', opath) + self.saveItem(self.documentId, "Response", opath) print("Total Pages in Document: {}".format(len(self.document.pages))) @@ -98,10 +103,10 @@ def run(self): docText = docText + page.text + "\n" - if(self.forms): + if self.forms: self._outputForm(page, p) - if(self.tables): + if self.tables: self._outputTable(page, p) - p = p + 1 \ No newline at end of file + p = p + 1 diff --git a/textract-pipeline/lambda/textractor/python/trp.py b/textract-pipeline/lambda/textractor/python/trp.py index 2f707ce6..693e94aa 100644 --- a/textract-pipeline/lambda/textractor/python/trp.py +++ b/textract-pipeline/lambda/textractor/python/trp.py @@ -1,5 +1,6 @@ import json + class BoundingBox: def __init__(self, width, height, left, top): self._width = width @@ -8,7 +9,9 @@ def __init__(self, width, height, left, top): self._top = top def __str__(self): - return "width: {}, height: {}, left: {}, top: {}".format(self._width, self._height, self._left, self._top) + return "width: {}, height: {}, left: {}, top: {}".format( + self._width, self._height, self._left, self._top + ) @property def width(self): @@ -26,6 +29,7 @@ def left(self): def top(self): return self._top + class Polygon: def __init__(self, x, y): self._x = x @@ -42,11 +46,17 @@ def x(self): def y(self): return self._y + class Geometry: def __init__(self, geometry): boundingBox = geometry["BoundingBox"] polygon = geometry["Polygon"] - bb = BoundingBox(boundingBox["Width"], boundingBox["Height"], boundingBox["Left"], boundingBox["Top"]) + bb = BoundingBox( + boundingBox["Width"], + boundingBox["Height"], + boundingBox["Left"], + boundingBox["Top"], + ) pgs = [] for pg in polygon: pgs.append(Polygon(pg["X"], pg["Y"])) @@ -66,15 +76,16 @@ def boundingBox(self): def polygon(self): return self._polygon + class Word: def __init__(self, block, blockMap): self._block = block - self._confidence = block['Confidence'] - self._geometry = Geometry(block['Geometry']) - self._id = block['Id'] + self._confidence = block["Confidence"] + self._geometry = Geometry(block["Geometry"]) + self._id = block["Id"] self._text = "" - if(block['Text']): - self._text = block['Text'] + if block["Text"]: + self._text = block["Text"] def __str__(self): return self._text @@ -99,25 +110,27 @@ def text(self): def block(self): return self._block + class Line: def __init__(self, block, blockMap): self._block = block - self._confidence = block['Confidence'] - self._geometry = Geometry(block['Geometry']) - self._id = block['Id'] + self._confidence = block["Confidence"] + self._geometry = Geometry(block["Geometry"]) + self._id = block["Id"] self._text = "" - if(block['Text']): - self._text = block['Text'] + if block["Text"]: + self._text = block["Text"] self._words = [] - if('Relationships' in block and block['Relationships']): - for rs in block['Relationships']: - if(rs['Type'] == 'CHILD'): - for cid in rs['Ids']: - if(blockMap[cid]["BlockType"] == "WORD"): + if "Relationships" in block and block["Relationships"]: + for rs in block["Relationships"]: + if rs["Type"] == "CHILD": + for cid in rs["Ids"]: + if blockMap[cid]["BlockType"] == "WORD": self._words.append(Word(blockMap[cid], blockMap)) + def __str__(self): s = "Line\n==========\n" s = s + self._text + "\n" @@ -150,12 +163,13 @@ def text(self): def block(self): return self._block + class SelectionElement: def __init__(self, block, blockMap): - self._confidence = block['Confidence'] - self._geometry = Geometry(block['Geometry']) - self._id = block['Id'] - self._selectionStatus = block['SelectionStatus'] + self._confidence = block["Confidence"] + self._geometry = Geometry(block["Geometry"]) + self._id = block["Id"] + self._selectionStatus = block["SelectionStatus"] @property def confidence(self): @@ -173,12 +187,13 @@ def id(self): def selectionStatus(self): return self._selectionStatus + class FieldKey: def __init__(self, block, children, blockMap): self._block = block - self._confidence = block['Confidence'] - self._geometry = Geometry(block['Geometry']) - self._id = block['Id'] + self._confidence = block["Confidence"] + self._geometry = Geometry(block["Geometry"]) + self._id = block["Id"] self._text = "" self._content = [] @@ -186,13 +201,13 @@ def __init__(self, block, children, blockMap): for eid in children: wb = blockMap[eid] - if(wb['BlockType'] == "WORD"): + if wb["BlockType"] == "WORD": w = Word(wb, blockMap) self._content.append(w) t.append(w.text) - if(t): - self._text = ' '.join(t) + if t: + self._text = " ".join(t) def __str__(self): return self._text @@ -221,12 +236,13 @@ def text(self): def block(self): return self._block + class FieldValue: def __init__(self, block, children, blockMap): self._block = block - self._confidence = block['Confidence'] - self._geometry = Geometry(block['Geometry']) - self._id = block['Id'] + self._confidence = block["Confidence"] + self._geometry = Geometry(block["Geometry"]) + self._id = block["Id"] self._text = "" self._content = [] @@ -234,17 +250,17 @@ def __init__(self, block, children, blockMap): for eid in children: wb = blockMap[eid] - if(wb['BlockType'] == "WORD"): + if wb["BlockType"] == "WORD": w = Word(wb, blockMap) self._content.append(w) t.append(w.text) - elif(wb['BlockType'] == "SELECTION_ELEMENT"): + elif wb["BlockType"] == "SELECTION_ELEMENT": se = SelectionElement(wb, blockMap) self._content.append(se) self._text = se.selectionStatus - if(t): - self._text = ' '.join(t) + if t: + self._text = " ".join(t) def __str__(self): return self._text @@ -268,34 +284,38 @@ def content(self): @property def text(self): return self._text - + @property def block(self): return self._block + class Field: def __init__(self, block, blockMap): self._key = None self._value = None - for item in block['Relationships']: - if(item["Type"] == "CHILD"): - self._key = FieldKey(block, item['Ids'], blockMap) - elif(item["Type"] == "VALUE"): - for eid in item['Ids']: + for item in block["Relationships"]: + if item["Type"] == "CHILD": + self._key = FieldKey(block, item["Ids"], blockMap) + elif item["Type"] == "VALUE": + for eid in item["Ids"]: vkvs = blockMap[eid] - if 'VALUE' in vkvs['EntityTypes']: - if('Relationships' in vkvs): - for vitem in vkvs['Relationships']: - if(vitem["Type"] == "CHILD"): - self._value = FieldValue(vkvs, vitem['Ids'], blockMap) + if "VALUE" in vkvs["EntityTypes"]: + if "Relationships" in vkvs: + for vitem in vkvs["Relationships"]: + if vitem["Type"] == "CHILD": + self._value = FieldValue( + vkvs, vitem["Ids"], blockMap + ) + def __str__(self): s = "\nField\n==========\n" k = "" v = "" - if(self._key): + if self._key: k = str(self._key) - if(self._value): + if self._value: v = str(self._value) s = s + "Key: {}\nValue: {}".format(k, v) return s @@ -308,6 +328,7 @@ def key(self): def value(self): return self._value + class Form: def __init__(self): self._fields = [] @@ -329,44 +350,44 @@ def fields(self): def getFieldByKey(self, key): field = None - if(key in self._fieldsMap): + if key in self._fieldsMap: field = self._fieldsMap[key] return field - + def searchFieldsByKey(self, key): searchKey = key.lower() results = [] for field in self._fields: - if(field.key and searchKey in field.key.text.lower()): + if field.key and searchKey in field.key.text.lower(): results.append(field) return results -class Cell: +class Cell: def __init__(self, block, blockMap): self._block = block - self._confidence = block['Confidence'] - self._rowIndex = block['RowIndex'] - self._columnIndex = block['ColumnIndex'] - self._rowSpan = block['RowSpan'] - self._columnSpan = block['ColumnSpan'] - self._geometry = Geometry(block['Geometry']) - self._id = block['Id'] + self._confidence = block["Confidence"] + self._rowIndex = block["RowIndex"] + self._columnIndex = block["ColumnIndex"] + self._rowSpan = block["RowSpan"] + self._columnSpan = block["ColumnSpan"] + self._geometry = Geometry(block["Geometry"]) + self._id = block["Id"] self._content = [] self._text = "" - if('Relationships' in block and block['Relationships']): - for rs in block['Relationships']: - if(rs['Type'] == 'CHILD'): - for cid in rs['Ids']: + if "Relationships" in block and block["Relationships"]: + for rs in block["Relationships"]: + if rs["Type"] == "CHILD": + for cid in rs["Ids"]: blockType = blockMap[cid]["BlockType"] - if(blockType == "WORD"): + if blockType == "WORD": w = Word(blockMap[cid], blockMap) self._content.append(w) - self._text = self._text + w.text + ' ' - elif(blockType == "SELECTION_ELEMENT"): + self._text = self._text + w.text + " " + elif blockType == "SELECTION_ELEMENT": se = SelectionElement(blockMap[cid], blockMap) self._content.append(se) - self._text = self._text + se.selectionStatus + ', ' + self._text = self._text + se.selectionStatus + ", " def __str__(self): return self._text @@ -411,6 +432,7 @@ def text(self): def block(self): return self._block + class Row: def __init__(self): self._cells = [] @@ -425,32 +447,32 @@ def __str__(self): def cells(self): return self._cells -class Table: +class Table: def __init__(self, block, blockMap): self._block = block - self._confidence = block['Confidence'] - self._geometry = Geometry(block['Geometry']) + self._confidence = block["Confidence"] + self._geometry = Geometry(block["Geometry"]) - self._id = block['Id'] + self._id = block["Id"] self._rows = [] ri = 1 row = Row() cell = None - if('Relationships' in block and block['Relationships']): - for rs in block['Relationships']: - if(rs['Type'] == 'CHILD'): - for cid in rs['Ids']: + if "Relationships" in block and block["Relationships"]: + for rs in block["Relationships"]: + if rs["Type"] == "CHILD": + for cid in rs["Ids"]: cell = Cell(blockMap[cid], blockMap) - if(cell.rowIndex > ri): + if cell.rowIndex > ri: self._rows.append(row) row = Row() ri = cell.rowIndex row.cells.append(cell) - if(row and row.cells): + if row and row.cells: self._rows.append(row) def __str__(self): @@ -480,8 +502,8 @@ def rows(self): def block(self): return self._block -class Page: +class Page: def __init__(self, blocks, blockMap): self._blocks = blocks self._text = "" @@ -501,25 +523,27 @@ def __str__(self): def _parse(self, blockMap): for item in self._blocks: if item["BlockType"] == "PAGE": - self._geometry = Geometry(item['Geometry']) - self._id = item['Id'] + self._geometry = Geometry(item["Geometry"]) + self._id = item["Id"] elif item["BlockType"] == "LINE": l = Line(item, blockMap) self._lines.append(l) self._content.append(l) - self._text = self._text + l.text + '\n' + self._text = self._text + l.text + "\n" elif item["BlockType"] == "TABLE": t = Table(item, blockMap) self._tables.append(t) self._content.append(t) elif item["BlockType"] == "KEY_VALUE_SET": - if 'KEY' in item['EntityTypes']: + if "KEY" in item["EntityTypes"]: f = Field(item, blockMap) - if(f.key): + if f.key: self._form.addField(f) self._content.append(f) else: - print("WARNING: Detected K/V where key does not have content. Excluding key from output.") + print( + "WARNING: Detected K/V where key does not have content. Excluding key from output." + ) print(f) print(item) @@ -527,20 +551,32 @@ def getLinesInReadingOrder(self): columns = [] lines = [] for item in self._lines: - column_found=False - for index, column in enumerate(columns): - bbox_left = item.geometry.boundingBox.left - bbox_right = item.geometry.boundingBox.left + item.geometry.boundingBox.width - bbox_centre = item.geometry.boundingBox.left + item.geometry.boundingBox.width/2 - column_centre = column['left'] + column['right']/2 - if (bbox_centre > column['left'] and bbox_centre < column['right']) or (column_centre > bbox_left and column_centre < bbox_right): - #Bbox appears inside the column - lines.append([index, item.text]) - column_found=True - break - if not column_found: - columns.append({'left':item.geometry.boundingBox.left, 'right':item.geometry.boundingBox.left + item.geometry.boundingBox.width}) - lines.append([len(columns)-1, item.text]) + column_found = False + for index, column in enumerate(columns): + bbox_left = item.geometry.boundingBox.left + bbox_right = ( + item.geometry.boundingBox.left + item.geometry.boundingBox.width + ) + bbox_centre = ( + item.geometry.boundingBox.left + item.geometry.boundingBox.width / 2 + ) + column_centre = column["left"] + column["right"] / 2 + if (bbox_centre > column["left"] and bbox_centre < column["right"]) or ( + column_centre > bbox_left and column_centre < bbox_right + ): + # Bbox appears inside the column + lines.append([index, item.text]) + column_found = True + break + if not column_found: + columns.append( + { + "left": item.geometry.boundingBox.left, + "right": item.geometry.boundingBox.left + + item.geometry.boundingBox.width, + } + ) + lines.append([len(columns) - 1, item.text]) lines.sort(key=lambda x: x[0]) return lines @@ -549,7 +585,7 @@ def getTextInReadingOrder(self): lines = self.getLinesInReadingOrder() text = "" for line in lines: - text = text + line[1] + '\n' + text = text + line[1] + "\n" return text @property @@ -584,11 +620,11 @@ def geometry(self): def id(self): return self._id -class Document: +class Document: def __init__(self, responsePages): - if(not isinstance(responsePages, list)): + if not isinstance(responsePages, list): rps = [] rps.append(responsePages) responsePages = rps @@ -611,24 +647,27 @@ def _parseDocumentPagesAndBlockMap(self): documentPages = [] documentPage = None for page in self._responsePages: - for block in page['Blocks']: - if('BlockType' in block and 'Id' in block): - blockMap[block['Id']] = block + for block in page["Blocks"]: + if "BlockType" in block and "Id" in block: + blockMap[block["Id"]] = block - if(block['BlockType'] == 'PAGE'): - if(documentPage): - documentPages.append({"Blocks" : documentPage}) + if block["BlockType"] == "PAGE": + if documentPage: + documentPages.append({"Blocks": documentPage}) documentPage = [] documentPage.append(block) else: documentPage.append(block) - if(documentPage): - documentPages.append({"Blocks" : documentPage}) + if documentPage: + documentPages.append({"Blocks": documentPage}) return documentPages, blockMap def _parse(self): - self._responseDocumentPages, self._blockMap = self._parseDocumentPagesAndBlockMap() + ( + self._responseDocumentPages, + self._blockMap, + ) = self._parseDocumentPagesAndBlockMap() for documentPage in self._responseDocumentPages: page = Page(documentPage["Blocks"], self._blockMap) self._pages.append(page) @@ -647,7 +686,6 @@ def pages(self): def getBlockById(self, blockId): block = None - if(self._blockMap and blockId in self._blockMap): + if self._blockMap and blockId in self._blockMap: block = self._blockMap[blockId] return block -