Getting the API object using authorization information
You can find more details on how to obtain the authorization credentials here: https://developer.twitter.com/en/docs/basics/authentication/overview
# twitter setup
consumer_key = "x"
consumer_secret = "x"
access_token = "x"
access_token_secret = "x"
# Creating the authentication object
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
# Setting your access token and secret
auth.set_access_token(access_token, access_token_secret)
# Creating the API object by passing in auth information
api = tweepy.API(auth)
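Hard-coding the four credentials is fine for a demo, but they are easy to leak once the script is shared. Here is a minimal sketch of one alternative, assuming the credentials are exported as environment variables. The variable names (`TWITTER_CONSUMER_KEY` and so on) and the `load_credentials` helper are hypothetical and not part of tweepy:

```python
import os

# Hypothetical environment variable names; pick whatever fits your setup.
CREDENTIAL_VARS = {
    "consumer_key": "TWITTER_CONSUMER_KEY",
    "consumer_secret": "TWITTER_CONSUMER_SECRET",
    "access_token": "TWITTER_ACCESS_TOKEN",
    "access_token_secret": "TWITTER_ACCESS_TOKEN_SECRET",
}

def load_credentials(env=None):
    """Return the four credentials as a dict, failing early if any is missing."""
    env = os.environ if env is None else env
    missing = [var for var in CREDENTIAL_VARS.values() if not env.get(var)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {name: env[var] for name, var in CREDENTIAL_VARS.items()}

# Usage with the setup above (requires tweepy and real credentials):
# creds = load_credentials()
# auth = tweepy.OAuthHandler(creds["consumer_key"], creds["consumer_secret"])
# auth.set_access_token(creds["access_token"], creds["access_token_secret"])
# api = tweepy.API(auth)
```

Failing early on a missing variable gives a clearer error than an authentication failure on the first API call.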
A helper function to convert the time a tweet was created to the local time of our system
from datetime import datetime, timedelta

def normalize_timestamp(time):
    mytime = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
    mytime += timedelta(hours=1)  # the tweets are timestamped in GMT timezone, while I am in +1 timezone
    return mytime.strftime("%Y-%m-%d %H:%M:%S")
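To make the conversion concrete, here is a small worked example of the same fixed-offset shift. The `shift_timestamp` name and the `offset_hours` parameter are hypothetical generalizations for illustration. Note that a fixed offset does not account for daylight saving time:

```python
from datetime import datetime, timedelta

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

def shift_timestamp(time, offset_hours=1):
    """Parse a GMT timestamp string and shift it by a fixed number of hours."""
    parsed = datetime.strptime(time, TIME_FORMAT)
    return (parsed + timedelta(hours=offset_hours)).strftime(TIME_FORMAT)

print(shift_timestamp("2020-03-01 14:05:09"))  # 2020-03-01 15:05:09
print(shift_timestamp("2020-03-01 23:30:00"))  # 2020-03-02 00:30:00
```

The second call shows that the date rolls over correctly when the shift crosses midnight.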
Formatting the data and sending it to the proper topic on the Kafka broker
Each resulting record has the following attributes, separated by semicolons:
id
created_at
followers_count
location
favorite_count
retweet_count
def get_twitter_data():
    res = api.search("Apple OR iphone OR iPhone")
    for i in res:
        record = ''
        record += str(i.user.id_str)
        record += ';'
        record += str(normalize_timestamp(str(i.created_at)))
        record += ';'
        record += str(i.user.followers_count)
        record += ';'
        record += str(i.user.location)
        record += ';'
        record += str(i.favorite_count)
        record += ';'
        record += str(i.retweet_count)
        record += ';'
        producer.send(topic_name, str.encode(record))
get_twitter_data()
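The record layout can be tried out without Twitter or Kafka. The sketch below builds the same semicolon-separated payload from a stand-in object and parses it back on the consuming side. `format_record`, `parse_record`, and the sample values are hypothetical and only illustrate the format:

```python
from types import SimpleNamespace

FIELD_NAMES = ["id", "created_at", "followers_count",
               "location", "favorite_count", "retweet_count"]

def format_record(values):
    """Join the six values with ';' and keep the trailing ';' used above."""
    return ";".join(str(v) for v in values) + ";"

def parse_record(payload):
    """Decode the bytes sent to Kafka back into a dict keyed by field name."""
    parts = payload.decode("utf-8").split(";")
    return dict(zip(FIELD_NAMES, parts))  # zip drops the empty trailing part

# Stand-in for one search result; a real one comes from the Twitter API.
tweet = SimpleNamespace(
    user=SimpleNamespace(id_str="42", followers_count=7, location="Vienna"),
    created_at="2020-03-01 15:05:09",
    favorite_count=3,
    retweet_count=1,
)
record = format_record([tweet.user.id_str, tweet.created_at,
                        tweet.user.followers_count, tweet.user.location,
                        tweet.favorite_count, tweet.retweet_count])
payload = str.encode(record)
print(payload)  # b'42;2020-03-01 15:05:09;7;Vienna;3;1;'
print(parse_record(payload)["location"])  # Vienna
```

One caveat: the location is free text, so a value containing `;` would shift the fields. The sketch does not handle that case.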
Deployment
Perform the task periodically, waiting a fixed interval between runs
def periodic_work(interval):
    while True:
        get_twitter_data()
        # interval should be an integer, the number of seconds to wait
        time.sleep(interval)
periodic_work(60 * 1)  # get data every minute (60 seconds)
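As written, a single failed request (for example a rate-limit or network error) would raise out of the loop and stop the collection. Here is a minimal sketch of a more forgiving loop, assuming it is acceptable to skip a failed run and try again after the interval. The `periodic_work_safe` name and the `max_runs` and `sleep` parameters are hypothetical and exist mainly to make the loop testable:

```python
import time

def periodic_work_safe(task, interval, max_runs=None, sleep=time.sleep):
    """Call task() repeatedly, waiting `interval` seconds between calls.

    max_runs=None loops forever; a number stops after that many calls.
    Returns the number of calls that raised an exception.
    """
    failures = 0
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            task()
        except Exception as exc:  # keep the loop alive on a failed run
            failures += 1
            print("task failed:", exc)
        runs += 1
        sleep(interval)
    return failures

# Demo with a stand-in task that fails on its second call.
calls = []

def flaky():
    calls.append(1)
    if len(calls) == 2:
        raise RuntimeError("rate limited")

failed = periodic_work_safe(flaky, 0, max_runs=3)
print(failed)  # 1
```

With the function from this post, the call would be `periodic_work_safe(get_twitter_data, 60)`.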
You can find the code from this blog post in this GitHub repository.