Streaming tweets with Kafka and Spark

From info319
Revision as of 10:55, 18 September 2022

This exercise assumes you are running Ubuntu Linux, either natively or through WSL 2.

Download and install Kafka

You need to install Kafka "by hand" outside the usual package managers apt, yum, homebrew, etc. It is good practice to have a separate folder for such software, for example /opt:

$ sudo mkdir -p /opt
$ sudo chown your_username:your_username /opt
$ cd /opt

Download and install a recent Kafka, for example:

$ wget https://dlcdn.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz  # to download
$ tar xzf kafka_2.13-3.2.1.tgz  # to unzip and unpack
$ cd kafka_2.13-3.2.1  # to enter the Kafka folder

To start Kafka, run the following two commands in sequence in two separate terminal windows (as for Spark, you need Java 8 or newer on your machine):

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

(You can also run them in the same window by adding & to the end of each line.)

Steps 2-5 in this guide show how you can test basic Kafka from the command line.

(If you want, you can now add /opt/kafka_2.13-3.2.1/bin to the end of your PATH in your .bashrc file. To restart ZooKeeper and Kafka after you reboot your machine, you can add the appropriate commands to your .profile file, ending each command with &.)
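As a sketch, assuming the install path used above, the two additions could look like this:

```shell
# In ~/.bashrc: put the Kafka scripts on the PATH
export PATH="$PATH:/opt/kafka_2.13-3.2.1/bin"

# In ~/.profile: start ZooKeeper and Kafka in the background at login
zookeeper-server-start.sh /opt/kafka_2.13-3.2.1/config/zookeeper.properties &
kafka-server-start.sh /opt/kafka_2.13-3.2.1/config/server.properties &
```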

Streaming tweets to Kafka

Create a Kafka topic called tweets:

$ bin/kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092

Task 1: Go back to tweet_harvester.py from Exercise 2, and change the code to write the stream of tweets to a Kafka topic instead of a socket. Here are some basic steps and code pieces you will need:

$ pip install kafka-python

In your Python code:

from kafka import KafkaProducer
...
# connect to the kafka topic
producer = KafkaProducer(bootstrap_servers='localhost:9092')
...
# send a bytearray to the Kafka topic 'tweets'
producer.send('tweets', json_bytes)
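Putting the pieces together, a minimal sketch of the producer side could look like the code below. The helper names `tweet_to_bytes` and `send_tweets` are ours, not part of kafka-python, and the example assumes each tweet arrives as a Python dict:

```python
import json

def tweet_to_bytes(tweet: dict) -> bytes:
    """Serialize a tweet dict to UTF-8 JSON bytes, as KafkaProducer.send() expects."""
    return json.dumps(tweet).encode('utf-8')

def send_tweets(tweets, topic='tweets', servers='localhost:9092'):
    """Send each tweet to the given Kafka topic (requires a running broker)."""
    from kafka import KafkaProducer  # pip install kafka-python
    producer = KafkaProducer(bootstrap_servers=servers)
    for tweet in tweets:
        producer.send(topic, tweet_to_bytes(tweet))
    producer.flush()  # block until buffered messages are delivered
```

In your tweet harvester, you would call `send_tweets` (or `producer.send` directly) from the callback that receives each tweet from the Twitter stream.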

Step 5 in this guide shows how you can receive Kafka messages from the command line:

$ bin/kafka-console-consumer.sh --topic tweets --from-beginning --bootstrap-server localhost:9092

This guide presents more examples of using kafka-python.

Receive tweets from Kafka with streaming Spark

Task 2: Go back to the spark_receiver.py from Exercise 2, and change the code to receive tweets from a Kafka topic instead of a socket.

This time you do not need to import kafka-python, but you need a package called spark-sql-kafka to run Kafka with structured streaming Spark. The easiest way to get started is something like:

import findspark
...
findspark.init()
findspark.add_packages('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')

It is important to get the Scala and Spark versions right:

  • 0-10 is the version of spark-sql-kafka
  • 2.12 is the version of Scala you are running
  • 3.3.0 is your version of Spark

One way to find out which Scala and Spark versions you need is to look at the Java packages installed in venv/lib/python3.x/site-packages/pyspark/jars/.
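For example, a jar such as scala-library-2.12.x.jar in that folder indicates Scala 2.12, and pyspark.__version__ gives the Spark version. The package coordinate is then assembled from those two values (the versions below are just examples; substitute your own):

```python
# Versions read off your own installation (example values shown):
scala_version = '2.12'   # from scala-library-2.12.x.jar in pyspark/jars/
spark_version = '3.3.0'  # from pyspark.__version__
package = f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}'
print(package)  # org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0
```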

The line findspark.add_packages(...) downloads spark-sql-kafka and its dependencies anew each time you start your application. When you are sure you have got the versions right, you can look for ways to install them permanently. Note that you most likely cannot use the Java packages you installed along with Kafka, because they may not be the right version.

Here is generic code to read a structured stream from Kafka:

incoming_df = spark.readStream \
                .format('kafka') \
                .option('kafka.bootstrap.servers', 'localhost:9092') \
                .option('subscribe', 'tweets') \
                .option('startingOffsets', 'earliest') \
                .load()

This transformation decodes the tweet into a string:

from pyspark.sql.functions import decode
streaming_df = incoming_df \
                .select(decode(incoming_df.value, 'utf-8').alias('tweet'))
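Kafka delivers each message value as raw bytes, so outside Spark the decode step corresponds to plain Python byte decoding. The payload below is an illustrative example, not actual tweet data:

```python
# What decode(incoming_df.value, 'utf-8') does to each message value:
raw_value = b'{"text": "hello kafka"}'  # example payload as Kafka stores it
tweet = raw_value.decode('utf-8')       # the same bytes as a Python string
print(tweet)  # {"text": "hello kafka"}
```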