Our infrastructure puts log files into an S3 bucket. I need to fetch only the new log files from the bucket, parse them, and put the results into a DB. The log-parsing servers are clustered, so a download might be interrupted, and a file might be downloaded again while another cluster server is already parsing it.
There is a solution. When a new file is created, the S3 bucket sends a message to a Simple Queue Service (SQS) queue. The log-parsing servers pull the messages every minute. Once a message has been pulled, SQS hides it for a set period (the visibility timeout) until it is deleted; the parsing server deletes the message after the download finishes.
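As a minimal sketch, the bucket notification can be wired to the queue with boto3 roughly like this. The bucket name and queue ARN are placeholders, and this assumes the queue's access policy already allows s3.amazonaws.com to send messages to it:

import boto3

s3 = boto3.client('s3', region_name='eu-central-1')
s3.put_bucket_notification_configuration(
    Bucket='your-log-bucket',  # placeholder bucket name
    NotificationConfiguration={
        'QueueConfigurations': [{
            # placeholder ARN; must match your account number and queue name
            'QueueArn': 'arn:aws:sqs:eu-central-1:123456789012:queueName',
            # fire on every kind of object creation (PUT, POST, copy, multipart)
            'Events': ['s3:ObjectCreated:*'],
        }]
    },
)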
To run the code, you will need Python 3 and boto3 (the Amazon Web Services (AWS) SDK for Python).
yum install epel-release
yum install python34-pip
pip3 install boto3
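You can verify the installation with:

python3 -c "import boto3; print(boto3.__version__)"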
Here is a Python 3 code sample.
import boto3
import json
import traceback
import os
import logging, sys
import tarfile
from urllib.parse import unquote_plus

#logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

# Get the service resource
sqs = boto3.resource('sqs', region_name='eu-central-1')
s3_client = boto3.client('s3', region_name='eu-central-1')

# Get the queue
queue = sqs.get_queue_by_name(QueueName='Your SQS Queue Name')

for message in queue.receive_messages(MaxNumberOfMessages=10):
    try:
        # Parse the S3 event record out of the message body
        s3 = json.loads(message.body)['Records'][0]['s3']
        bucket = s3['bucket']['name']
        # Keys in S3 event notifications are URL-encoded; decode before use
        key = unquote_plus(s3['object']['key'])
        logging.debug('bucket :' + bucket)
        logging.debug('key :' + key)

        # Get filename and directory from the key
        filename = os.path.basename(key)
        directory = '/your_prefix_dir/' + os.path.dirname(key)
        logging.debug('filename :' + filename)
        logging.debug('directory :' + directory)

        # Create the directory if it does not exist
        if not os.path.exists(directory):
            os.makedirs(directory)

        # Download the log file
        local_path = os.path.join(directory, filename)
        s3_client.download_file(bucket, key, local_path)
        logging.debug('Download completed')

        # Extract the tar.gz file
        tar = tarfile.open(local_path, "r:gz")
        tar.extractall(directory)
        tar.close()
        logging.debug('extract file completed')

        # Remove the tar.gz file
        os.remove(local_path)

        # Remove the SQS message
        message.delete()
    except ValueError:
        # This queue is dedicated to S3 event messages. If a malformed message
        # arrives from another service, log its body and remove it.
        logging.error('Message format is not valid. Delete message :' + message.body)
        message.delete()
    except Exception:
        logging.error(traceback.format_exc())
    else:
        logging.info('finish')
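Note that message.delete() is only called after the download and extraction succeed. If a server is interrupted mid-download, the message simply becomes visible again once the visibility timeout expires and another cluster server will retry it, which is what keeps a file from being lost or skipped.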
If you see a message from the S3 bucket with a command like the one below, the log file is ready to be pulled by the Python script. Watch out: SQS hides a received message for the visibility timeout (20 seconds here), so it won't be visible again until that period expires.
aws sqs receive-message --queue-url https://sqs.eu-central-1.amazonaws.com/your account number/queueName --max-number-of-messages 1 --region eu-central-1
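The message body is an S3 event notification in JSON. Trimmed down to just the fields the script reads, it looks roughly like this (the bucket name and key are placeholders):

{
  "Records": [
    {
      "s3": {
        "bucket": { "name": "your-log-bucket" },
        "object": { "key": "logs/app-log.tar.gz" }
      }
    }
  ]
}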
You can execute the Python script as shown below, 20 seconds later. flock is used to avoid concurrent execution.
flock -n /home/your Dir/lock/s3-copy.lock -c "/usr/bin/python3 /your_prefix_dir/s3-copy.py"
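To pull the queue every minute, as described above, a crontab entry along these lines would work (same placeholder paths as in the command above):

* * * * * flock -n /home/your Dir/lock/s3-copy.lock -c "/usr/bin/python3 /your_prefix_dir/s3-copy.py"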
The log files are downloaded and the tar.gz archives are extracted.