Thursday, August 10, 2017

Download Files from AWS S3 Bucket via SQS message



Our infrastructure puts log files into an S3 bucket. I need to fetch only the new log files from the bucket, parse them, and put the results into a DB. The log parsing servers are clustered, so a download might be interrupted, or one server might download a log file that another cluster member is currently parsing.
There is a solution: when a new file is created, the S3 bucket sends a message to a Simple Queue Service (SQS) queue. The log parsing servers pull the messages every minute. Once a message has been pulled, SQS hides it for a configured period (the visibility timeout) until the delete action is called. The parsing server calls the delete action after the download finishes, so no other server processes the same file.
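For this to work, the S3 bucket needs an event notification that targets the SQS queue, and the queue's access policy must already allow S3 to send messages to it. Below is a minimal boto3 sketch of that setup; the bucket name and queue ARN are placeholders for your own values.

import boto3

s3 = boto3.client('s3', region_name='eu-central-1')

# Send a message to the queue for every object created in the bucket.
# Bucket name and queue ARN are placeholders - replace with your own.
s3.put_bucket_notification_configuration(
    Bucket='your-log-bucket',
    NotificationConfiguration={
        'QueueConfigurations': [{
            'QueueArn': 'arn:aws:sqs:eu-central-1:123456789012:queueName',
            'Events': ['s3:ObjectCreated:*'],
        }]
    }
)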
To run the code, you will need Python 3 and boto3 (the Amazon Web Services (AWS) SDK for Python):
yum install epel-release
yum install python34-pip
pip3 install boto3



Here is a Python 3 code sample.
import boto3
import json
import traceback
import os
import logging, sys
import tarfile
from urllib.parse import unquote_plus

#logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

# Get the service resource
sqs = boto3.resource('sqs', region_name='eu-central-1')
s3_client = boto3.client('s3', region_name='eu-central-1')

# Get the queue
queue = sqs.get_queue_by_name(QueueName='Your SQS Queue Name')

for message in queue.receive_messages(MaxNumberOfMessages=10):
  try:
    if message is not None:
      # Parse the S3 event message from the message body
      s3 = json.loads(message.body)['Records'][0]['s3']
      bucket = s3['bucket']['name']
      # S3 URL-encodes object keys in event notifications
      key = unquote_plus(s3['object']['key'])
      
      logging.debug('bucket :'+bucket)
      logging.debug('key :'+key)
      
      # Get filename and directory from key
      filename = os.path.basename(key)
      directory = '/your_prefix_dir/' + os.path.dirname(key)
      filepath = os.path.join(directory, filename)
      
      logging.debug('filename :'+filename)
      logging.debug('directory :'+directory)
      
      # Create the directory if it does not exist
      if not os.path.exists(directory):
        os.makedirs(directory)
                          
      # Download Log File
      s3_client.download_file(bucket, key, filepath)
      logging.debug('Download completed')
      
      # Extract tar.gz File
      tar = tarfile.open(filepath, "r:gz")
      tar.extractall(directory)
      tar.close()  
      logging.debug('extract file completed')
         
      # Remove tar.gz File
      os.remove(filepath)
      
      # Remove SQS message    
      message.delete()
      
  except (ValueError, KeyError):
    # This queue is dedicated to S3 event messages. If a malformed message
    # arrives from another service, log its body and remove the message
    logging.error('Message format is not valid. Delete message :' + message.body)
    message.delete()
  except Exception:
    logging.error(traceback.format_exc()) 
  else:
    logging.info('finish')
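If you want to check the parsing part without a live queue, you can feed it a fabricated event body. This is only a sketch; the bucket name and key are made up, and only the fields the script reads are included.

import json

# A trimmed, fabricated S3 event body (placeholder bucket and key)
body = json.dumps({'Records': [{'s3': {
    'bucket': {'name': 'your-log-bucket'},
    'object': {'key': 'logs/app-2017-08-10.tar.gz'}}}]})

s3 = json.loads(body)['Records'][0]['s3']
print(s3['bucket']['name'], s3['object']['key'])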



If you can see a message from the S3 bucket with the command below, the log file is ready to be pulled by the Python script. Watch out: after a receive, SQS hides the message for the visibility timeout (20 seconds in this setup), so it won't be visible again until that period expires.
aws sqs receive-message --queue-url https://sqs.eu-central-1.amazonaws.com/your account number/queueName --max-number-of-messages 1 --region eu-central-1
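The visibility timeout is an attribute of the queue, so if you need a longer or shorter hiding period, you can change it with boto3. A small sketch, assuming the same queue name as in the script above:

import boto3

sqs = boto3.resource('sqs', region_name='eu-central-1')
queue = sqs.get_queue_by_name(QueueName='Your SQS Queue Name')

# Hide a received message for 20 seconds before it becomes visible again
queue.set_attributes(Attributes={'VisibilityTimeout': '20'})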

You can execute the Python script as shown below after those 20 seconds. flock is used to avoid concurrent execution.
flock -n /home/your Dir/lock/s3-copy.lock -c "/usr/bin/python3 /your_prefix_dir/s3-copy.py"
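Since the servers should pull the messages every minute, a cron entry is a natural fit. A sketch reusing the same placeholder paths as above:

* * * * * flock -n /home/your Dir/lock/s3-copy.lock -c "/usr/bin/python3 /your_prefix_dir/s3-copy.py"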

The log files are downloaded and the tar.gz files are extracted.
