Streaming tweets with Kafka and Spark
Latest revision as of 15:22, 13 October 2022
This and later exercises assume you are running Ubuntu Linux, either natively or through WSL 2.
Download and install Kafka
You need to install Kafka "by hand" outside the usual package managers apt, yum, homebrew, etc. It is good practice to have a separate folder for such software, for example /opt:
$ sudo mkdir /opt
$ sudo chown your_username:your_username /opt
$ cd /opt
Download and install a recent Kafka (see https://kafka.apache.org/downloads), for example:
$ wget https://dlcdn.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz # to download
$ tar xzf kafka_2.13-3.2.1.tgz # to unzip and unpack
$ cd kafka_2.13-3.2.1 # to test
To start Kafka, run the following two commands in sequence in two separate terminal windows (as for Spark, you need to have Java >8 on your machine):
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
(You can also run them in the same window by adding & to the end of each line.)
Steps 2-5 in this guide (https://kafka.apache.org/quickstart) show how you can test basic Kafka from the command line.
(If you want, you can now add /opt/kafka_2.13-3.2.1/bin to the end of your PATH and put it in your .bashrc file. To restart ZooKeeper and Kafka after you reboot your machine, you can add the appropriate commands to your .profile file, each ending with &.)
Streaming tweets to Kafka
Create a Kafka topic called tweets:
$ bin/kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092
Task 1: Go back to tweet_harvester.py from Exercise 2, and change the code to write the stream of tweets to a Kafka topic instead of a socket. Here are some basic steps and code pieces you will need:
$ pip install kafka-python
In your Python code:
from kafka import KafkaProducer
...
# connect to the Kafka broker
producer = KafkaProducer(bootstrap_servers='localhost:9092')
...
# send a bytes object to the Kafka topic 'tweets'
producer.send('tweets', json_bytes)
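The snippet above leaves open how json_bytes is produced. The following is a minimal sketch, assuming each tweet is available as a Python dict; the helper names tweet_to_bytes and send_tweet are hypothetical and not part of kafka-python.

```python
import json


def tweet_to_bytes(tweet):
    """Serialise one tweet (a dict) as UTF-8 encoded JSON for Kafka."""
    return json.dumps(tweet).encode('utf-8')


def send_tweet(producer, tweet, topic='tweets'):
    """Send one tweet; `producer` is expected to be a kafka-python KafkaProducer."""
    producer.send(topic, tweet_to_bytes(tweet))


# Possible usage (needs a running broker on localhost:9092):
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers='localhost:9092')
#   send_tweet(producer, {'id': '1', 'text': 'hello'})
#   producer.flush()  # wait until buffered messages have been sent
```

Keeping the serialisation in its own function makes it easy to check without a running broker.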
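Step 5 in this guide (https://kafka.apache.org/quickstart) shows how you can receive Kafka messages from the command line:
$ bin/kafka-console-consumer.sh --topic tweets --from-beginning --bootstrap-server localhost:9092
This guide presents more examples of using kafka-python: https://towardsdatascience.com/3-libraries-you-should-know-to-master-apache-kafka-in-python-c95fdf8700f2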
Building a distributed architecture
The following illustration shows one way to organise the solution to this exercise:
[Figure: Twitterpipe.png]
Receive tweets from Kafka with streaming Spark
Task 2: Go back to the spark_receiver.py from Exercise 2, and change the code to receive tweets from a Kafka topic instead of a socket.
This time you do not need to import kafka-python, but you need a package called spark-sql-kafka to run Kafka with structured streaming Spark. The easiest way to get started is something like:
import findspark
...
findspark.init()
findspark.add_packages('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
It is important to get the Scala and Spark versions right! In the example above:
- 0-10 is the version of the spark-sql-kafka integration (it targets Kafka brokers version 0.10 or higher)
- 2.12 is the version of Scala you are running
- 3.3.0 is your version of Spark
One way to find out which Scala and Spark versions you need is to look at the Java packages installed in venv/lib/python3.x/site-packages/pyspark/jars/.
The line findspark.add_packages(...) downloads spark-sql-kafka and its dependencies anew each time your application starts. Note that you most likely cannot use the Java packages you installed along with Kafka because they may be a different version.
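As a rough illustration of the version lookup described above, the sketch below derives the package coordinate from the jar file names. It assumes that the jars folder contains a file named like spark-core_2.12-3.3.0.jar; the function name kafka_package_for is hypothetical.

```python
import re


def kafka_package_for(jar_names):
    """Derive the spark-sql-kafka Maven coordinate from pyspark's jar names.

    Assumes one jar is named spark-core_<scala version>-<spark version>.jar.
    Returns None if no such jar is found.
    """
    pattern = re.compile(r'^spark-core_(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar$')
    for name in jar_names:
        match = pattern.match(name)
        if match:
            scala_version, spark_version = match.groups()
            return ('org.apache.spark:spark-sql-kafka-0-10_'
                    f'{scala_version}:{spark_version}')
    return None


# Possible usage inside your Spark program:
#   import os, findspark, pyspark
#   jars = os.listdir(os.path.join(os.path.dirname(pyspark.__file__), 'jars'))
#   findspark.add_packages(kafka_package_for(jars))
```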
Here is generic Python code to read a structured stream from Kafka:
incoming_df = spark.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'tweets') \
    .option('startingOffsets', 'earliest') \
    .load()
You can use the same .writeStream as before to print the stream to console.
This transformation decodes the tweet into a string:
from pyspark.sql.functions import decode
streaming_df = incoming_df \
    .select(decode(incoming_df.value, 'utf-8').alias('tweet'))
Sentiment analysis of tweets
Afinn (https://pypi.org/project/afinn/) is a widely used Python library for sentiment analysis:
(venv) $ pip install afinn
Task 3: Create a Spark user-defined function (UDF) that takes a text as input and returns its Afinn sentiment score. (The slides and notebook from Session 3 contain a UDF example.)
Task 4: Create a Spark pipeline that reads the tweets topic produced by tweet_harvester.py as before, and uses the new UDF to add an afinn_score to each tweet.
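One possible shape for Tasks 3 and 4 is sketched below. It assumes the decoded text is in a column called tweet and that the Afinn object can be serialised and shipped to the Spark workers; the helper names make_safe_scorer and add_afinn_score are hypothetical.

```python
def make_safe_scorer(score_fn):
    """Wrap a scoring function so that it tolerates missing (None) text."""
    def safe_score(text):
        if text is None:
            return None
        return float(score_fn(text))
    return safe_score


def add_afinn_score(streaming_df, text_column='tweet'):
    """Add an 'afinn_score' column computed by a Spark UDF."""
    # Imported here so the helper above can be tried without Spark installed.
    from afinn import Afinn
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType

    afinn = Afinn()
    afinn_udf = udf(make_safe_scorer(afinn.score), DoubleType())
    return streaming_df.withColumn('afinn_score',
                                   afinn_udf(streaming_df[text_column]))
```

The None check matters because Spark passes null column values to a Python UDF as None.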
Twitter language metadata
Task 5: Use Twitter's language detector to filter out languages you do not want to analyse.
To get the language code along with the id and text for each tweet, you must add lang to the tweet_fields option when you call StreamingTwitter.sample() in tweet_harvester.py.
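For more details, see the Twitter API v2 data dictionary (https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/tweet) and tweepy's StreamingClient documentation (https://docs.tweepy.org/en/stable/streamingclient.html). There are many other interesting fields available, for example context_annotations and entities. Unfortunately, the geo field is often null...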
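The language filter can be applied either in Spark or already in the harvester. Below is a minimal sketch of the harvester-side variant, assuming the payload is UTF-8 encoded JSON that holds the tweet either at the top level or under a data key; the function name has_wanted_language and the default language list are illustrative only.

```python
import json


def has_wanted_language(json_data, wanted=('en',)):
    """Return True if the tweet's 'lang' code is one of `wanted`."""
    payload = json.loads(json_data.decode('utf-8'))
    # The payload may be the whole response ({'data': {...}}) or just the tweet.
    tweet = payload.get('data', payload)
    return tweet.get('lang') in wanted
```

In on_data you could then skip the producer.send(...) call whenever this function returns False.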
The next few tasks are examples of further things you can do with Spark and Kafka. All of them are highly relevant for your programming projects! At the end you will also find some general advice for how to proceed.
Twitter media metadata
Task 6: Get information about included media for each tweet.
To achieve this, you must set the expansions option to ['attachments.media_keys'] when you call StreamingTwitter.sample() and also add fields like media_key,type,url to the media_fields option.
In tweet_harvester.py, when you receive the streaming tweets with media fields, code like the one suggested earlier will not quite do the job:
def on_data(json_data):
    json_obj = json.loads(json_data.decode())
    json_str = json.dumps(json_obj['data'])+'\r\n'
    json_bytes = json_str.encode()
    producer.send('tweets', json_bytes)
The reason is that the media fields are not in json_obj['data'], but in json_obj['includes']['media']. To handle this, you can either
- pick out the JSON fields you need already in the tweet harvester and receive them in a "flat" JSON object in your Spark pipeline, or
- pass all the json_data on as they are and use the Spark pipeline to pick out the fields you want.
If you choose the latter option, here is how you can pass all the json_data on unprocessed:
def on_data(json_data):
    producer.send('tweets', json_data)
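For the first option (picking out the fields in the harvester), a sketch could look as follows. It assumes the payload has the data and includes.media structure described above; the function name flatten_tweet and the choice of output fields are illustrative only.

```python
import json


def flatten_tweet(json_data):
    """Combine the tweet fields and its media fields into one small JSON object."""
    json_obj = json.loads(json_data.decode('utf-8'))
    data = json_obj.get('data', {})
    # 'includes' is missing for tweets without media, so default to an empty list.
    media = json_obj.get('includes', {}).get('media', [])
    flat = {
        'id': data.get('id'),
        'text': data.get('text'),
        'media': [{'type': m.get('type'), 'url': m.get('url')} for m in media],
    }
    return json.dumps(flat).encode('utf-8')
```

The returned bytes can be passed directly to producer.send('tweets', ...).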
If you pass all the json_data on unprocessed, here is how you can pick out (a few of the) nested JSON fields in Spark:
from pyspark.sql.functions import decode, get_json_object
tweet_sdf = spark.readStream. (...as before...) .load()
selected_tweet_sdf = tweet_sdf \
    .select('*', decode(tweet_sdf.value, 'utf-8').alias('json_str')) \
    .select('*', get_json_object('json_str', '$.data.id').alias('id')) \
    .select('*', get_json_object('json_str', '$.includes.media').alias('media'))
The new media field is a JSON string that represents a list of JSON objects, which must be unpacked further. Tips on how to do this:
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import from_json
list_schema = ArrayType(StringType())
...
.select('*', from_json('media', list_schema).alias('media_list')) \
...
You can then choose to .explode the media_list further.
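Putting the unpacking steps together, one possible sketch is shown below. It assumes a streaming DataFrame with the id and media columns selected above; the function name explode_media and the output column names are hypothetical.

```python
def explode_media(selected_tweet_sdf):
    """Turn the JSON list in 'media' into one row per media object."""
    # Imported here so the module can be loaded without Spark installed.
    from pyspark.sql.functions import explode, from_json, get_json_object
    from pyspark.sql.types import ArrayType, StringType

    list_schema = ArrayType(StringType())
    return (selected_tweet_sdf
            # parse the JSON string into a list of JSON strings
            .select('id', from_json('media', list_schema).alias('media_list'))
            # one output row per element; tweets without media are dropped
            .select('id', explode('media_list').alias('media_json'))
            # pick the fields of interest out of each media object
            .select('id',
                    get_json_object('media_json', '$.type').alias('type'),
                    get_json_object('media_json', '$.url').alias('url')))
```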
Spark output stream to Kafka
In a terminal window, create a new media topic. As before, you can use a command like this:
$ bin/kafka-topics.sh --create --topic media --bootstrap-server localhost:9092
(To remove all messages from a Kafka topic, you can use --delete instead of --create in the above command, and then use --create to recreate the topic.)
Task 7: To your Spark pipeline, add a new output stream to the new Kafka media topic. For each tweet with an id, a (media) type and a url, the output stream should receive at least these three fields.
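A rough sketch of such an output stream follows. It assumes a streaming DataFrame with id, type and url columns; the function name write_media_to_kafka and the checkpoint folder name are assumptions. Spark's Kafka sink expects a column named value and a checkpoint location.

```python
def write_media_to_kafka(media_sdf, checkpoint_dir='checkpoint/media'):
    """Write id, type and url as JSON messages to the Kafka topic 'media'."""
    # Imported here so the module can be loaded without Spark installed.
    from pyspark.sql.functions import struct, to_json

    return (media_sdf
            # keep only rows where all three fields are present
            .dropna(subset=['id', 'type', 'url'])
            # the Kafka sink reads the message body from a column named 'value'
            .select(to_json(struct('id', 'type', 'url')).alias('value'))
            .writeStream
            .format('kafka')
            .option('kafka.bootstrap.servers', 'localhost:9092')
            .option('topic', 'media')
            .option('checkpointLocation', checkpoint_dir)
            .start())
```

The function returns the running streaming query, so the caller can, for example, call awaitTermination() on it.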
Media downloader
Task 8: Create a new Python program media_downloader.py that consumes the media topic. Here are some useful pieces of code:
from kafka import KafkaConsumer, KafkaProducer
...
self.consumer = KafkaConsumer(
    'media',
    bootstrap_servers='localhost:9092',
    client_id='media-downloader',
    group_id='media-downloader-group',
    enable_auto_commit=False)
...
try:
    for message in self.consumer:
        ...handle each message...
except KeyboardInterrupt:
    ...shut down...
The first version of the program can do something really simple, like using Python's requests library to download media with type photo, and perhaps return the photo size in bytes.
Task 9: Create a new Kafka topic, for example photos. Extend media_downloader.py so that, for each photo, it writes (at least) the tweet 'id' and photo 'size' back to Kafka.
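The message handling for Tasks 8 and 9 could be sketched like this. It assumes each message on the media topic is a JSON object with id, type and url fields; the names handle_media_message and fetch_with_requests are hypothetical. The download function is passed in as a parameter so the handler can be tried without network access.

```python
import json


def handle_media_message(value, fetch):
    """Return a JSON message with tweet id and photo size, or None to skip."""
    media = json.loads(value.decode('utf-8'))
    if media.get('type') != 'photo' or not media.get('url'):
        return None  # not a photo, or nothing to download
    content = fetch(media['url'])
    result = {'id': media.get('id'), 'size': len(content)}
    return json.dumps(result).encode('utf-8')


def fetch_with_requests(url):
    """Download a URL with the requests library and return the raw bytes."""
    import requests  # imported here so the handler works without requests
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.content
```

Inside the consumer loop you could call handle_media_message(message.value, fetch_with_requests) and, if the result is not None, send it to the photos topic with a KafkaProducer.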
Spark join
Task 10: Add a Spark pipeline to read tweet id and photo size from the photos topic. Perform an inner join on the id column to combine the results of sentiment analysis and external photo analysis.
Task 11: The inner join in task 10 only returns tweets with photos. A left outer join of the sentiment analysis frame (left) and photo analysis frame (right) will return a Row for every tweet, but with some null values for the tweets without photos.
Perform such a leftOuter join on the tweet ids. To achieve this, you need to:
- pick out the timestamps in addition to the values from the incoming Kafka streams of tweets and photos,
- rename the id and timestamp columns in the streaming photo DataFrame,
- define the timestamp and (renamed) photo_timestamp columns as watermarks, for example with a 10 minute threshold, and
- add an explicit leftOuter join expression over the ids and timestamps.
Tips:
- to pick out timestamps from incoming Kafka streams, for example:
    .select(photo_sdf.timestamp,
            decode(photo_sdf.value, 'utf-8').alias('json_str')) \
- to rename columns, for example:
    .withColumnRenamed('timestamp', 'photo_timestamp') \
- to add a watermark, for example:
    .withWatermark('photo_timestamp', '10 minutes')
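For more information, see Handling Late Data and Watermarking (https://spark.apache.org/docs/3.3.0/structured-streaming-programming-guide.html#handling-late-data-and-watermarking) and Outer Joins with Watermarking (https://spark.apache.org/docs/3.3.0/structured-streaming-programming-guide.html#outer-joins-with-watermarking) in the Structured Streaming Programming Guide (https://spark.apache.org/docs/3.3.0/structured-streaming-programming-guide.html).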
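The steps for the left outer join can be combined roughly as follows. The sketch assumes that both streaming DataFrames already have id and timestamp columns; the function name left_join_tweets_and_photos, the renamed column photo_id and the 10 minute time bound are assumptions.

```python
def left_join_tweets_and_photos(tweet_sdf, photo_sdf):
    """Left outer join of tweets (left) and photos (right) with watermarks."""
    # Imported here so the module can be loaded without Spark installed.
    from pyspark.sql.functions import expr

    tweets = tweet_sdf.withWatermark('timestamp', '10 minutes')
    photos = (photo_sdf
              .withColumnRenamed('id', 'photo_id')
              .withColumnRenamed('timestamp', 'photo_timestamp')
              .withWatermark('photo_timestamp', '10 minutes'))

    # The time range condition lets Spark decide when a tweet can no longer
    # get a matching photo and must be emitted with null photo columns.
    return tweets.join(
        photos,
        expr("""
            id = photo_id AND
            photo_timestamp >= timestamp AND
            photo_timestamp <= timestamp + interval 10 minutes
        """),
        'leftOuter')
```

Note that the unmatched tweets only appear in the output after the watermark has passed, so the null rows arrive with some delay.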
Some practical advice
- Do all of this incrementally, making sure that each step works completely before you start the next. Keep the whole pipeline running as you proceed.
- It is easiest to write this as separate Python programs that you run in separate windows, for example: tweet_harvester.py, spark_pipeline.py, and media_downloader.py. It is possible to run everything as a single complete program, but you risk race conditions that can be hard to debug.
- Also start a Kafka console consumer for each topic in separate windows, and keep them running as you develop. That way you have more control of what happens, e.g.:
    $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic media --from-beginning
- You do not need to flood Kafka by sampling tweets continuously. Instead you can configure tweet_harvester.py to harvest a small number of tweets (e.g., 3) each time you want to test something new.
- When a Spark producer crashes, and in a few other cases, a Kafka topic can be left in an inconsistent state. If that happens, you can try to delete your checkpoint-folder.
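The advice above about harvesting only a few tweets at a time could be implemented with a small counter, as in the sketch below. The class name TweetLimiter is hypothetical; the send and stop callables are supplied by you, for example a function that calls producer.send('tweets', ...) and your streaming client's method for disconnecting.

```python
class TweetLimiter:
    """Forward at most `max_tweets` payloads, then ask the stream to stop."""

    def __init__(self, send, stop, max_tweets=3):
        self.send = send              # called with each forwarded payload
        self.stop = stop              # called once when the limit is reached
        self.max_tweets = max_tweets
        self.count = 0

    def on_data(self, json_data):
        if self.count >= self.max_tweets:
            return  # ignore anything that arrives after the limit
        self.send(json_data)
        self.count += 1
        if self.count == self.max_tweets:
            self.stop()
```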

