
Implementing the Serving Layer of Lambda Architecture using Redshift

This post is a part of a series on Lambda Architecture consisting of:

You can also follow a walk-through of the code in this YouTube video:

and you can find the source code of the series here.


Purpose in Lambda Architecture:

  • merge the output of the speed and batch layer aggregations
  • achieve this by:
    • re-running the computation every couple of hours
    • using the output of the batch layer as the base table
    • upserting the up-to-date values of the speed layer into the base table
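The upsert described above can be sketched in plain Python before looking at the SQL. This minimal sketch assumes each layer's output is a dict from location to the four aggregates used later in the query (count_id, sum_followers_count, sum_favorite_count, sum_retweet_count); the function name merge_layers and the sample data are hypothetical.

```python
from typing import Dict, Tuple

# Hypothetical shape of one aggregate row:
# (count_id, sum_followers_count, sum_favorite_count, sum_retweet_count)
Aggregates = Tuple[int, int, int, int]


def merge_layers(batch: Dict[str, Aggregates],
                 speed: Dict[str, Aggregates]) -> Dict[str, Aggregates]:
    """Use the batch layer as the base, then upsert the speed layer into it."""
    serving = dict(batch)  # copy, so the batch view itself is left untouched
    for location, speed_vals in speed.items():
        if location in serving:
            # update: both layers saw this location, so add the counters
            serving[location] = tuple(
                b + s for b, s in zip(serving[location], speed_vals)
            )
        else:
            # insert: only the speed layer has seen this location so far
            serving[location] = speed_vals
    return serving


if __name__ == "__main__":
    batch = {"Berlin": (10, 500, 20, 5), "Paris": (4, 100, 3, 1)}
    speed = {"Berlin": (2, 50, 1, 0), "Tokyo": (1, 30, 0, 0)}
    print(merge_layers(batch, speed))
    # {'Berlin': (12, 550, 21, 5), 'Paris': (4, 100, 3, 1), 'Tokyo': (1, 30, 0, 0)}
```

Updating existing keys first and inserting only the unseen ones is the same split the SQL below makes with its UPDATE and INSERT ... NOT IN statements.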


Requirements

import psycopg2

Creating the serving layer

  • authenticate and create a connection using the psycopg2 module
  • create and populate a temporary table, using the batch layer as its base and upserting the speed layer into it
  • drop the current serving layer and rename the temporary table to take its place; both steps are committed together in one transaction, so the swap is atomic (no downtime)
# Connection settings for the Redshift cluster (placeholder credentials)
config = { 'dbname': 'lambda',
           'user': 'x',
           'pwd': 'x',
           'host': 'data-warehouse.x.us-east-1.redshift.amazonaws.com',
           'port': '5439'
         }
conn = psycopg2.connect(dbname=config['dbname'], host=config['host'],
                        port=config['port'], user=config['user'],
                        password=config['pwd'])

curs = conn.cursor()
# psycopg2 opens a transaction implicitly, so none of the statements below
# become visible to other sessions until conn.commit()
curs.execute("""
    -- 1. start from a fresh copy of the batch layer
    DROP TABLE IF EXISTS serving_layer_temp;

    SELECT
        *
    INTO
        serving_layer_temp
    FROM
        batch_layer;

    -- 2. add the speed-layer aggregates to locations already in the base table
    UPDATE
        serving_layer_temp
    SET
        count_id = count_id + speed_layer."count(id)",
        sum_followers_count = sum_followers_count + speed_layer."sum(followers_count)",
        sum_favorite_count = sum_favorite_count + speed_layer."sum(favorite_count)",
        sum_retweet_count = sum_retweet_count + speed_layer."sum(retweet_count)"
    FROM
        speed_layer
    WHERE
        serving_layer_temp.location = speed_layer.location;

    -- 3. insert locations that only the speed layer has seen so far
    INSERT INTO
        serving_layer_temp
    SELECT
        *
    FROM
        speed_layer
    WHERE
        speed_layer.location
    NOT IN (
        SELECT
            DISTINCT location
        FROM
            serving_layer_temp
    );

    -- 4. swap the new table in place of the old serving layer
    DROP TABLE IF EXISTS serving_layer;

    ALTER TABLE serving_layer_temp
    RENAME TO serving_layer;
""")
curs.close()
conn.commit()  # the swap becomes visible to readers only here
conn.close()
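To try the rebuild-and-swap logic without a Redshift cluster, the same four steps can be reproduced against an in-memory SQLite database with Python's built-in sqlite3 module. This is a sketch under assumptions: SQLite stands in for Redshift, so CREATE TABLE ... AS replaces SELECT ... INTO, and correlated subqueries replace the UPDATE ... FROM join. The names rebuild_serving_layer, make_demo_db and COLUMNS, and the sample rows, are hypothetical.

```python
import sqlite3

# Hypothetical mapping from serving-layer column to the matching speed-layer
# column (the speed layer keeps Spark-style names such as "count(id)").
COLUMNS = {
    "count_id": "count(id)",
    "sum_followers_count": "sum(followers_count)",
    "sum_favorite_count": "sum(favorite_count)",
    "sum_retweet_count": "sum(retweet_count)",
}


def rebuild_serving_layer(conn: sqlite3.Connection) -> None:
    """Rebuild serving_layer from batch_layer + speed_layer in one transaction."""
    conn.isolation_level = None  # autocommit mode; BEGIN/COMMIT issued by hand
    cur = conn.cursor()
    # Correlated subqueries stand in for Redshift's UPDATE ... FROM join.
    set_clause = ", ".join(
        f'{col} = {col} + (SELECT s."{speed_col}" FROM speed_layer AS s '
        "WHERE s.location = serving_layer_temp.location)"
        for col, speed_col in COLUMNS.items()
    )
    cur.execute("BEGIN")
    try:
        cur.execute("DROP TABLE IF EXISTS serving_layer_temp")
        # SQLite has no SELECT ... INTO, so CREATE TABLE ... AS is used instead.
        cur.execute("CREATE TABLE serving_layer_temp AS SELECT * FROM batch_layer")
        cur.execute(
            f"UPDATE serving_layer_temp SET {set_clause} "
            "WHERE location IN (SELECT location FROM speed_layer)"
        )
        cur.execute(
            "INSERT INTO serving_layer_temp SELECT * FROM speed_layer "
            "WHERE location NOT IN (SELECT location FROM serving_layer_temp)"
        )
        cur.execute("DROP TABLE IF EXISTS serving_layer")
        cur.execute("ALTER TABLE serving_layer_temp RENAME TO serving_layer")
        cur.execute("COMMIT")
    except Exception:
        cur.execute("ROLLBACK")  # leave the old serving layer untouched
        raise


def make_demo_db() -> sqlite3.Connection:
    """In-memory database with made-up batch and speed layer rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE batch_layer (location TEXT, count_id INTEGER,
            sum_followers_count INTEGER, sum_favorite_count INTEGER,
            sum_retweet_count INTEGER);
        CREATE TABLE speed_layer (location TEXT, "count(id)" INTEGER,
            "sum(followers_count)" INTEGER, "sum(favorite_count)" INTEGER,
            "sum(retweet_count)" INTEGER);
        INSERT INTO batch_layer VALUES ('Berlin', 10, 500, 20, 5), ('Paris', 4, 100, 3, 1);
        INSERT INTO speed_layer VALUES ('Berlin', 2, 50, 1, 0), ('Tokyo', 1, 30, 0, 0);
    """)
    return conn


if __name__ == "__main__":
    conn = make_demo_db()
    rebuild_serving_layer(conn)
    print(conn.execute("SELECT * FROM serving_layer ORDER BY location").fetchall())
    # [('Berlin', 12, 550, 21, 5), ('Paris', 4, 100, 3, 1), ('Tokyo', 1, 30, 0, 0)]
```

Running the rebuild twice yields the same serving layer, because every run starts again from the batch layer rather than from the previous serving layer; that is what keeps the periodic re-computation free of double counting.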

You can find the code from this blog post in this GitHub repository.

This post is licensed under CC BY 4.0 by the author.
