val data_stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "tweets-lambda1")
  .option("startingOffsets", "earliest") // or "latest"
  .load() // note how similar the API is to the batch version
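The snippets in this section assume a live SparkSession named spark plus a handful of imports that are not shown in the excerpts; a minimal sketch (the ProcessingTime import is the Spark 2.0/2.1 form — newer versions use Trigger.ProcessingTime instead, and the app name below is hypothetical) might look like:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._                             // col, current_date, count, sum
import org.apache.spark.sql.streaming.ProcessingTime                 // streaming trigger interval
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}  // hourly Redshift export

val spark = SparkSession.builder()
  .appName("twitter-speed-layer") // hypothetical application name
  .getOrCreate()

import spark.implicits._ // required for .as[String] and the tweet case class encoder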
Result stage - performing transformations on the stream
extract the value column of the Kafka message
parse each row into a member of the tweet case class (a sketch of which follows this list)
filter to only look at today's tweets
perform aggregations
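The tweet class referenced above is not defined in this excerpt; a minimal sketch, assuming six semicolon-delimited string fields in the order used by the mapping below, might look like:

// Hypothetical case class matching the six fields split out of each Kafka message.
// All fields are kept as String here; the selectExpr() below casts them to their real types.
case class tweet(id: String,
                 created_at: String,
                 followers_count: String,
                 location: String,
                 favorite_count: String,
                 retweet_count: String)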
val data_stream_cleaned = data_stream
  .selectExpr("CAST(value AS STRING) as string_value")
  .as[String]
  .map(x => x.split(";"))
  .map(x => tweet(x(0), x(1), x(2), x(3), x(4), x(5)))
  .selectExpr(
    "cast(id as long) id",
    "CAST(created_at as timestamp) created_at",
    "cast(followers_count as int) followers_count",
    "location",
    "cast(favorite_count as int) favorite_count",
    "cast(retweet_count as int) retweet_count"
  )
  .toDF()
  .filter(col("created_at").gt(current_date())) // Kafka retains the last 24 hours of data; this filter is needed because we use complete output mode
  .groupBy("location")
  .agg(count("id"), sum("followers_count"), sum("favorite_count"), sum("retweet_count"))
Output stage
specify the following:
data sink - exporting to memory (the resulting table can be queried much like one registered with registerTempTable() / createOrReplaceTempView())
trigger - the interval between runs of the pipeline (i.e. when to poll for new data and run the transformations)
output mode - complete, append or update; since the Result stage uses aggregates, only the complete or update output modes are allowed
val query = data_stream_cleaned.writeStream
  .format("memory")
  .queryName("demo")
  .trigger(ProcessingTime("60 seconds")) // Spark polls for new data and recomputes the result only once per minute
  .outputMode("complete") // could also be "update"
  .start()
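Once the query is running, the memory sink exposes its output as an in-memory table named after queryName, so the speed-layer aggregates can be inspected directly; for example:

// Read the latest aggregates written by the streaming query to the in-memory "demo" table.
spark.sql("select * from demo").show()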
"Overwriting" the Redshift table with the results of the query held in memory (the output of the speed layer)
scheduling the export function to run every hour
def exportToRedshift(): Unit = {
  val df = spark.sql("select * from demo")
  // write data from the Spark dataframe to the database
  df.write
    .mode("overwrite")
    .jdbc(url, table, prop)
}

val ex = new ScheduledThreadPoolExecutor(1)
val task = new Runnable {
  def run() = exportToRedshift()
}
val f = ex.scheduleAtFixedRate(task, 1, 1, TimeUnit.HOURS)
// f.cancel(false) // call this later to stop the hourly export; cancelling right away would unschedule it immediately
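The export relies on url, table and prop, which are not defined in this excerpt; a minimal sketch of what they might contain (the cluster endpoint, table name, credentials and driver class below are placeholders and depend on your own Redshift setup) is:

// Hypothetical Redshift JDBC connection settings; replace with your cluster's values.
val url   = "jdbc:redshift://your-cluster.xxxxxxxx.us-east-1.redshift.amazonaws.com:5439/dev"
val table = "tweets_speed_layer" // assumed target table name
val prop  = new java.util.Properties()
prop.setProperty("user", "your_user")
prop.setProperty("password", "your_password")
prop.setProperty("driver", "com.amazon.redshift.jdbc42.Driver") // or a PostgreSQL-compatible driver, depending on setup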