Streaming tweets with Kafka and Spark
This exercise assumes you are running Ubuntu Linux, either natively or through WSL 2.
Download and install Kafka
You need to install Kafka "by hand", outside the usual package managers (apt, yum, Homebrew, etc.). It is good practice to have a separate folder for such software, for example /opt:
$ sudo mkdir /opt
$ sudo chown your_username:your_username /opt
$ cd /opt
Download and install a recent Kafka, for example:
$ wget https://dlcdn.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz # to download
$ tar xzf kafka_2.13-3.2.1.tgz # to unzip and unpack
$ cd kafka_2.13-3.2.1 # to test
To start Kafka, run the following two commands in sequence, each in its own terminal window (as for Spark, you need Java 8 or newer on your machine):
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
(You can also run them in the same window by adding & to the end of each line.)
Steps 2-5 in this guide show how you can test basic Kafka from the command line.
(If you want, you can now add /opt/kafka_2.13-3.2.1/bin to the end of your PATH in your .bashrc file. To restart ZooKeeper and Kafka after you reboot your machine, you can add the appropriate commands to your .profile file, ending each command with &.)
Streaming tweets to Kafka
Create a Kafka topic called tweets:
$ bin/kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092
Task 1: Go back to tweet_harvester.py from Exercise 2, and change the code to write the stream of tweets to a Kafka topic instead of a socket. Here are some basic steps and code pieces you will need:
$ pip install kafka-python
In your Python code:
from kafka import KafkaProducer
...
# connect to the Kafka broker
producer = KafkaProducer(bootstrap_servers='localhost:9092')
...
# send a bytearray to the Kafka topic 'tweets'
producer.send('tweets', json_bytes)
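The pieces above can be put together roughly as follows. This is only a minimal sketch, assuming each tweet is available as a Python dict; the helper names tweet_to_bytes and send_tweets are hypothetical and not part of tweet_harvester.py or kafka-python.

```python
import json


def tweet_to_bytes(tweet):
    """Serialize a tweet (assumed to be a dict) to UTF-8 encoded JSON bytes."""
    return json.dumps(tweet, ensure_ascii=False).encode('utf-8')


def send_tweets(tweets, topic='tweets', servers='localhost:9092'):
    """Send each tweet in an iterable to a Kafka topic (hypothetical helper)."""
    # Imported here so that tweet_to_bytes can be used without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=servers)
    for tweet in tweets:
        producer.send(topic, tweet_to_bytes(tweet))
    # send() is asynchronous, so flush before closing to avoid losing messages.
    producer.flush()
    producer.close()
```

With ZooKeeper and Kafka running, a call such as send_tweets([{'id': 1, 'text': 'hello'}]) should make one JSON message show up in a console consumer for the tweets topic.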
Step 5 in this guide shows how you can receive Kafka messages from the command line:
$ bin/kafka-console-consumer.sh --topic tweets --from-beginning --bootstrap-server localhost:9092
This guide presents more examples of using kafka-python.
Receive tweets from Kafka with streaming Spark
Task 2: Go back to the spark_receiver.py from Exercise 2, and change the code to receive tweets from a Kafka topic instead of a socket.
This time you do not need to import kafka-python, but you need a package called spark-sql-kafka to run Kafka with structured streaming Spark. The easiest way to get started is something like:
import findspark
...
findspark.init()
findspark.add_packages('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
It is important to get the Scala and Spark versions right! In the example above:
- 0-10 is the Kafka version the spark-sql-kafka connector targets (brokers 0.10 or later)
- 2.12 is the version of Scala you are running
- 3.3.0 is your version of Spark
One way to find out which Scala and Spark versions you need is to look at the Java packages installed in venv/lib/python3.x/site-packages/pyspark/jars/.
The line findspark.add_packages(...) downloads spark-sql-kafka and its dependencies anew each time you start your application. When you are sure you have got the versions right, you can look for ways to install them permanently. Note that you most likely cannot use the Java packages you installed along with Kafka, because they may be of a different version.
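The lookup in the jars folder can also be scripted. The sketch below assumes the folder contains a file named like spark-core_2.12-3.3.0.jar, where the first number is the Scala version and the second the Spark version; the function names are hypothetical.

```python
import re
from pathlib import Path

# Matches file names such as "spark-core_2.12-3.3.0.jar" (assumed naming scheme).
_JAR_PATTERN = re.compile(
    r'^spark-core_(?P<scala>\d+\.\d+)-(?P<spark>\d+\.\d+\.\d+)\.jar$')


def kafka_package_from_jar_name(jar_name):
    """Build the spark-sql-kafka package coordinate from a spark-core jar name."""
    match = _JAR_PATTERN.match(jar_name)
    if match is None:
        return None
    return 'org.apache.spark:spark-sql-kafka-0-10_{scala}:{spark}'.format(
        **match.groupdict())


def find_kafka_package(jars_dir):
    """Look through a pyspark jars folder and return the matching coordinate."""
    for jar in sorted(Path(jars_dir).glob('spark-core_*.jar')):
        package = kafka_package_from_jar_name(jar.name)
        if package is not None:
            return package
    return None
```

The result of find_kafka_package('venv/lib/python3.x/site-packages/pyspark/jars') (with python3.x replaced by your actual Python version) can then be passed to findspark.add_packages(...).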
Here is generic code to read a structured stream from Kafka:
incoming_df = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'localhost:9092') \
.option('subscribe', 'tweets') \
.option('startingOffsets', 'earliest') \
.load()
Kafka delivers each message as binary data in the value column. This transformation decodes it into a string column called tweet:
from pyspark.sql.functions import decode
streaming_df = incoming_df \
.select(decode(incoming_df.value, 'utf-8').alias('tweet'))
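To check that tweets actually arrive, the decoded stream can be written to the console. This is a minimal sketch, assuming streaming_df is the streaming DataFrame defined above; the function name start_console_query is hypothetical.

```python
def start_console_query(streaming_df):
    """Start a streaming query that prints each new micro-batch to the console."""
    return (streaming_df.writeStream
            .format('console')             # built-in sink intended for debugging
            .outputMode('append')          # only print rows added since the last batch
            .option('truncate', 'false')   # show the full tweet text
            .start())
```

A typical use would be query = start_console_query(streaming_df) followed by query.awaitTermination(), which blocks until the query is stopped.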
