Streaming tweets with Kafka and Spark

From info319

This exercise assumes you are running Ubuntu Linux, either natively or through WSL 2.

Download and install Kafka

You need to install Kafka "by hand", outside the usual package managers (apt, yum, Homebrew, etc.). It is good practice to keep such software in a separate folder, for example /opt:

$ sudo mkdir -p /opt
$ sudo chown your_username:your_username /opt
$ cd /opt

Download and install a recent Kafka, for example:

$ wget https://dlcdn.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz  # to download
$ tar xzf kafka_2.13-3.2.1.tgz  # to unzip and unpack
$ cd kafka_2.13-3.2.1  # to enter the Kafka folder

To start Kafka, run the following two commands, in this order, each in its own terminal window (as for Spark, you need to have Java 8 or later on your machine):

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

(You can also run them in the same window by adding & to the end of each line.)

Steps 2-5 in this guide show how you can test basic Kafka from the command line.

(If you want, you can now add /opt/kafka_2.13-3.2.1/bin to the end of your PATH and put it in your .bashrc file. To restart ZooKeeper and Kafka after you reboot your machine, you can add the appropriate commands to your .profile file, ending each of them with &.)

Streaming tweets to Kafka

Create a Kafka topic called tweets:

$ bin/kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092

Task 1: Go back to tweet_harvester.py from Exercise 2, and change the code to write the stream of tweets to a Kafka topic instead of a socket. Here are some basic steps and code pieces you will need:

$ pip install kafka-python

In your Python code:

from kafka import KafkaProducer
...
# connect to the Kafka broker
producer = KafkaProducer(bootstrap_servers='localhost:9092')
...
# send the tweet, encoded as bytes, to the Kafka topic 'tweets'
producer.send('tweets', json_bytes)
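
As a minimal sketch of the sending step, the helper below serialises one tweet (assumed to be a Python dict) to a line of UTF-8 encoded JSON and hands it to a producer. The function names tweet_to_bytes and send_tweet are hypothetical, and the producer argument is assumed to be the KafkaProducer created above (any object with a send(topic, value) method will do):

```python
import json


def tweet_to_bytes(tweet):
    """Serialise one tweet (a dict) to a line of UTF-8 encoded JSON."""
    return (json.dumps(tweet) + '\r\n').encode('utf-8')


def send_tweet(producer, tweet, topic='tweets'):
    """Send one tweet to the given Kafka topic and return the bytes sent.

    `producer` is assumed to be a kafka-python KafkaProducer.
    """
    json_bytes = tweet_to_bytes(tweet)
    producer.send(topic, json_bytes)
    return json_bytes
```

KafkaProducer.send() is asynchronous, so calling producer.flush() before the harvester exits helps make sure that buffered messages are actually delivered.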

Step 5 in this guide shows how you can receive Kafka messages from the command line:

$ bin/kafka-console-consumer.sh --topic tweets --from-beginning --bootstrap-server localhost:9092

This guide presents more examples of using kafka-python.

Receive tweets from Kafka with streaming Spark

Task 2: Go back to the spark_receiver.py from Exercise 2, and change the code to receive tweets from a Kafka topic instead of a socket.

This time you do not need to import kafka-python, but you need a package called spark-sql-kafka to use Kafka with Spark Structured Streaming. The easiest way to get started is something like:

import findspark
...
findspark.init()
findspark.add_packages('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')

It is important to get the Scala and Spark versions right! In the example above:

  • 0-10 is the Kafka version the connector targets (Kafka 0.10 or newer), not a version of spark-sql-kafka itself
  • 2.12 is the version of Scala you are running
  • 3.3.0 is your version of Spark

One way to find out which Scala and Spark versions you need is to look at the Java packages installed in venv/lib/python3.x/site-packages/pyspark/jars/.
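
The version lookup can be sketched in Python as follows. This is only an illustration: it assumes that the jars folder contains a file named like spark-core_2.12-3.3.0.jar, and the helper names versions_from_jar_name, find_versions and kafka_package are hypothetical.

```python
import os
import re

# Matches names like 'spark-core_2.12-3.3.0.jar' (Scala 2.12, Spark 3.3.0).
JAR_PATTERN = re.compile(r'^spark-core_(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar$')


def versions_from_jar_name(jar_name):
    """Return (scala_version, spark_version), or None if the name does not match."""
    match = JAR_PATTERN.match(jar_name)
    return match.groups() if match else None


def find_versions(jars_dir):
    """Scan a pyspark/jars folder for the spark-core jar and return its versions."""
    for name in os.listdir(jars_dir):
        versions = versions_from_jar_name(name)
        if versions:
            return versions
    return None


def kafka_package(scala_version, spark_version):
    """Build the Maven coordinate of the matching spark-sql-kafka package."""
    return f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}'
```

The string returned by kafka_package(...) is what you would pass to findspark.add_packages(...).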

The line findspark.add_packages(...) makes Spark resolve spark-sql-kafka and its dependencies each time your application starts (the downloaded files are normally cached locally, so only the first start needs to fetch them). Note that you most likely cannot use the Java packages you installed along with Kafka because they may be a different version.

Here is generic Python code to read a structured stream from Kafka:

incoming_df = spark.readStream \
                .format('kafka') \
                .option('kafka.bootstrap.servers', 'localhost:9092') \
                .option('subscribe', 'tweets') \
                .option('startingOffsets', 'earliest') \
                .load()

You can use the same .writeStream as before to print the stream to console.

This transformation decodes the tweet into a string:

from pyspark.sql.functions import decode
streaming_df = incoming_df \
                .select(decode(incoming_df.value, 'utf-8').alias('tweet'))

Sentiment analysis of tweets

Afinn is a widely used Python library for sentiment analysis:

(venv) $ pip install afinn

Task 3: Create a Spark user-defined function (UDF) that takes a text as input and returns its Afinn sentiment score. (The slides and notebook from Session 3 contain a UDF example.)

Task 4: Create a Spark pipeline that reads the tweets topic produced by tweet_harvester.py as before, and uses the new UDF to add an afinn_score to each tweet.
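
One possible shape for Tasks 3 and 4 is sketched below; it is not the only way to do it. The names make_sentiment_scorer and make_afinn_udf are hypothetical, and returning 0.0 for missing text is an assumption you may want to change. The pyspark and afinn imports are placed inside the function so that the scoring logic can be tried out without them.

```python
def make_sentiment_scorer(score_text):
    """Wrap a text-scoring callable so that it tolerates missing values."""
    def score(text):
        if text is None:
            return 0.0  # assumption: treat missing text as neutral
        return float(score_text(text))
    return score


def make_afinn_udf():
    """Build a Spark UDF that returns the Afinn score of a text column."""
    # Imported here so the scorer above can be tested without Spark or Afinn.
    from afinn import Afinn
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType

    afinn = Afinn()
    return udf(make_sentiment_scorer(afinn.score), FloatType())
```

With the streaming_df from above, the UDF could then be applied as streaming_df.withColumn('afinn_score', make_afinn_udf()('tweet')).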

Twitter language metadata

Task 5: Use Twitter's language detector to filter out languages you do not want to analyse.

To get the language code along with the id and text for each tweet, you must add lang to the tweet_fields option when you call StreamingTwitter.sample() in tweet_harvester.py.
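
Once lang is included, a filter on the harvester side could look roughly like this. The set of wanted languages and the function name has_wanted_language are hypothetical; the tweet is assumed to be the dict found under the 'data' key of the streamed JSON.

```python
# Hypothetical choice of languages to keep (codes as reported by Twitter).
WANTED_LANGUAGES = {'en', 'no'}


def has_wanted_language(tweet, wanted=WANTED_LANGUAGES):
    """Return True if the tweet's detected language is one we want to analyse.

    Tweets without a 'lang' field, or with an undetermined language ('und'),
    are rejected unless 'und' is added to `wanted`.
    """
    return tweet.get('lang') in wanted
```

The same test can instead be done in the Spark pipeline by picking out the lang field and filtering on it.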

For more details, see the Twitter API v2 data dictionary and tweepy's StreamingClient documentation. There are many other interesting fields available, for example context_annotations and entities. Unfortunately, the geo field is often null...

The next few tasks are examples of further things you can do with Spark and Kafka. All of them are highly relevant for your programming projects! At the end you will also find some general advice on how to proceed.

Twitter media metadata

Task 6: Get information about included media for each tweet.

To achieve this, you must set the expansions option to ['attachments.media_keys'] when you call StreamingTwitter.sample() and also add fields like media_key,type,url to the media_fields option.

In tweet_harvester.py, when you receive the streaming tweets with media fields, code like the one suggested earlier will not quite do the job:

def on_data(json_data):
    json_obj = json.loads(json_data.decode())
    json_str = json.dumps(json_obj['data'])+'\r\n'
    json_bytes = json_str.encode()
    producer.send('tweets', json_bytes)

The reason is that the media fields are not in json_obj['data'], but in json_obj['includes']['media']. To handle this, you can either

  1. pick out the JSON fields you need already in the tweet harvester and receive a "flat" JSON object in your Spark pipeline, or
  2. pass all the json_data on and use the Spark pipeline to pick out the fields you want.
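
The first option can be sketched as below. The function name flatten_media is hypothetical, and the code assumes the payload layout described above: the tweet in json_obj['data'] lists its media under attachments.media_keys, and the matching media objects are in json_obj['includes']['media'].

```python
def flatten_media(json_obj):
    """Return one flat dict (id, type, url) per media item attached to the tweet."""
    tweet = json_obj.get('data', {})
    media_keys = tweet.get('attachments', {}).get('media_keys', [])
    # Index the included media objects by their key for easy lookup.
    media_by_key = {media['media_key']: media
                    for media in json_obj.get('includes', {}).get('media', [])}
    rows = []
    for key in media_keys:
        media = media_by_key.get(key)
        if media is None:
            continue  # key without a matching media object
        rows.append({'id': tweet.get('id'),
                     'type': media.get('type'),
                     'url': media.get('url')})
    return rows
```

Each returned dict can be serialised with json.dumps() and sent to Kafka as its own message.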

If you choose the latter option, here is how you can pass all the json_data on unprocessed:

def on_data(json_data):
    producer.send('tweets', json_data)

If you pass all the json_data on unprocessed, here is how you can pick out (a few of the) nested JSON fields in Spark:

from pyspark.sql.functions import decode, get_json_object
tweet_sdf = spark.readStream. (...as before...) .load()
selected_tweet_sdf = tweet_sdf \
               .select('*', decode(tweet_sdf.value, 'utf-8').alias('json_str')) \
               .select('*', get_json_object('json_str', '$.data.id').alias('id')) \
               .select('*', get_json_object('json_str', '$.includes.media').alias('media'))

The new media field is a JSON string that represents a list of JSON objects, which must be unpacked further!
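
One way to unpack the list is to parse it with from_json and produce one row per media item with explode. The sketch below assumes that each media object has the string fields media_key, type and url (as requested through media_fields); the names MEDIA_SCHEMA and unpack_media are hypothetical.

```python
# DDL schema for the 'media' JSON string: a list of objects with three string fields.
MEDIA_SCHEMA = 'array<struct<media_key:string,type:string,url:string>>'


def unpack_media(selected_tweet_sdf):
    """Return one row per media item, with the columns id, type and url."""
    # Imported here so the module also loads where pyspark is not installed.
    from pyspark.sql.functions import col, explode, from_json

    return selected_tweet_sdf \
        .select('id', explode(from_json(col('media'), MEDIA_SCHEMA)).alias('item')) \
        .select('id',
                col('item.type').alias('type'),
                col('item.url').alias('url'))
```

Because explode produces no rows for a missing list, tweets without media drop out of the result.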

Spark output stream to Kafka

In a terminal window, create a new media topic. As before, you can use a command like this:

$ bin/kafka-topics.sh --create --topic media --bootstrap-server localhost:9092

(To remove all messages from a Kafka topic, you can use --delete instead of --create in the above command, and then use --create to recreate the topic.)

Task 7: Create a Spark output stream to the new Kafka topic. For each tweet with an id, a (media) type and a url, the output stream should contain at least these three fields.
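
A rough sketch of such an output stream follows. Spark's Kafka sink expects a column called value, so the three fields are first packed into a JSON string. The function names, the default checkpoint folder 'checkpoints/media' and the input media_sdf (assumed to have the columns id, type and url) are all hypothetical.

```python
def to_kafka_value(media_sdf):
    """Keep complete rows and pack id, type and url into a JSON 'value' column."""
    # Imported here so the module also loads where pyspark is not installed.
    from pyspark.sql.functions import col, struct, to_json

    complete = media_sdf.where(col('id').isNotNull()
                               & col('type').isNotNull()
                               & col('url').isNotNull())
    return complete.select(to_json(struct('id', 'type', 'url')).alias('value'))


def write_to_kafka(value_sdf, topic='media', checkpoint_dir='checkpoints/media'):
    """Start a streaming query that writes the 'value' column to a Kafka topic."""
    return value_sdf.writeStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'localhost:9092') \
        .option('topic', topic) \
        .option('checkpointLocation', checkpoint_dir) \
        .start()
```

The Kafka sink requires a checkpoint location, which is why the sketch sets the checkpointLocation option explicitly.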

Media downloader

Task 8: Create a new Python program media_downloader.py that consumes the media topic.

from kafka import KafkaConsumer, KafkaProducer
...
        self.consumer = KafkaConsumer(
            'media',
            bootstrap_servers='localhost:9092',
            client_id='media-downloader',
            group_id='media-downloader-group',
            enable_auto_commit=False)
...
        try:
            for message in self.consumer:
                ...handle each message...
        except KeyboardInterrupt:
            ...shut down...

The first version of the program can do something really simple, like using the requests library to download media with type photo, and perhaps return the size in bytes.

Task 9: Create a new Kafka topic, for example photos. Extend the media_downloader.py so that, for each photo, it writes (at least) the tweet 'id' and photo 'size' back to Kafka.
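
The message handling for Tasks 8 and 9 could be sketched like this. It assumes that each message on the media topic is a JSON object with the fields id, type and url (the actual format depends on what you wrote in Task 7). The names fetch_bytes and photo_record are hypothetical; the fetch parameter exists only so that the logic can be tried without network access.

```python
import json


def fetch_bytes(url):
    """Download a URL with the requests library and return the raw bytes."""
    import requests  # imported here so photo_record can be tested offline

    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.content


def photo_record(message_value, fetch=fetch_bytes):
    """Return JSON bytes with tweet id and photo size, or None for other media."""
    media = json.loads(message_value.decode('utf-8'))
    if media.get('type') != 'photo' or not media.get('url'):
        return None
    size = len(fetch(media['url']))
    return json.dumps({'id': media['id'], 'size': size}).encode('utf-8')
```

Inside the consumer loop you could then call record = photo_record(message.value) and, when the result is not None, send it on with producer.send('photos', record).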

Spark join

Task 10: Add a Spark pipeline to read tweet id and photo size from the photos topic. Perform an inner join on the id column to combine the results of sentiment analysis and external photo analysis.
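
A possible outline of the join is given below. It assumes that the photos messages are JSON objects with the fields id and size, and that the data frame with sentiment scores already has an id column; the function names parse_photos and join_on_id are hypothetical.

```python
def parse_photos(photos_raw_sdf):
    """Pick id and size out of the JSON messages read from the photos topic."""
    # Imported here so the module also loads where pyspark is not installed.
    from pyspark.sql.functions import decode, get_json_object

    json_col = decode(photos_raw_sdf.value, 'utf-8')
    return photos_raw_sdf.select(
        get_json_object(json_col, '$.id').alias('id'),
        get_json_object(json_col, '$.size').cast('long').alias('size'))


def join_on_id(scored_sdf, photos_sdf):
    """Inner join of the sentiment scores and the photo sizes on the id column."""
    return scored_sdf.join(photos_sdf, on='id', how='inner')
```

Note that a join between two streams has to keep earlier rows from both sides in memory; for long-running jobs you may want to look into watermarks to limit that state.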

Practical advice

  • Do all of this incrementally, making sure that each step works completely before you begin the next!
  • It is easiest to write this as separate Python programs that you run in separate windows, for example: tweet_harvester.py, spark_pipeline.py, and media_downloader.py. (It is possible to run everything as a single complete program, but you risk race conditions that can be hard to debug.)
  • Also start a kafka-console-consumer for each topic in a separate window, and keep them running as you develop. That way you have more control of what happens, e.g.:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic media --from-beginning
  • When a Spark producer crashes, and in a few other cases, a Kafka topic can be left in an inconsistent state. If that happens, try deleting your checkpoint folder.
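
Deleting the checkpoint folder can also be done from Python before the pipeline starts. This is a small sketch with a hypothetical helper name, reset_checkpoint; the folder is assumed to be the one you pass to Spark as checkpointLocation.

```python
import shutil
from pathlib import Path


def reset_checkpoint(checkpoint_dir):
    """Delete a checkpoint folder and report whether it existed beforehand."""
    path = Path(checkpoint_dir)
    existed = path.exists()
    # ignore_errors=True makes this a no-op when the folder is already gone.
    shutil.rmtree(path, ignore_errors=True)
    return existed
```

Keep in mind that removing the checkpoint makes the streaming query forget how far it had read, so it starts again according to its startingOffsets option.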