Streaming tweets with Twitter API
Get Twitter API account
This was suggested preparations before the course started, but to remind you:
...head over to the Twitter Developer page and apply for a Developer Account. Get "Essential" access first. Then you can go on to apply for wider "Academic Research" access (but: that is mainly for Master students who have started the thesis work, so I am not sure what Twitter will say...). It is not mandatory to use Twitter data in the course, but it will be a convenient way to get started.'
The simple Essential account is a fine start and offers a lot of opportunities. When you have defined a more concrete project, you may apply for an Academic Research account. Then it is up to Twitter whether they will grant you access.
Usually, your Essential access will be granted quickly. Twitter offers several ways to authenticate yourself, but the easiest was is to use the Bearer Token. You can find it from your Twitter project page, under Keys and tokens. In your info319-exercises folder, create a keys/bearer_token file that only you can read:
(venv) $ mkdir keys (venv) $ chmod 700 keys (venv) $ touch keys/bearer_token (venv) $ chmod 600 keys/bearer_token (venv) $ echo "...long token string goes here..." > keys/bearer_token
Saving tweets to file
(venv) $ pip install tweepy
- Run this minimal boilerplate code to connect to Twitter's API and start downloading a stream of tweets (the code takes a few seconds to connect):
import json from time import sleep import threading import tweepy DURATION = 10 # hot many seconds to run for, including connection time class TweetHarvester: def __init__(self, bearer_token, duration): """Creates the tweepy client and starts the streaming thread.""" self.streaming_client = tweepy.StreamingClient(bearer_token) self.thread = threading.Thread( target=self.run, args=(duration,)) self.thread.start() def run(self, duration): """The streaming tweepy client runs this function.""" self.streaming_client.on_data = self.on_data self.streaming_client.sample(threaded=True) sleep(duration) self.streaming_client.disconnect() def on_data(self, json_data): """Tweepy calls this when it receives data from Twitter""" id_text = json.loads(json_data.decode()) print('Received tweet:', id_text) bearer_token = open('./keys/bearer_token').read().strip() harvester = TweetHarvester(bearer_token, DURATION)
Note how, every time a new tweet is received, tweepy calls the on_data()-function, which prints the tweeted text to the screen.
- Create code that saves the tweets to a file (for example as JSON or CSV). Create a subfolder for tweet files to keep your info319-exercise2 folder tidy.
- Change the code so you download and save additional information about each tweet, for example the handle (user name) of the tweeter, whether it is a retweet of another id, and perhaps the date and time.
- Refactor the code to prepare for cleaner termination. Remove the these lines from the run()-function (along with threaded=True in the call to sample() and along with everything about the duration variable):
sleep(duration) self.streaming_client.disconnect()
Instead, call disconnect() from on_data() when a given number of tweets (say 100 to start) have been received.
- Change the code so it instead writes the tweets to a file (which it closes properly on disconnect).
Streaming tweets to a socket
The following changes to the boilerplate code instead writes the stream of tweets to a network socket. Think of a socket as an internal internet connection from your machine back to itself. Different programs on your computer can use this socket to communicate using the regular internet APIs.
XXX
Change the code so it sends the stream of tweets to a socket (for example PORT 65000) instead of to a file. On termination, the program must close the socket properly using XXX.
You can use the nc utility to receive data from the socket. From another console window (inside or outside VS Code, a virtual environment is not needed):
$ apt install nc $ nc localhost 65000 # you must start StreamingClient in elsewhere before you run this command
IMPORTANT: When you debug, the socket will often remain open after your program has crashed (there is a timeout). So you may have to change PORT number often in the StreamingClient and the test code line (65001, 65002, ...).
Stream to Spark
Streaming Spark is the topic of Session3, but in this exercise we will write a simple Spark stream that receives and prints the Stream of tweets. It is based on [ https://sparkbyexamples.com/spark/spark-streaming-from-tcp-socket/ this example].
Get Twitter API account
This was suggested preparations before the course started, but to remind you:
...head over to the Twitter Developer page and apply for a Developer Account. Get "Essential" access first. Then you can go on to apply for wider "Academic Research" access (but: that is mainly for Master students who have started the thesis work, so I am not sure what Twitter will say...). It is not mandatory to use Twitter data in the course, but it will be a convenient way to get started.'
The simple Essential account is a fine start and offers a lot of opportunities. When you have defined a more concrete project, you may apply for an Academic Research account. Then it is up to Twitter whether they will grant you access.
Usually, your Essential access will be granted quickly. Twitter offers several ways to authenticate yourself, but the easiest was is to use the Bearer Token. You can find it from your Twitter project page, under Keys and tokens. In your info319-exercises folder, create a keys/bearer_token file that only you can read:
(venv) $ mkdir keys (venv) $ chmod 700 keys (venv) $ touch keys/bearer_token (venv) $ chmod 600 keys/bearer_token (venv) $ echo "...long token string goes here..." > keys/bearer_token
Saving tweets to file
(venv) $ pip install tweepy
Run the following minimal boilerplate code to connect to Twitter's API and start downloading a stream of tweets (the code takes a few seconds to connect):
import json from time import sleep import threading import tweepy DURATION = 10 # hot many seconds to run for, including connection time class TweetHarvester: def __init__(self, bearer_token, duration): """Creates the tweepy client and starts the streaming thread.""" self.streaming_client = tweepy.StreamingClient(bearer_token) self.thread = threading.Thread( target=self.run, args=(duration,)) self.thread.start() def run(self, duration): """The streaming tweepy client runs this function.""" self.streaming_client.on_data = self.on_data self.streaming_client.sample(threaded=True) sleep(duration) self.streaming_client.disconnect() def on_data(self, json_data): """Tweepy calls this when it receives data from Twitter""" id_text = json.loads(json_data.decode()) print('Received tweet:', id_text) bearer_token = open('./keys/bearer_token').read().strip() harvester = TweetHarvester(bearer_token, DURATION)
Note how, every time a new tweet is received, tweepy calls the on_data()-function, which prints the tweeted text to the screen.
Create code that saves the tweets to a file (for example as JSON or CSV). Create a subfolder for tweet files to keep your info319-exercise2 folder tidy.
Change the code so you download and save additional information about each tweet, for example the handle (user name) of the tweeter, whether it is a retweet of another id, and perhaps the date and time.
Refactor the code to prepare for cleaner termination. Remove the these lines from the run()-function (along with threaded=True in the call to sample() and along with everything about the duration variable):
sleep(duration) self.streaming_client.disconnect()
Instead, make the on_data()-function call disconnect() when a given number of tweets (say 100 to start) have been received.
- Change the code so it instead writes the tweets to a file (which it closes properly on disconnect).
Streaming tweets to a socket
Change the code to write the stream of tweets to a network socket instead. Think of a socket as an internal internet connection from your machine back to itself. Different programs on your computer can use this socket to communicate using the regular internet APIs.
Here are some critical pieces of code you need to include in your program:
import socket HOST = 'localhost' PORT = 65000 # for example, but it needs to be a high number
To open a socket for output (self. to add it inside TweetHarvester):
[self.]socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) [self.]socket.bind((host, port)) [self.]socket.listen() [self.]connection, self.address = self.socket.accept()
To send data on the socket:
def on_data(self, json_data): json_obj = json.loads(json_data.decode()) json_bytes = json.dumps(json_obj['data']).encode() self.connection.sendall(json_bytes) if self.tweet_count >= self.num_tweets: ...close the stream and the socket...
To run:
bearer_token = open('./keys/bearer_token').read().strip() harvester = TweetHarvester(bearer_token, HOST, PORT, NUM_TWEETS)
You can use the nc utility to receive data from the socket. From another console window (inside or outside VS Code, a virtual environment is not needed):
$ apt install nc $ nc localhost 65000 # you must start StreamingClient in elsewhere before you run this command
IMPORTANT: When you debug, the socket will often remain open after your program has crashed. So you may have to change PORT number often in the StreamingClient and the test code line (65001, 65002, ...).
Stream to Spark
Streaming Spark is the topic of Session 3, but you can run the following script spark_receiver.py to receive tweets from the socket as a Spark stream. It is based on this example:
import os import findspark from pyspark.sql import SparkSession HOST = os.environ['HOST'] if 'HOST' in os.environ else 'localhost' PORT = os.environ['PORT'] if 'PORT' in os.environ else '65000' PORT = int(PORT) findspark.init() spark = SparkSession.builder.appName('SparkTest').getOrCreate() spark.sparkContext.setLogLevel('ERROR') # incoming stream incoming_df = spark.readStream \ .format('socket') \ .option('host', HOST) \ .option('port', PORT) \ .load() # outgoing stream outgoing_df = incoming_df.select('value').writeStream \ .format('console') \ .outputMode('update') \ .start() \ .awaitTermination()
Run with:
(venv) $ export HOST=localhost PORT=65000; python spark_receiver.py
Instead of using import findspark; findspark.init() you can set the $SPARK_HOME environment variable, for example:
export SPARK_HOME=${PWD}/venv/lib/python3.8/site-packages/pyspark
Add this line to the end of your ~/.bashrc file to make it permanent.