
Implementing the Batch Layer of Lambda Architecture using S3, Redshift and Apache Kafka

This post is a part of a series on Lambda Architecture consisting of:

You can also follow a walk-through of the code in this YouTube video:

and you can find the source code of the series here.


Purpose in Lambda Architecture:

  • store all the tweets produced by the Kafka Producer in S3
  • export them into Redshift
  • perform aggregation on the tweets to get the desired output of the batch layer
  • achieve this by:
    • every couple of hours, get the latest unseen tweets produced by the Kafka Producer and store them in an S3 archive
    • every night, run a SQL query to compute the batch layer result

Contents:

Required libraries

from kafka import KafkaConsumer
from io import StringIO
import boto3
import time
import random

Defining the Kafka consumer

  • setting the location of the Kafka Broker
  • specifying the group_id and consumer_timeout
  • subscribing to a topic
consumer = KafkaConsumer(
                        bootstrap_servers='localhost:9092',
                        auto_offset_reset='latest',  # Reset partition offsets upon OffsetOutOfRangeError
                        group_id='test',   # must have a unique consumer group id 
                        consumer_timeout_ms=10000)  
                                # How long to listen for messages - we do it for 10 seconds 
                                # because we poll the kafka broker only each couple of hours

consumer.subscribe('tweets-lambda1')
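
The "latest unseen tweets" behaviour comes from Kafka's consumer-group offset tracking: because the consumer joins the group test, kafka-python commits the offsets it has already read, so each polling cycle continues where the previous one stopped. A minimal sketch with those defaults spelled out explicitly (the auto-commit values shown are kafka-python defaults, not settings from this post):

# Sketch only: the same consumer, with the offset-commit behaviour made explicit.
consumer = KafkaConsumer(
                        bootstrap_servers='localhost:9092',
                        auto_offset_reset='latest',
                        group_id='test',
                        enable_auto_commit=True,       # commit read offsets so the next poll skips them
                        auto_commit_interval_ms=5000,  # kafka-python's default commit interval
                        consumer_timeout_ms=10000)

consumer.subscribe('tweets-lambda1')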

Defining an Amazon Web Services S3 storage client

  • setting the authorization and the bucket
s3_resource = boto3.resource(
    's3',
    aws_access_key_id='x',
    aws_secret_access_key='x',
)

s3_client = s3_resource.meta.client
bucket_name = 'lambda-architecture123'
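
Hardcoding the access keys works for a walkthrough, but boto3 can also resolve credentials on its own. A sketch of that variant, assuming the keys are available as the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY or in ~/.aws/credentials:

# Sketch only: let boto3 pick up credentials from the environment or ~/.aws/credentials
s3_resource = boto3.resource('s3')

s3_client = s3_resource.meta.client
bucket_name = 'lambda-architecture123'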

Writing the data into an S3 bucket

  • polling the Kafka Broker
  • aggregating the latest messages into a single object in the bucket
def store_twitter_data(path):
    csv_buffer = StringIO() # S3 storage is object storage -> our document is just a large string

    for message in consumer: # this acts as "get me an iterator over the latest messages I haven't seen"
        csv_buffer.write(message.value.decode() + '\n')  # one tweet per line

    s3_resource.Object(bucket_name,path).put(Body=csv_buffer.getvalue())

Exporting data from the S3 bucket to Amazon Redshift using the COPY command

  • authenticate and create a connection using the psycopg2 module
  • export data from S3 into the Redshift “raw” table using the COPY command
import psycopg2
config = { 'dbname': 'lambda', 
           'user':'x',
           'pwd':'x',
           'host':'data-warehouse.x.us-east-1.redshift.amazonaws.com',
           'port':'5439'
         }
conn =  psycopg2.connect(dbname=config['dbname'], host=config['host'], 
                              port=config['port'], user=config['user'], 
                              password=config['pwd'])
def copy_files(conn, path):
    curs = conn.cursor()
    curs.execute(""" 
        copy 
        batch_raw
        from 
        's3://lambda-architecture123/""" + path + """'  
        access_key_id 'x'
        secret_access_key 'x'
        delimiter ';'
        region 'eu-central-1'
    """)
    curs.close()
    conn.commit()
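
The COPY command assumes a batch_raw table already exists in Redshift; its DDL is not shown in this post. A minimal sketch, with column names taken from the batch layer query below; the types, and the assumption that the column order matches the semicolon-delimited records written by the producer, are mine:

# Sketch only: a possible DDL for the batch_raw table targeted by COPY.
# Column names come from the batch layer query; types and order are assumptions.
def create_raw_table(conn):
    curs = conn.cursor()
    curs.execute("""
        create table if not exists batch_raw (
            id              bigint,
            created_at      varchar(64),
            location        varchar(256),
            followers_count int,
            favorite_count  int,
            retweet_count   int
        );
    """)
    curs.close()
    conn.commit()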

Computing the batch layer output

  • querying the raw tweets stored in Redshift to get the desired batch layer output
def compute_batch_layer(conn):
    curs = conn.cursor()
    curs.execute(""" 
        drop table if exists batch_layer;

        with raw_dedup as (
        SELECT
            distinct id,created_at,followers_count,location,favorite_count,retweet_count
        FROM
            batch_raw
        ),
        batch_result as (SELECT
            location,
            count(id) as count_id,
            sum(followers_count) as sum_followers_count,
            sum(favorite_count) as sum_favorite_count,
            sum(retweet_count) as sum_retweet_count
        FROM
            raw_dedup
        group by 
            location
        )
        select 
            *
        INTO
            batch_layer
        FROM
            batch_result""")
    curs.close()
    conn.commit()
# compute_batch_layer(conn)
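
To sanity-check the output, the batch_layer table can be queried over the same connection. A small sketch (the preview_batch_layer helper is mine, not part of the original code):

# Sketch only: preview the aggregated batch view, most active locations first.
def preview_batch_layer(conn, limit=10):
    curs = conn.cursor()
    curs.execute("""
        select location, count_id, sum_retweet_count
        from batch_layer
        order by count_id desc
        limit %s""", (limit,))
    for row in curs.fetchall():
        print(row)
    curs.close()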

Deployment

  • perform the task every couple of hours and wait in between
def periodic_work(interval):
    while True:
        path = 'apple-tweets/'+ time.strftime("%Y/%m/%d/%H") + '_tweets_' + str(random.randint(1,1000)) + '.log'
        store_twitter_data(path)
        time.sleep(interval/2)
        copy_files(conn, path)
        #interval should be an integer, the number of seconds to wait
        time.sleep(interval/2)

periodic_work(60 * 4)  # 4 minutes!
# run at the end of the day
compute_batch_layer(conn)

conn.close()
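
One caveat worth noting: a psycopg2 connection that stays open across many hours of sleeping may be dropped by the server. A defensive sketch, assuming the config dictionary from above, that opens a fresh connection just for the nightly aggregation:

# Sketch only: open a fresh connection for the end-of-day batch computation.
def run_nightly_batch(config):
    conn = psycopg2.connect(dbname=config['dbname'], host=config['host'],
                            port=config['port'], user=config['user'],
                            password=config['pwd'])
    try:
        compute_batch_layer(conn)
    finally:
        conn.close()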

You can find the code from this blog post in this github repository.

