Streaming tweets with Kafka and Spark

This exercise assumes you are running Ubuntu Linux, either natively or through WSL 2.

Download and install Kafka

You need to install Kafka "by hand" outside the usual package managers apt, yum, homebrew, etc. It is good practice to have a separate folder for such software, for example /opt:

$ sudo mkdir -p /opt  # -p avoids an error if /opt already exists
$ sudo chown your_username:your_username /opt
$ cd /opt

Download and install a recent Kafka, for example:

$ wget https://dlcdn.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz  # to download
$ tar xzf kafka_2.13-3.2.1.tgz  # to unzip and unpack
$ cd kafka_2.13-3.2.1  # to test

To start Kafka, run the following two commands in order, each in its own terminal window (as for Spark, you need Java 8 or newer on your machine):

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

(You can also run them in the same window by adding & to the end of each line.)
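If you are unsure whether both servers came up, a quick check is to see whether anything accepts TCP connections on their ports. The sketch below assumes the default ports from the bundled configuration files (2181 for ZooKeeper, 9092 for Kafka); the function name port_is_open is only illustrative. An open port shows that a process is listening, not that Kafka is fully healthy.

```python
import socket


def port_is_open(host="localhost", port=9092, timeout=2.0):
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False


if __name__ == "__main__":
    # Assumed default ports: 2181 (ZooKeeper) and 9092 (Kafka)
    for name, port in (("ZooKeeper", 2181), ("Kafka", 9092)):
        state = "listening" if port_is_open(port=port) else "not reachable"
        print(f"{name} on port {port}: {state}")
```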

Steps 2-5 in this guide show how you can test basic Kafka functionality from the command line.

(If you want, you can now add /opt/kafka_2.13-3.2.1/bin to the end of your PATH by setting it in your .bashrc file. To restart ZooKeeper and Kafka after you reboot your machine, you can add the appropriate commands to your .profile file, each ending with &.)

Streaming tweets to Kafka

Create a Kafka topic called tweets:

$ bin/kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092

Task 1: Go back to tweet_harvester.py from Exercise 2, and change the code to write the stream of tweets to a Kafka topic instead of a socket. Here are some basic steps and code pieces you will need:

$ pip install kafka-python

In your Python code:

from kafka import KafkaProducer
...
# connect to the Kafka broker
producer = KafkaProducer(bootstrap_servers='localhost:9092')
...
# send a bytearray to the Kafka topic 'tweets'
producer.send('tweets', json_bytes)
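The pieces above leave open how json_bytes is produced. A minimal sketch, assuming each tweet is available as a Python dict: serialize it to JSON and encode it as UTF-8. The helper names tweet_to_bytes and send_tweets and the example fields are hypothetical, not part of tweet_harvester.py. Because send() is asynchronous, the sketch flushes the producer before closing it.

```python
import json


def tweet_to_bytes(tweet):
    """Serialize a tweet dict to UTF-8 encoded JSON bytes."""
    return json.dumps(tweet, ensure_ascii=False).encode("utf-8")


def send_tweets(tweets, topic="tweets", servers="localhost:9092"):
    """Send an iterable of tweet dicts to a Kafka topic (needs a running broker)."""
    # Imported here so tweet_to_bytes can be used without kafka-python installed
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=servers)
    try:
        for tweet in tweets:
            producer.send(topic, tweet_to_bytes(tweet))
    finally:
        producer.flush()  # wait until buffered messages have been delivered
        producer.close()


if __name__ == "__main__":
    # Hypothetical tweet with made-up fields, only to show the encoding
    print(tweet_to_bytes({"id": "1", "text": "hello Kafka"}))
```

With ZooKeeper and Kafka running, calling send_tweets([{"id": "1", "text": "hello Kafka"}]) should make the message appear in the console consumer.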

Step 5 in this guide shows how you can receive Kafka messages from the command line:

$ bin/kafka-console-consumer.sh --topic tweets --from-beginning --bootstrap-server localhost:9092

This guide presents more examples of using kafka-python.

Receive tweets from Kafka with streaming Spark

Task 2: Go back to the spark_receiver.py from Exercise 2, and change the code to receive the stream of tweets from a Kafka topic instead.

This time you do not need to import kafka-python, but you need a package called spark-sql-kafka to run Kafka with streaming Spark. The easiest way to get started is something like:

import findspark
...
findspark.init()
findspark.add_packages('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')

It is important to get the versions right:

  • 0-10 is the Kafka API version the connector targets (Kafka 0.10 or newer), not a version of the package itself
  • 2.12 is the version of Scala your Spark was built with
  • 3.3.0 is your version of Spark

One way to check the Scala and Spark versions is to look at the Java packages you already have installed in venv/lib/python3.8/site-packages/pyspark/jars/.

findspark.add_packages(...) downloads spark-sql-kafka and its dependencies from scratch every time you start your application. When you are sure you have got the versions right, you can look for ways to install them permanently. Note that you most likely cannot use any Java packages you installed along with Kafka, because they may not have the right version.
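Checking the versions can also be done in code. The sketch below assumes that the pyspark jars folder contains a file named like spark-sql_2.12-3.3.0.jar, from which the Scala and Spark versions can be read; the helper names are illustrative. It only builds the package string and downloads nothing.

```python
import re
from pathlib import Path

# Assumed jar naming scheme, e.g. spark-sql_2.12-3.3.0.jar
_JAR_PATTERN = re.compile(
    r"^spark-sql_(?P<scala>\d+\.\d+)-(?P<spark>\d+\.\d+\.\d+)\.jar$"
)


def versions_from_jar_name(jar_name):
    """Return (scala_version, spark_version), or None if the name does not match."""
    match = _JAR_PATTERN.match(jar_name)
    return (match.group("scala"), match.group("spark")) if match else None


def kafka_package(scala_version, spark_version):
    """Build the Maven coordinate for the spark-sql-kafka connector."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def detect_kafka_package(jars_dir):
    """Look for the spark-sql jar in jars_dir and derive the connector coordinate."""
    for jar in sorted(Path(jars_dir).glob("spark-sql_*.jar")):
        versions = versions_from_jar_name(jar.name)
        if versions:
            return kafka_package(*versions)
    return None
```

For example, detect_kafka_package('venv/lib/python3.8/site-packages/pyspark/jars') returns a string you can pass to findspark.add_packages(...), or None if no matching jar is found.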

Here is generic code to read from Kafka:

from pyspark.sql.functions import decode

# subscribe to the 'tweets' topic, starting from the earliest available offset
incoming_df = spark.readStream \
                .format('kafka') \
                .option('kafka.bootstrap.servers', 'localhost:9092') \
                .option('subscribe', 'tweets') \
                .option('startingOffsets', 'earliest') \
                .load()

# Kafka delivers message values as bytes; decode them into a string column
streaming_df = incoming_df \
                .select(decode(incoming_df.value, 'utf-8').alias('tweet'))
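The code above only defines the stream; nothing is read until a streaming query is started. A minimal sketch of one way to do that, assuming you just want each micro-batch printed to the console while testing (the function name start_console_query is illustrative):

```python
def start_console_query(streaming_df, truncate=False):
    """Start a streaming query that prints each micro-batch to the console."""
    return (
        streaming_df.writeStream
        .format("console")             # print rows instead of storing them
        .outputMode("append")          # only emit rows added since the last batch
        .option("truncate", truncate)  # False shows the full tweet text
        .start()
    )
```

Typical use would be query = start_console_query(streaming_df) followed by query.awaitTermination(), which blocks until the query is stopped.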