Implementing the Serving Layer of Lambda Architecture using Redshift
This post is a part of a series on Lambda Architecture consisting of:
- Introduction to Lambda Architecture
- Implementing Data Ingestion using Apache Kafka, Tweepy
- Implementing Batch Layer using Kafka, S3, Redshift
- Implementing Speed Layer using Spark Structured Streaming
- Implementing Serving Layer using Redshift
You can also follow a walk-through of the code in this YouTube video, and you can find the source code of the series here.
Purpose in Lambda Architecture:
- merge the output of the speed and batch layer aggregations
- achieve this by:
  - running the re-computation every couple of hours
  - using the output of the batch layer as the base table
  - upserting the up-to-date values of the speed layer into the base table
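To make the merge semantics concrete, here is a minimal in-memory sketch of the upsert described above. It is only an illustration: the function name `merge_layers`, the dictionary layout, and the sample numbers are assumptions made for this example and are not part of the series' code.

```python
def merge_layers(batch, speed):
    """Return a serving view: batch totals plus the speed layer's increments."""
    # Copy the batch output so it stays untouched and acts as the base table.
    serving = {location: dict(metrics) for location, metrics in batch.items()}
    for location, metrics in speed.items():
        if location in serving:
            # Location already known: add the speed layer aggregates on top.
            for name, value in metrics.items():
                serving[location][name] = serving[location].get(name, 0) + value
        else:
            # Location only seen by the speed layer: insert it as a new row.
            serving[location] = dict(metrics)
    return serving


# Hypothetical sample data, keyed by location.
batch = {"London": {"count_id": 10, "sum_followers_count": 500}}
speed = {
    "London": {"count_id": 2, "sum_followers_count": 40},
    "Paris": {"count_id": 1, "sum_followers_count": 7},
}
serving = merge_layers(batch, speed)
```

With this sample data, London ends up with the summed values from both layers, while Paris is inserted as-is because only the speed layer knows about it.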
Requirements
import psycopg2
Creating the serving layer
- authenticate and open a connection using the psycopg2 module
- create and populate a temporary table, using the batch layer as its base and upserting the speed layer into it
- drop the current serving layer and rename the temporary table to take its place (no downtime)
# Connection settings; replace the placeholder values with your own cluster details.
config = {
    'dbname': 'lambda',
    'user': 'x',
    'pwd': 'x',
    'host': 'data-warehouse.x.us-east-1.redshift.amazonaws.com',
    'port': '5439',
}

conn = psycopg2.connect(dbname=config['dbname'], host=config['host'],
                        port=config['port'], user=config['user'],
                        password=config['pwd'])
curs = conn.cursor()

# All statements below run in one transaction, which is committed at the end.
curs.execute("""
    -- 1. Start from a fresh copy of the batch layer.
    DROP TABLE IF EXISTS serving_layer_temp;

    SELECT *
    INTO serving_layer_temp
    FROM batch_layer;

    -- 2. Add the speed layer aggregates to locations that already exist.
    UPDATE serving_layer_temp
    SET
        count_id = count_id + speed_layer."count(id)",
        sum_followers_count = sum_followers_count + speed_layer."sum(followers_count)",
        sum_favorite_count = sum_favorite_count + speed_layer."sum(favorite_count)",
        sum_retweet_count = sum_retweet_count + speed_layer."sum(retweet_count)"
    FROM speed_layer
    WHERE serving_layer_temp.location = speed_layer.location;

    -- 3. Insert locations that only the speed layer has seen so far.
    INSERT INTO serving_layer_temp
    SELECT *
    FROM speed_layer
    WHERE speed_layer.location NOT IN (
        SELECT DISTINCT location
        FROM serving_layer_temp
    );

    -- 4. Swap the rebuilt table in as the new serving layer.
    --    IF EXISTS keeps the very first run from failing when there is no serving_layer yet.
    DROP TABLE IF EXISTS serving_layer;

    ALTER TABLE serving_layer_temp
    RENAME TO serving_layer;
""")
curs.close()
conn.commit()
conn.close()
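If you want to try the same copy, update, insert, and swap flow without a Redshift cluster, the following sketch runs it against an in-memory SQLite database from Python's standard library. Treat it as a rough stand-in rather than the series' code: the simplified column names, the sample rows, and the use of correlated subqueries instead of UPDATE ... FROM are all assumptions made so the example runs locally.

```python
import sqlite3

# In-memory SQLite database standing in for Redshift (assumption for local testing).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical sample tables with a reduced set of columns.
    CREATE TABLE batch_layer (location TEXT, count_id INTEGER, sum_followers_count INTEGER);
    CREATE TABLE speed_layer (location TEXT, count_id INTEGER, sum_followers_count INTEGER);
    INSERT INTO batch_layer VALUES ('London', 10, 500), ('Berlin', 4, 90);
    INSERT INTO speed_layer VALUES ('London', 2, 40), ('Paris', 1, 7);

    -- 1. Start from a copy of the batch layer.
    DROP TABLE IF EXISTS serving_layer_temp;
    CREATE TABLE serving_layer_temp AS SELECT * FROM batch_layer;

    -- 2. Add speed layer values to locations present in both layers.
    UPDATE serving_layer_temp
    SET
        count_id = count_id + (
            SELECT s.count_id FROM speed_layer s
            WHERE s.location = serving_layer_temp.location),
        sum_followers_count = sum_followers_count + (
            SELECT s.sum_followers_count FROM speed_layer s
            WHERE s.location = serving_layer_temp.location)
    WHERE location IN (SELECT location FROM speed_layer);

    -- 3. Insert locations that only the speed layer has seen.
    INSERT INTO serving_layer_temp
    SELECT * FROM speed_layer
    WHERE location NOT IN (SELECT location FROM serving_layer_temp);

    -- 4. Swap the rebuilt table in as the serving layer.
    DROP TABLE IF EXISTS serving_layer;
    ALTER TABLE serving_layer_temp RENAME TO serving_layer;
""")

rows = conn.execute(
    "SELECT location, count_id, sum_followers_count "
    "FROM serving_layer ORDER BY location"
).fetchall()
conn.close()
```

For the sample rows above, `rows` contains Berlin unchanged from the batch layer, London with both layers summed (12 and 540), and Paris taken from the speed layer alone.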
You can find the code from this blog post in this GitHub repository.
This post is licensed under CC BY 4.0 by the author.