Streaming tweets with Kafka and Spark
Install Kafka
This exercise will assume you are running Ubuntu Linux, either natively or through WSL 2. See steps 1-5 in this guide.
We have to install Kafka "ourselves", outside apt, yum, homebrew, etc. It is good practice to have a separate folder for that, for example /opt:
$ sudo mkdir /opt    # or another folder you want to keep self-installed software in
$ sudo chown your_username:your_username /opt
$ cd /opt
Download and install a recent Kafka, for example:
$ wget https://dlcdn.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz    # to download
$ tar xzf kafka_2.13-3.2.1.tgz    # to unzip and unpack
$ cd kafka_2.13-3.2.1
(If you want, you can now add /opt/kafka_2.13-3.2.1/bin to the end of your PATH and put it in your .bashrc file.)
As for Spark, you also need Java 8 or newer on your machine! Steps 2-5 in this guide show how you can test basic Kafka from the command line.
Streaming tweets to Kafka
Create a Kafka topic called tweets:
$ bin/kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092

To check what arrives on the topic, you can attach a console consumer to it:

$ bin/kafka-console-consumer.sh --topic tweets --from-beginning --bootstrap-server localhost:9092
Task 1: Go back to tweet_harvester.py from Exercise 2, and change the code to write the stream of tweets to a Kafka topic instead. Here are some basic steps and code pieces you will need:
$ pip install kafka-python
In your Python code:
from kafka import KafkaProducer
...
# connect to the kafka topic
producer = KafkaProducer(bootstrap_servers='localhost:9092')
...
# send a bytearray to the Kafka topic 'tweets'
producer.send('tweets', json_bytes)
Step 5 in this guide shows how you can test basic Kafka from the command line. This guide presents more examples of using kafka-python.
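The pieces above can be combined into a minimal producer sketch. This is not the exercise's solution, just one way to wire it up; the names tweet_to_bytes and stream_tweets are illustrative, and each tweet is assumed to arrive as a Python dict:

```python
import json


def tweet_to_bytes(tweet):
    """Serialize a tweet dict to UTF-8 JSON bytes, the format Kafka expects."""
    return json.dumps(tweet).encode('utf-8')


def stream_tweets(tweets, topic='tweets', servers='localhost:9092'):
    """Send each tweet in an iterable to a Kafka topic.

    Requires `pip install kafka-python` and a broker running on `servers`.
    """
    from kafka import KafkaProducer  # imported here so tweet_to_bytes works without it
    producer = KafkaProducer(bootstrap_servers=servers)
    for tweet in tweets:
        producer.send(topic, tweet_to_bytes(tweet))
    producer.flush()  # block until all buffered messages are actually delivered
```

In your own tweet_harvester.py you would call stream_tweets (or the producer.send line directly) from wherever your existing code handled each incoming tweet.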
Receive tweets from Kafka with streaming Spark
Task 2: Go back to the spark_receiver.py from Exercise 2, and change the code to receive the stream of tweets from a Kafka topic instead.
This time you do not need kafka-python, but you do need a package called spark-sql-kafka to use Kafka with streaming Spark. The easiest way to get started is something like:
import findspark
...
findspark.init()
findspark.add_packages('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
It is important to get the versions right:
- 0-10 refers to the Kafka client API (version 0.10 or newer) that the connector targets
- 2.12 is the version of Scala your Spark build uses
- 3.3.0 is your version of Spark
It is important to get the Scala and Spark versions right. One way to check is to look at the jar files you already have installed in venv/lib/python3.8/site-packages/pyspark/jars/.
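The jar file names encode both versions, so they can be read off programmatically. A small sketch (the helper name versions_from_jar is made up for illustration) that parses a spark-core jar name:

```python
import re


def versions_from_jar(jar_name):
    """Parse (scala_version, spark_version) out of a spark-core jar file name,
    e.g. 'spark-core_2.12-3.3.0.jar' -> ('2.12', '3.3.0')."""
    m = re.match(r"spark-core_(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar$", jar_name)
    return (m.group(1), m.group(2)) if m else None


# usage: point it at your venv's pyspark jars folder, for example:
# import glob, os
# jar = glob.glob('venv/lib/python3.8/site-packages/pyspark/jars/spark-core_*.jar')[0]
# print(versions_from_jar(os.path.basename(jar)))
```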
findspark.add_packages(...) downloads spark-sql-kafka and its dependencies from scratch every time you start your application. When you are sure you have got the versions right, you can look for ways to install them permanently. Note that you most likely cannot use any Java packages you installed along with Kafka, because they may not have the right versions.
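One alternative (an option I am adding here, not part of the original exercise) is to pass the package to Spark through the PYSPARK_SUBMIT_ARGS environment variable instead of calling findspark.add_packages:

```shell
# ask pyspark to pull in spark-sql-kafka on every start
export PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'
```

Spark caches packages fetched with --packages under ~/.ivy2, so after the first download subsequent starts reuse the local copies.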
Here is generic code to read from Kafka:
incoming_df = spark.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'tweets') \
    .option('startingOffsets', 'earliest') \
    .load()
from pyspark.sql.functions import decode
streaming_df = incoming_df \
.select(decode(incoming_df.value, 'utf-8').alias('tweet'))
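A streaming DataFrame does nothing until you start a query on it. As a sketch of the last step, assuming streaming_df was built as above inside an active SparkSession (the helper name start_console_sink is made up for illustration), you can print each micro-batch to the console:

```python
def start_console_sink(streaming_df):
    """Start a streaming query that prints each micro-batch of tweets to stdout."""
    return (streaming_df.writeStream
            .outputMode('append')          # emit only newly arrived rows
            .format('console')             # print batches to the terminal
            .option('truncate', 'false')   # show full tweet text, not a prefix
            .start())


# usage (requires a running Kafka broker and the spark-sql-kafka package):
# query = start_console_sink(streaming_df)
# query.awaitTermination()
```

In your spark_receiver.py you would replace the console sink with whatever processing the exercise asks for.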
