Implementing the Batch Layer of Lambda Architecture using S3, Redshift and Apache Kafka
This post is a part of a series on Lambda Architecture consisting of:
- Introduction to Lambda Architecture
- Implementing Data Ingestion using Apache Kafka, Tweepy
- Implementing Batch Layer using Kafka, S3, Redshift
- Implementing Speed Layer using Spark Structured Streaming
- Implementing Serving Layer using Redshift
You can also follow a walk-through of the code in this YouTube video, and you can find the source code of the series here.
Purpose in Lambda Architecture:
- store all the tweets that were produced by the Kafka Producer into S3
- export them into Redshift
- perform aggregation on the tweets to get the desired output of the batch layer
- achieve this by:
  - every couple of hours, getting the latest unseen tweets produced by the Kafka Producer and storing them in an S3 archive
  - every night, running a SQL query to compute the result of the batch layer
Contents:
- Defining the Kafka consumer
- Defining an Amazon Web Services S3 storage client
- Writing the data into an S3 bucket
- Exporting data from S3 bucket to Amazon Redshift using COPY command
- Aggregating “raw” tweets in Redshift
- Deployment
Required libraries
```python
from kafka import KafkaConsumer
from io import StringIO
import boto3
import time
import random
```
Defining the Kafka consumer
- setting the location of the Kafka Broker
- specifying the group_id and consumer_timeout
- subscribing to a topic
```python
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    auto_offset_reset='latest',  # reset partition offsets upon OffsetOutOfRangeError
    group_id='test',             # must have a unique consumer group id
    consumer_timeout_ms=10000)   # how long to listen for messages - we do it for 10 seconds
                                 # because we poll the Kafka broker only every couple of hours

consumer.subscribe('tweets-lambda1')
```
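If you want to sanity-check the consumer before wiring it into the batch pipeline, a minimal snippet like the one below (my addition, not part of the original code) simply iterates and prints whatever arrives; the iteration ends on its own once consumer_timeout_ms elapses without new messages. Keep in mind that messages read this way are consumed, so with auto-commit enabled they will not be re-read by the batch job later.

```python
# Hypothetical sanity check: print incoming tweets until the consumer times out.
for message in consumer:
    print(message.topic, message.partition, message.offset, message.value.decode()[:80])
```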
Defining an Amazon Web Services S3 storage client
- setting the authorization and bucket
```python
s3_resource = boto3.resource(
    's3',
    aws_access_key_id='x',
    aws_secret_access_key='x',
)

s3_client = s3_resource.meta.client
bucket_name = 'lambda-architecture123'
```
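The rest of the post assumes the bucket already exists. If you are setting things up from scratch, a small sketch like the following (my addition; the region matches the one used in the COPY command later) can create it on first run:

```python
import botocore

# Create the bucket if it does not exist yet (hypothetical helper, not in the original post).
try:
    s3_client.head_bucket(Bucket=bucket_name)  # raises ClientError if the bucket is missing or inaccessible
except botocore.exceptions.ClientError:
    s3_resource.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={'LocationConstraint': 'eu-central-1'})
```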
Writing the data into an S3 bucket
- polling the Kafka Broker
- aggregating the latest messages into a single object in the bucket
```python
def store_twitter_data(path):
    csv_buffer = StringIO()   # S3 storage is object storage -> our document is just one large string
    for message in consumer:  # this acts as "get me an iterator over the latest messages I haven't seen"
        csv_buffer.write(message.value.decode() + '\n')
    s3_resource.Object(bucket_name, path).put(Body=csv_buffer.getvalue())
```
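One optional refinement (my own, not from the original code): if a polling window produces no new tweets, the function above still writes an empty object to S3. A variant that skips the upload in that case could look like this:

```python
def store_twitter_data_if_any(path):
    # Same logic as store_twitter_data, but only uploads when at least one message arrived.
    csv_buffer = StringIO()
    for message in consumer:
        csv_buffer.write(message.value.decode() + '\n')
    body = csv_buffer.getvalue()
    if body:  # skip creating an empty S3 object
        s3_resource.Object(bucket_name, path).put(Body=body)
```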
Exporting data from S3 bucket to Amazon Redshift using COPY command
- authenticate and create a connection using the psycopg2 module
- export data from S3 to the Redshift “raw” table using the COPY command
```python
import psycopg2

config = {'dbname': 'lambda',
          'user': 'x',
          'pwd': 'x',
          'host': 'data-warehouse.x.us-east-1.redshift.amazonaws.com',
          'port': '5439'}

conn = psycopg2.connect(dbname=config['dbname'], host=config['host'],
                        port=config['port'], user=config['user'],
                        password=config['pwd'])
```
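The COPY command below loads into a table called batch_raw, which has to exist beforehand. The post does not show its DDL, but a minimal sketch could look like this; the column names are taken from the aggregation query later in the post, while the types and lengths are my assumptions:

```python
# Hypothetical DDL for the "raw" table; columns match the aggregation query below,
# the data types are assumptions.
curs = conn.cursor()
curs.execute("""
    create table if not exists batch_raw (
        id              bigint,
        created_at      varchar(100),
        followers_count int,
        location        varchar(400),
        favorite_count  int,
        retweet_count   int
    );
""")
curs.close()
conn.commit()
```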
```python
def copy_files(conn, path):
    curs = conn.cursor()
    curs.execute("""
        copy
            batch_raw
        from
            's3://lambda-architecture123/""" + path + """'
        access_key_id 'x'
        secret_access_key 'x'
        delimiter ';'
        region 'eu-central-1'
    """)
    curs.close()
    conn.commit()
```
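After a COPY it can be handy to confirm that rows actually landed in the table, for example with a quick count (my addition, not in the original post):

```python
# Quick row-count check after loading.
curs = conn.cursor()
curs.execute("select count(*) from batch_raw")
print(curs.fetchone()[0], "rows in batch_raw")
curs.close()
```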
Computing the batch layer output
- querying the raw tweets stored in Redshift to get the desired batch layer output
```python
def compute_batch_layer(conn):
    curs = conn.cursor()
    curs.execute("""
        drop table if exists batch_layer;

        with raw_dedup as (
            select distinct
                id, created_at, followers_count, location, favorite_count, retweet_count
            from
                batch_raw
        ),
        batch_result as (
            select
                location,
                count(id) as count_id,
                sum(followers_count) as sum_followers_count,
                sum(favorite_count) as sum_favorite_count,
                sum(retweet_count) as sum_retweet_count
            from
                raw_dedup
            group by
                location
        )
        select
            *
        into
            batch_layer
        from
            batch_result""")
    curs.close()
    conn.commit()
```
```python
# compute_batch_layer(conn)
```
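Once the batch_layer table has been built, you can inspect the result directly; a small, hypothetical query over the columns created above:

```python
# Inspect the batch layer output (not part of the original post).
curs = conn.cursor()
curs.execute("""
    select location, count_id, sum_retweet_count
    from batch_layer
    order by count_id desc
    limit 10
""")
for row in curs.fetchall():
    print(row)
curs.close()
```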
Deployment
- perform the task every couple of hours and wait in between
```python
def periodic_work(interval):
    # interval should be an integer, the number of seconds to wait
    while True:
        path = 'apple-tweets/' + time.strftime("%Y/%m/%d/%H") + '_tweets_' + str(random.randint(1, 1000)) + '.log'
        store_twitter_data(path)
        time.sleep(interval / 2)
        copy_files(conn, path)
        time.sleep(interval / 2)


periodic_work(60 * 4)  # 4 minutes
```
```python
# run at the end of the day
compute_batch_layer(conn)
conn.close()
```
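Since periodic_work loops forever, the snippet above is meant to be run separately (for example after stopping the loop). Alternatively, the nightly recomputation can be folded into the polling loop itself; a sketch of that variant (my own, assuming the batch job should fire once per calendar day shortly after midnight):

```python
import datetime

def periodic_work_with_nightly_batch(interval):
    # Variant of periodic_work: same polling loop, plus a batch recomputation
    # that fires at most once per calendar day, during the first hour after midnight.
    last_batch_day = None
    while True:
        path = 'apple-tweets/' + time.strftime("%Y/%m/%d/%H") + '_tweets_' + str(random.randint(1, 1000)) + '.log'
        store_twitter_data(path)
        time.sleep(interval / 2)
        copy_files(conn, path)
        time.sleep(interval / 2)

        now = datetime.datetime.now()
        if now.hour == 0 and last_batch_day != now.date():
            compute_batch_layer(conn)
            last_batch_day = now.date()
```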
You can find the code from this blog post in this GitHub repository.