diff --git a/.gitignore b/.gitignore
index f43cf83..026b94b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,3 @@
-
 # Created by https://www.gitignore.io/api/osx,linux,python,windows,pycharm,visualstudiocode
 # Exception for headers_keys.json
@@ -254,6 +253,8 @@ custom_pytest.ini
 # End of https://www.gitignore.io/api/osx,linux,python,windows,pycharm,visualstudiocode
 run_*.sh
+!run_vtdlp-ingest_example.sh
+!run_vtdlp-testextract_example.sh
 lambda_local.sh
 results_files/
@@ -261,4 +262,12 @@ bin/
 examples/
 lib64/
 pyvenv.cfg
-lib64*
\ No newline at end of file
+lib64*
+PR.md
+*.log
+test_event.json
+run_vtdlp-ingest_testextract.sh
+run_vtdlp-testextract.sh
+run_vtdlp-ingest_testss.sh
+python/
+requirements.txt
\ No newline at end of file
diff --git a/lambda_function.py b/lambda_function.py
index 75af001..fafe239 100644
--- a/lambda_function.py
+++ b/lambda_function.py
@@ -8,6 +8,9 @@
 env["script_root"] = os.path.abspath(os.path.dirname(__file__))
 env["aws_src_bucket"] = os.getenv("AWS_SRC_BUCKET")
 env["aws_dest_bucket"] = os.getenv("AWS_DEST_BUCKET")
+env["textract_bucket"] = os.getenv("TEXTRACT_BUCKET")
+env["textract_line_table"] = os.getenv("TEXTRACT_LINE_TABLE")
+env["textract_word_table"] = os.getenv("TEXTRACT_WORD_TABLE")
 env["collection_category"] = os.getenv("COLLECTION_CATEGORY")
 env["collection_identifier"] = os.getenv("COLLECTION_IDENTIFIER")
 env["collection_subdirectory"] = os.getenv("COLLECTION_SUBDIRECTORY")
@@ -52,7 +55,12 @@ env["update_metadata"] = (
     os.getenv("UPDATE_METADATA") is not None
     and os.getenv("UPDATE_METADATA").lower() == "true"
 )
-
+env["local_textract"] = (
+    os.getenv("LOCAL_TEXTRACT") is not None and os.getenv("LOCAL_TEXTRACT").lower() == "true"
+)
+env["process_textract"] = (
+    os.getenv("PROCESS_TEXTRACT") is not None and os.getenv("PROCESS_TEXTRACT").lower() == "true"
+)
 
 def new_media_type_handler(env, filename, bucket):
     media_type = media_types_map[env["media_type"]]
diff --git a/requirements.txt b/requirements.txt
index 2263918..2d9d860 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,4 +7,6 @@ pytest
 pytest-cov
 pytest-env
 pytest-mock
-requests
\ No newline at end of file
+requests
+opencv-python
+numpy
\ No newline at end of file
diff --git a/run_vtdlp-ingest_example.sh b/run_vtdlp-ingest_example.sh
index d97f804..ac6e704 100644
--- a/run_vtdlp-ingest_example.sh
+++ b/run_vtdlp-ingest_example.sh
@@ -1,6 +1,9 @@
 VERBOSE="true" \
 AWS_SRC_BUCKET="" \
 AWS_DEST_BUCKET="" \
+TEXTRACT_BUCKET="" \
+TEXTRACT_LINE_TABLE="" \
+TEXTRACT_WORD_TABLE="" \
 COLLECTION_CATEGORY="" \
 COLLECTION_IDENTIFIER="" \
 COLLECTION_SUBDIRECTORY="" \
@@ -22,4 +25,5 @@ METADATA_INGEST="true" \
 GENERATE_THUMBNAILS="false" \
 DRY_RUN="false" \
 UPDATE_METADATA="false" \
+PROCESS_TEXTRACT="false" \
 python3 lambda_function.py "examples/testss/test_archive_metadata.csv"
\ No newline at end of file
diff --git a/run_vtdlp-testextract_example.sh b/run_vtdlp-testextract_example.sh
new file mode 100644
index 0000000..e93d05d
--- /dev/null
+++ b/run_vtdlp-testextract_example.sh
@@ -0,0 +1,4 @@
+TEXTRACT_LINE_TABLE="your_line_table_name" \
+TEXTRACT_WORD_TABLE="your_word_table_name" \
+TEXTRACT_BUCKET="your_textract_bucket_name" \
+python3 src/media_types/metadata/textract_lambda_handler.py
\ No newline at end of file
diff --git a/src/media_types/metadata/generic_metadata.py b/src/media_types/metadata/generic_metadata.py
index 0762bfa..390a587 100644
--- a/src/media_types/metadata/generic_metadata.py
+++ b/src/media_types/metadata/generic_metadata.py
@@ -14,6 +14,7 @@
 from boto3.dynamodb.conditions import Key, Attr
 from botocore.response import StreamingBody
 import re
+from src.media_types.metadata.textract_lambda_handler import lambda_handler
 
 DUPLICATED = "Duplicated"
 
@@ -217,7 +218,12 @@ def batch_import_archives(self, response):
                     "Archive", idx
                 )
-            # Log trying to create an item that already exists
+            # --- TEXTRACT WORKFLOW ---
+            if self.env.get("process_textract", False):
+                print("\033[94mProcessing Textract since PROCESS_TEXTRACT is set to True.\033[0m")
+                self.run_textract_workflow(collection_identifier, archive_dict["identifier"])
+            else:
+                print("\033[94mSkipping Textract process as PROCESS_TEXTRACT is set to False.\033[0m")
 
     def get_table_name(self, table_name):
         return f"{table_name}-{self.env['dynamodb_table_suffix']}"
@@ -329,7 +335,7 @@ def print_results(self):
             )
             .put(Body=open(results_filename, "rb"))
         )
-        print(s3_response)
+        # print(s3_response)
         status = s3_response["ResponseMetadata"]["HTTPStatusCode"]
         if status == 200:
             print("")
@@ -896,4 +902,67 @@ def delete_NOID_record(self, noid):
             print("delete_NOID: SIMULATED.")
         else:
             self.env["mint_table"].delete_item(Key={"noid": noid})
-            print(f"delete_NOID: {noid}")
\ No newline at end of file
+            print(f"delete_NOID: {noid}")
+
+    def run_textract_workflow(self, collection_identifier, archive_identifier):
+        """
+        Finds images in S3 for the given archive and triggers the Textract workflow for each.
+        In both local and AWS modes, first checks whether the prefix already exists in the
+        Textract bucket. If it exists, processing is skipped; if not, files are copied to
+        the Textract bucket (AWS mode) or the Lambda is triggered directly (local mode).
+        """
+        s3_client = self.env["s3_client"]
+        print(f"Running Textract workflow for archive: {archive_identifier} in collection: {collection_identifier}")
+        print(f"\033[94mEnvironment settings: local_textract={self.env.get('local_textract', False)}\033[0m")
+        source_bucket = self.env["aws_src_bucket"]
+        textract_bucket = self.env["textract_bucket"]
+        # Build the S3 prefix for the archive
+        prefix = f"{self.env['collection_category']}/{collection_identifier}/{archive_identifier}/Access/"
+
+        # Check if the prefix already exists in the Textract bucket
+        print(f"Checking if prefix {prefix} exists in Textract bucket {textract_bucket}...")
+        textract_objects = s3_client.list_objects_v2(Bucket=textract_bucket, Prefix=prefix)
+        if "Contents" in textract_objects and len(textract_objects["Contents"]) > 0:
+            print(f"\033[91mWARNING: Identifier '{archive_identifier}' already exists in Textract bucket. Skipping Textract workflow.\033[0m")
+            return  # Skip further processing
+
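+        # NOTE: list_objects_v2 returns at most 1,000 keys per call. That is fine for the
+        # existence check above, but for very large archives the copy loop below could miss
+        # objects; a paginator (s3_client.get_paginator('list_objects_v2')) would be safer.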
+        response = s3_client.list_objects_v2(Bucket=source_bucket, Prefix=prefix)
+        if "Contents" in response:
+            print("Identifier does not exist in Textract bucket. Proceeding with Textract workflow.")
+            if self.env.get("local_textract", False):
+                # Run the Textract Lambda handler locally for each image
+                for obj in response["Contents"]:
+                    key = obj["Key"]
+                    if key.lower().endswith(('.jpg', '.jpeg', '.png', '.pdf')):
+                        print(f"\033[94mRunning Textract Lambda handler locally for {key}...\033[0m")
+                        event = {
+                            "Records": [
+                                {
+                                    "body": json.dumps({
+                                        "s3": {
+                                            "bucket": {"name": source_bucket},
+                                            "object": {"key": key}
+                                        },
+                                        "collection_identifier": collection_identifier
+                                    })
+                                }
+                            ]
+                        }
+                        lambda_handler(event, None)
+            else:
+                # Copy files to the Textract bucket (AWS mode)
+                print(f"\033[94mCopying files from source bucket {source_bucket} to Textract bucket {textract_bucket} since local_textract is False...\033[0m")
+                for obj in response["Contents"]:
+                    print(f"Found object in source bucket: {obj['Key']}")
+                    key = obj["Key"]
+                    if key.lower().endswith(('.jpg', '.jpeg', '.png', '.pdf')):
+                        copy_source = {'Bucket': source_bucket, 'Key': key}
+                        print(f"Copying {key} from {source_bucket} to {textract_bucket}...")
+                        s3_client.copy_object(
+                            Bucket=textract_bucket,
+                            Key=key,
+                            CopySource=copy_source
+                        )
+                print("Copy complete. S3 trigger on Textract bucket will invoke the Textract Lambda.")
diff --git a/src/media_types/metadata/textract_lambda_handler.py b/src/media_types/metadata/textract_lambda_handler.py
new file mode 100644
index 0000000..4d9abaa
--- /dev/null
+++ b/src/media_types/metadata/textract_lambda_handler.py
@@ -0,0 +1,552 @@
+import json
+import boto3
+import logging
+import uuid
+import os
+import shutil
+import datetime
+from decimal import Decimal
+from botocore.exceptions import ClientError
+from boto3.dynamodb.conditions import Key
+from PIL import Image
+import pytesseract
+
+if os.environ.get("AWS_EXECUTION_ENV") is not None:
+    # Running in AWS Lambda: point pytesseract at the bundled binary
+    pytesseract.pytesseract.tesseract_cmd = '/opt/bin/tesseract'  # or the correct path to your binary
+
+# Set up logging to write to a file with a timestamp in the filename
+log_filename = f"textract_lambda_output_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
+logging.basicConfig(
+    filename=log_filename,
+    filemode='a',
+    format='%(asctime)s %(levelname)s: %(message)s',
+    level=logging.INFO
+)
+logger = logging.getLogger()
+logger.setLevel("INFO")
+
+# Utility: recursively convert all float values to Decimal for DynamoDB.
+# This is needed because DynamoDB does not accept float types, only Decimal.
+def convert_floats(obj):
+    # If the object is a float, convert it to Decimal
+    if isinstance(obj, float):
+        return Decimal(str(obj))
+    # If the object is a dictionary, recursively convert its values
+    elif isinstance(obj, dict):
+        return {k: convert_floats(v) for k, v in obj.items()}
+    # If the object is a list, recursively convert its elements
+    elif isinstance(obj, list):
+        return [convert_floats(i) for i in obj]
+    # Otherwise, return the object as is
+    else:
+        return obj
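+
+# A minimal usage sketch for convert_floats (values here are made up for illustration):
+#   convert_floats({"Confidence": 99.53, "Polygon": [{"X": 0.12, "Y": 0.5}]})
+#   -> {"Confidence": Decimal('99.53'), "Polygon": [{"X": Decimal('0.12'), "Y": Decimal('0.5')}]}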
+
+# AWS client setup
+print("Initializing AWS Textract client and logger...")
+logger.info("Initializing AWS Textract client and logger...")
+textract_client = boto3.client('textract')  # Textract client for OCR
+
+# TODO:
+# 1. Parse collection name and filename from the message
+# 2. Create folders
+# 3. Integrate Textract logic
+# Remaining: change the output path and collection name using split
+
+# Parse the Textract response and write line information to DynamoDB
+def getLineInformation(collection_of_textract_responses, response_filename):
+    inserted_lines = []
+    skipped_lines = []
+    logger.info("Parsing line information from Textract response...")
+    print("Parsing line information from Textract response...")
+    # This method ingests the Textract response in JSON format, parses it, and returns the line information
+    total_text_with_info = []
+    running_sequence_number = 0
+    logger.info(f"Response filename: {response_filename}")
+    blocks = collection_of_textract_responses['Blocks']
+    logger.info(f"Number of blocks in response: {len(blocks)}")
+
+    # DynamoDB setup
+    logger.info("Setting up DynamoDB resource and table for line info...")
+    print("Setting up DynamoDB resource and table for line info...")
+    dynamodb = boto3.resource('dynamodb')
+    line_table_name = os.getenv("TEXTRACT_LINE_TABLE")
+    line_table = dynamodb.Table(line_table_name)
+    print("Checking DynamoDB for existing items in the Textract line table using the unique_key...")
+    # Iterate through all blocks in the Textract response
+    for block in blocks:
+        logger.info(f"Processing block: {block.get('BlockType')}")
+        # Only process blocks of type 'LINE'
+        if block['BlockType'] == 'LINE':
+            item = {}
+            running_sequence_number += 1
+            logger.info(f"Line block found. Sequence number: {running_sequence_number}")
+            # 1. Extract identifier: everything but the last digits before the underscore.
+            # Example: "fchs_1950_001_015_001" -> "fchs_1950_001_015"
+            item['id'] = str(uuid.uuid4())  # fresh UUID for this item
+            identifier_full = response_filename
+            logger.info(f"Original identifier from JSON: {identifier_full}")
+            identifier_parts = identifier_full.split('_')
+            if len(identifier_parts) > 1:
+                item['identifier'] = '_'.join(identifier_parts[:-1])
+            else:
+                item['identifier'] = identifier_full
+            logger.info(f"Processed identifier (without last digits): {item['identifier']}")
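+            # With the example filename above, identifier is "fchs_1950_001_015" while
+            # identifier_page (set further below) keeps the page suffix: "fchs_1950_001_015_001".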
+
+            # 2. Set identifier_page to the whole identifier (assigned below, after geometry)
+            relation = block.get('Relationships', [])
+            child_ids = []
+            # Extract child IDs from relationships if present
+            for sub in relation:
+                child_ids.extend(sub.get('Ids', []))
+            item['child_ids'] = child_ids
+            logger.info(f"Child IDs: {item['child_ids']}")
+            item['collection_category'] = 'federated'
+            item['confidence'] = block.get('Confidence', 0)
+            logger.info(f"Confidence: {item['confidence']}")
+            now = datetime.datetime.now(datetime.timezone.utc).isoformat()
+            item['created_at'] = now
+            # Geometry info
+            geom = block.get('Geometry', {})
+            logger.info(f"Geometry: {geom}")
+            bounding_box = geom.get('BoundingBox', {})
+            polygon = geom.get('Polygon', [])
+            # Store geometry information as strings for DynamoDB compatibility
+            item['geometry'] = {
+                'boundingbox': {
+                    'Height': str(bounding_box.get('Height', '')),
+                    'Left': str(bounding_box.get('Left', '')),
+                    'Top': str(bounding_box.get('Top', '')),
+                    'Width': str(bounding_box.get('Width', '')),
+                },
+                'polygon': [
+                    {'X': str(p.get('X', '')), 'Y': str(p.get('Y', ''))} for p in polygon
+                ]
+            }
+            item['identifier_page'] = identifier_full
+            logger.info(f"identifier_page set to: {item['identifier_page']}")
+            item['isactive'] = True  # default
+            item['output_id'] = block.get('Id', '')
+            item['line_no'] = running_sequence_number
+            item['output_text'] = block.get('Text', '')
+            item['visibility'] = True
+            # Dedupe key: page identifier plus line number
+            item['unique_key'] = f"{item['identifier_page']}_{item['line_no']}"
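+            # NOTE: the dedupe query below assumes the line table has a global secondary
+            # index named 'unique_key-index' keyed on 'unique_key'; without that GSI the
+            # query raises a ClientError.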
+            logger.info(f"Querying DynamoDB for existing item with unique_key: {item['unique_key']}")
+            response = line_table.query(
+                IndexName='unique_key-index',
+                KeyConditionExpression=Key('unique_key').eq(item['unique_key'])
+            )
+            if response['Items']:
+                skipped_lines.append(item['unique_key'])
+                logger.warning(f"Item with unique_key {item['unique_key']} already exists in DynamoDB. Skipping line insert/update.")
+                continue
+            else:
+                inserted_lines.append(item['unique_key'])
+                logger.info("No existing item found. Adding new item to DynamoDB.")
+                line_table.put_item(Item=convert_floats(item))
+            total_text_with_info.append(item)
+    if inserted_lines:
+        print(f"\033[94mINFO: The following line unique_keys for {response_filename} did not exist in the line table and were inserted:\n{', '.join(inserted_lines)}\033[0m")
+    if skipped_lines:
+        print(f"\033[91mWARNING: The following line unique_keys for {response_filename} already exist in the line table and were skipped:\n{', '.join(skipped_lines)}\033[0m")
+    return total_text_with_info
+
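+# Illustrative shape of a stored line item (field values are hypothetical):
+#   {"id": "<uuid4>", "identifier": "fchs_1950_001_015",
+#    "identifier_page": "fchs_1950_001_015_001", "line_no": 1,
+#    "output_text": "...", "confidence": Decimal('99.1'),
+#    "unique_key": "fchs_1950_001_015_001_1", "geometry": {...}}
+# getWordInformation below mirrors getLineInformation for WORD blocks, differing only in
+# the target table and a few field names (word_no/wordtext/texttype instead of
+# line_no/output_text).
+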
+# Parse the Textract response and write word information to DynamoDB
+def getWordInformation(collection_of_textract_responses, response_filename):
+    inserted_words = []
+    skipped_words = []
+    logger.info("Parsing word information from Textract response...")
+    print("Parsing word information from Textract response...")
+    # This method ingests the Textract response in JSON format, parses it, and returns the word information
+    total_text_with_info = []
+    running_sequence_number = 0
+    logger.info(f"Response filename: {response_filename}")
+    blocks = collection_of_textract_responses['Blocks']
+    logger.info(f"Number of blocks in response: {len(blocks)}")
+
+    # DynamoDB setup
+    logger.info("Setting up DynamoDB resource and table for word info...")
+    print("Setting up DynamoDB resource and table for word info...")
+    dynamodb = boto3.resource('dynamodb')
+    word_table_name = os.getenv("TEXTRACT_WORD_TABLE")
+    word_table = dynamodb.Table(word_table_name)
+    print("Checking DynamoDB for existing items in the Textract word table using the unique_key...")
+    # Iterate through all blocks in the Textract response
+    for block in blocks:
+        logger.info(f"Processing block: {block.get('BlockType')}")
+        # Only process blocks of type 'WORD'
+        if block['BlockType'] == 'WORD':
+            item = {}
+            running_sequence_number += 1
+            logger.info(f"Word block found. Sequence number: {running_sequence_number}")
+            item['id'] = str(uuid.uuid4())
+            identifier_full = response_filename
+            logger.info(f"Original identifier from JSON: {identifier_full}")
+            identifier_parts = identifier_full.split('_')
+            if len(identifier_parts) > 1:
+                item['identifier'] = '_'.join(identifier_parts[:-1])
+            else:
+                item['identifier'] = identifier_full
+            logger.info(f"Processed identifier (without last digits): {item['identifier']}")
+            item['collection_category'] = 'federated'
+            item['confidence'] = block.get('Confidence', 0)
+            logger.info(f"Confidence: {item['confidence']}")
+            now = datetime.datetime.now(datetime.timezone.utc).isoformat()
+            item['created_at'] = now
+            geom = block.get('Geometry', {})
+            logger.info(f"Geometry: {geom}")
+            bounding_box = geom.get('BoundingBox', {})
+            polygon = geom.get('Polygon', [])
+            # Store geometry information as strings for DynamoDB compatibility
+            item['geometry'] = {
+                'boundingbox': {
+                    'Height': str(bounding_box.get('Height', '')),
+                    'Left': str(bounding_box.get('Left', '')),
+                    'Top': str(bounding_box.get('Top', '')),
+                    'Width': str(bounding_box.get('Width', '')),
+                },
+                'polygon': [
+                    {'X': str(p.get('X', '')), 'Y': str(p.get('Y', ''))} for p in polygon
+                ]
+            }
+            item['identifier_page'] = identifier_full
+            logger.info(f"identifier_page set to: {item['identifier_page']}")
+            item['isactive'] = True
+            item['output_id'] = block.get('Id', '')
+            logger.info(f"output_id set to: {item['output_id']}")
+            item['texttype'] = block.get('TextType', '')
+            item['wordtext'] = block.get('Text', '')
+            logger.info(f"Word text: {item['wordtext']}")
+            item['word_no'] = running_sequence_number
+            item['visibility'] = True
+
+            # Dedupe key: page identifier plus word number (same GSI assumption as above)
+            item['unique_key'] = f"{item['identifier_page']}_{item['word_no']}"
+            logger.info(f"Querying DynamoDB for existing item with unique_key in word table: {item['unique_key']}")
+            response = word_table.query(
+                IndexName='unique_key-index',
+                KeyConditionExpression=Key('unique_key').eq(item['unique_key'])
+            )
+            if response['Items']:
+                skipped_words.append(item['unique_key'])
+                logger.warning(f"Item with unique_key {item['unique_key']} already exists in DynamoDB. Skipping word insert/update.")
+                continue
+            else:
+                inserted_words.append(item['unique_key'])
+                logger.info(f"No existing item found. Adding new item to DynamoDB. {item}")
+                word_table.put_item(Item=convert_floats(item))
+            total_text_with_info.append(item)
+    if inserted_words:
+        print(f"\033[94mINFO: The following word unique_keys for {response_filename} did not exist in the word table and were inserted:\n{', '.join(inserted_words)}\033[0m")
+    if skipped_words:
+        print(f"\033[91mWARNING: The following word unique_keys for {response_filename} already exist in the word table and were skipped:\n{', '.join(skipped_words)}\033[0m")
+    return total_text_with_info
+
+def parseJSON(jsonObject, response_filename):
+    # Helper that parses line and word information. Takes the raw AWS Textract JSON
+    # response as a string.
+    logger.info("Parsing JSON object for line and word information...")
+    print("Parsing JSON object for line and word information...")
+    parsed_json = json.loads(jsonObject)
+    line_info = getLineInformation(parsed_json, response_filename)
+    word_info = getWordInformation(parsed_json, response_filename)
+    return line_info, word_info
+
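+# Usage sketch (hypothetical file name): given a Textract response saved as JSON,
+#   with open('fchs_1950_001_015_001.json') as f:
+#       line_info, word_info = parseJSON(f.read(), 'fchs_1950_001_015_001')
+# Note that parseJSON expects a JSON *string*, which is why lambda_handler passes
+# json.dumps(response) rather than the response dict itself.
+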
+def clean_tmp():
+    # Remove everything under /tmp so repeated Lambda invocations start from a clean slate
+    tmp_dir = '/tmp'
+    for filename in os.listdir(tmp_dir):
+        file_path = os.path.join(tmp_dir, filename)
+        try:
+            if os.path.isfile(file_path) or os.path.islink(file_path):
+                os.unlink(file_path)
+            elif os.path.isdir(file_path):
+                shutil.rmtree(file_path)
+        except Exception as e:
+            print(f'Failed to delete {file_path}. Reason: {e}')
+            logger.error(f'Failed to delete {file_path}. Reason: {e}')
+
+def preprocess_image_for_textract(s3, source_bucket, image, output_path, response_filename, textract_bucket):
+    """
+    Preprocess the image using OpenCV, upload it to the Textract bucket, and return the
+    image_document reference for Textract.
+    """
+    unique_id = str(uuid.uuid4())
+    local_image_path = f'/tmp/preprocess_image_{unique_id}.jpg'
+    logger.info(f"Downloading image from S3 to: {local_image_path}")
+    print(f"Downloading image from S3 to: {local_image_path}")
+    s3.download_file(source_bucket, image, local_image_path)
+    response_filename = image.split('/')[-1].split('.')[0]
+    logger.info(f"Response filename for Textract: {response_filename}")
+    try:
+        print("Starting image preprocessing with OpenCV...")
+        logger.info("Starting image preprocessing with OpenCV...")
+        import cv2
+        img = cv2.imread(local_image_path)
+        logger.info(f"Original image shape: {img.shape}")
+        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
+        logger.info("Converted image to grayscale.")
+        # Increase contrast
+        alpha = 1.5  # Contrast control
+        beta = -30   # Brightness control
+        contrast_img = cv2.convertScaleAbs(gray, alpha=alpha, beta=beta)
+        logger.info(f"Applied contrast (alpha={alpha}) and brightness (beta={beta}).")
+        # Adaptive thresholding to remove faint text
+        processed = cv2.adaptiveThreshold(contrast_img, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 21, 15)
+        logger.info("Applied adaptive thresholding to suppress reverse-side text.")
+        processed_path = f'/tmp/processed_image_{unique_id}.jpg'
+        cv2.imwrite(processed_path, processed)
+        logger.info(f"Processed image saved to: {processed_path}")
+        print(f"Processed image saved to: {processed_path}")
+        # Upload the processed image to the Textract bucket
+        processed_s3_key = output_path + '/textractResponse/' + str(response_filename) + '_preprocessed.jpg'
+        logger.info(f"Uploading processed image to Textract bucket at: {processed_s3_key}")
+        print(f"Uploading processed image to Textract bucket at: {processed_s3_key}")
+        s3.upload_file(processed_path, textract_bucket, processed_s3_key)
+        # Use the processed image for Textract
+        image_document = {
+            'S3Object': {
+                'Bucket': textract_bucket,
+                'Name': processed_s3_key
+            }
+        }
+        print("Image preprocessing complete. Using processed image for Textract.")
+        logger.info("Image preprocessing complete. Using processed image for Textract.")
+    except Exception as e:
+        logger.error(f"Image preprocessing failed: {e}. Using original image for Textract.")
+        print(f"Image preprocessing failed: {e}. Using original image for Textract.")
+        # Fall back to the original image in the Textract bucket (should not happen unless originals are copied there)
+        image_document = {
+            'S3Object': {
+                'Bucket': textract_bucket,
+                'Name': image
+            }
+        }
+    return image_document
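+
+# Design note: adaptive thresholding (blockSize=21, C=15 above) binarizes each pixel
+# against a Gaussian-weighted local neighborhood mean, which is what suppresses faint
+# bleed-through from the reverse side of a scanned page; a single global threshold would
+# keep or drop such text page-wide.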
+
+def get_Text_Percentage_Images(imagePath):
+    # Estimate how much of the image contains text using Tesseract OCR.
+    # NOTE: for a single image this returns either 0 or 100, since exactly one "page" is
+    # counted; the caller's <=10% check therefore effectively means "no text detected".
+    tessdata_file = '/tmp/tessdata/eng.traineddata'
+    if not os.path.exists(tessdata_file):
+        raise RuntimeError(f"Missing tessdata file: {tessdata_file}")
+    images = Image.open(imagePath)
+    page = 0
+    with_text = 0
+    text = pytesseract.image_to_string(images)
+    if len(text) > 0:
+        with_text += 1
+    page += 1
+    percent_of_pages_with_text = (with_text / page) * 100
+    return percent_of_pages_with_text
+
+
+def download_tessdata_from_s3(bucket, key, download_path):
+    s3 = boto3.client('s3')
+    os.makedirs(os.path.dirname(download_path), exist_ok=True)
+    try:
+        s3.download_file(bucket, key, download_path)
+        print(f"Downloaded {key} from {bucket} to {download_path}")
+    except Exception as e:
+        print(f"Failed to download {key} from {bucket}: {e}")
+        raise
+
+# call_textract was removed from the lambda_handler signature and logic (comments/prints kept)
+def lambda_handler(event, context):
+    # Clean /tmp only once, before downloading tessdata
+    if os.environ.get("AWS_EXECUTION_ENV") is not None:
+        # Running in AWS Lambda
+        clean_tmp()
+        print("Cleaned /tmp directory.")
+    else:
+        print("Skipping /tmp cleanup for local run.")
+        logger.info("Skipping /tmp cleanup for local run.")
+
+    # Download eng.traineddata from S3 to /tmp/tessdata/
+    tessdata_bucket = 'opencvambdalay'
+    tessdata_key = 'tessdata/eng.traineddata'
+    tessdata_local_path = '/tmp/tessdata/eng.traineddata'
+    download_tessdata_from_s3(tessdata_bucket, tessdata_key, tessdata_local_path)
+    print("Files in /tmp/tessdata:", os.listdir('/tmp/tessdata'))
+    print("Does /tmp/tessdata/eng.traineddata exist?", os.path.exists('/tmp/tessdata/eng.traineddata'))
+    os.environ['TESSDATA_PREFIX'] = '/tmp/tessdata'
+
+    print("Lambda handler started. Processing event...")
+    logger.info("Lambda handler started. Processing event...")
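+
+    # The handler expects SQS-style records whose 'body' is a JSON string, e.g.
+    # (bucket and key values here are hypothetical):
+    #   {"Records": [{"body": "{\"s3\": {\"bucket\": {\"name\": \"my-bucket\"},
+    #     \"object\": {\"key\": \"federated/coll/arch/Access/page_001.jpg\"}}}"}]}
+    # run_textract_workflow in generic_metadata.py builds exactly this event in local mode.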
Processing event...") + + print(f"Event received: {event}") + logger.info(f"Event received: {event}") + print("Initializing S3 client...") + logger.info("Initializing S3 client...") + s3 = boto3.client('s3') + records = event['Records'] + print(f"Number of records in event: {len(records)}") + logger.info(f"Number of records in event: {len(records)}") + # Use textract bucket for all response operations + textract_bucket = os.getenv("TEXTRACT_BUCKET") # fallback to your bucket name + for record in records: + body = record['body'] + logger.info(f"body: {body}") + logger.info(type(body)) + res = json.loads(body) + logger.info(f'Parsed JSON: {res}') + source_bucket = res['s3']['bucket']['name'] + filename = res["s3"]["object"]["key"] + response_filename = filename.split('/')[-1].split('.')[0] + output_path = '/'.join(filename.split('/')[:-2]) + logger.info(f'filename: {filename}') + logger.info(f'source_bucket: {source_bucket}') + logger.info(f'output_path: {output_path}') + # Extract collection name from the S3 key path (assumes a specific folder structure) + # Use passed collection_identifier if present, else fallback to extraction + collection_identifier = os.getenv("COLLECTION_IDENTIFIER") or res.get("collection_identifier") + if not collection_identifier: + collection_identifier = extract_collection_identifier(res) + if not collection_identifier: + raise ValueError("COLLECTION_IDENTIFIER is not set in the environment, event payload, or could not be inferred from the S3 key.") + logger.info(f"Collection identifier: {collection_identifier}") + try: + print("Starting image validation and preprocessing...") + logger.info("Starting image validation and preprocessing...") + image = str(filename) + # Only allow certain image types for Textract + if not image.lower().endswith(('.png', '.jpg','.jpeg')): + print("Image extension not valid. Raising error.") + logger.error("Image extension not valid. Raising error.") + raise ValueError("Invalid image source") + logger.info(f"Image to process: {image}") + + # Download image from source bucket to /tmp for analysis + local_image_path = f'/tmp/{os.path.basename(image)}' + s3.download_file(source_bucket, image, local_image_path) + + # --- Skip if <=10% text in the image --- + percent_text = get_Text_Percentage_Images(local_image_path) + print(f"Text percentage in image: {percent_text:.2f}%") + logger.info(f"Text percentage in image: {percent_text:.2f}%") + if percent_text <= 10: + logger.info(f"Image has {percent_text:.2f}% text. Skipping Textract and DynamoDB processing.") + print(f"\033[91mImage has {percent_text:.2f}% text. Skipping Textract and DynamoDB processing.\033[0m") + continue + print(f"\033[94mImage has {percent_text:.2f}% text. Proceeding with Textract processing.\033[0m") + logger.info(f"Image has {percent_text:.2f}% text. 
+            # Always check for and save responses in the Textract bucket
+            existing_response_key = output_path + '/textractResponse/' + str(response_filename) + '.json'
+            try:
+                response_obj = s3.get_object(Bucket=textract_bucket, Key=existing_response_key)
+                response = json.loads(response_obj['Body'].read())
+                print(f"\033[91mWARNING: Textract response already exists in the textract bucket.\033[0m")
+                print(f"\033[94mLoaded existing Textract response from S3.\033[0m")
+                logger.info("Loaded existing Textract response from S3.")
+                image_document = {
+                    'S3Object': {
+                        'Bucket': textract_bucket,
+                        'Name': image
+                    }
+                }
+            except s3.exceptions.NoSuchKey:
+                print("No existing Textract response found. Preprocessing image...")
+                logger.info("No existing Textract response found. Preprocessing image...")
+                logger.info(f"s3: {s3}, textract_bucket: {textract_bucket}, image: {image}, output_path: {output_path}, response_filename: {response_filename}")
+                # Download the image from the source bucket, preprocess, and upload to the Textract bucket
+                image_document = preprocess_image_for_textract(s3, source_bucket, image, output_path, response_filename, textract_bucket)
+                # Update image_document to point to the preprocessed copy in the Textract bucket
+                image_document['S3Object']['Bucket'] = textract_bucket
+                image_document['S3Object']['Name'] = output_path + '/textractResponse/' + str(response_filename) + '_preprocessed.jpg'
+                response = textract_client.detect_document_text(Document=image_document)
+                logger.info(f"Textract response: {response}")
+
+                logger.info("Saving Textract response to S3...")
+                # Save the Textract response JSON to S3 for later use
+                s3.put_object(
+                    Bucket=textract_bucket,
+                    Key=output_path + '/textractResponse/' + str(response_filename) + '.json',
+                    Body=json.dumps(response),
+                    Metadata={'collection_identifier': collection_identifier}
+                )
+                logger.info('Successfully saved textract responses')
+                print('Successfully saved textract responses')
+
+            logger.info("Parsing Textract response for line and word info...")
+            print("Parsing Textract response for line and word info...")
+            json_contents = json.dumps(response)
+            line_info, word_info = parseJSON(json_contents, response_filename)
+            logger.info(f"Line info: {line_info}")
+            logger.info("Saving line information to S3...")
+            print("Saving line information to S3...")
+            s3.put_object(Bucket=textract_bucket, Key=output_path + '/textractResponse/' + str(response_filename) + '_line_information.json', Body=json.dumps(line_info), Metadata={'collection_identifier': collection_identifier})
+            logger.info('Line information successfully saved!')
+            print('Line information successfully saved!')
+            logger.info("Saving word information to S3...")
+            print("Saving word information to S3...")
+            s3.put_object(Bucket=textract_bucket, Key=output_path + '/textractResponse/' + str(response_filename) + '_word_information.json', Body=json.dumps(word_info), Metadata={'collection_identifier': collection_identifier})
+            logger.info('Word information successfully saved!')
+            print('Word information successfully saved!')
+
" + err.response['Error']['Message'] + print(f"ClientError: {error_message}") + logger.info(f"ClientError: {error_message}") + + print("Lambda handler completed.") + logger.info("Lambda handler completed.") + return { + 'statusCode': 200, + 'body': json.dumps('Success!') + } + + +def extract_collection_identifier(res): + # Try environment variable or event payload first + collection_identifier = os.getenv("COLLECTION_IDENTIFIER") or res.get("collection_identifier") + if not collection_identifier: + filename = res["s3"]["object"]["key"] + parts = filename.split('/') + + # Try to extract as folder below 'federated' + if 'federated' in parts: + federated_idx = parts.index('federated') + if len(parts) > federated_idx + 1: + collection_identifier = parts[federated_idx + 1] + + # If not found, try two folders above 'Access' + if (not collection_identifier or collection_identifier == "unknown") and 'Access' in parts: + access_idx = parts.index('Access') + if access_idx >= 2: + collection_identifier = parts[access_idx - 2] + + # Fallback to previous logic + if not collection_identifier or collection_identifier == "unknown": + collection_identifier = parts[1] if len(parts) >= 2 else "unknown" + + if not collection_identifier: + raise ValueError("COLLECTION_IDENTIFIER is not set in the environment, event payload, or could not be inferred from the S3 key.") + return collection_identifier + + +if __name__ == "__main__": + # For local testing: load a test event and run the handler + import json + with open("test_event.json") as f: + test_event = json.load(f) + lambda_handler(test_event, None) + diff --git a/test_event_example.json b/test_event_example.json new file mode 100644 index 0000000..a50d059 --- /dev/null +++ b/test_event_example.json @@ -0,0 +1,7 @@ +{ + "Records": [ + { + "body": "{\"s3\": {\"bucket\": {\"name\": \"i*****-***-*****-*****\"}, \"object\": {\"key\": \"f********/***********/*****************/******/********************.jpg\"}}}" + } + ] +} \ No newline at end of file