== Streaming tweets with Kafka and Spark ==

Revision as of 09:50, 17 September 2022

== Install Kafka ==

This exercise will assume you are running Ubuntu Linux, either natively or through WSL 2. See [https://kafka.apache.org/quickstart steps 1-5 in this guide].

We have to install Kafka "ourselves", outside '''apt''', etc. It is good practice to have a separate folder for that, for example /opt:

 $ sudo mkdir /opt  # or another folder you want to keep self-installed software in
 $ sudo chown your_username:your_username /opt
 $ cd /opt

Download and install a recent Kafka, for example:

 $ wget https://dlcdn.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz  # to download
 $ tar xzf kafka_2.13-3.2.1.tgz  # to unzip and unpack
 $ cd kafka_2.13-3.2.1

(If you want, you can now add /opt/kafka_2.13-3.2.1/bin to the end of your PATH by appending it in your .bashrc file.)
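
The PATH line in your .bashrc could look like this (a sketch, assuming the /opt install location used above):

```shell
# Append the Kafka scripts to PATH (assumes Kafka was unpacked under /opt)
export PATH="$PATH:/opt/kafka_2.13-3.2.1/bin"
```

After opening a new shell, the Kafka scripts can then be run without their full path, for example kafka-topics.sh instead of /opt/kafka_2.13-3.2.1/bin/kafka-topics.sh.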

As for Spark, you also need Java 8 or newer on your machine!

Steps 2-5 in [https://kafka.apache.org/quickstart this guide] show how you can test basic Kafka from the command line.

== Streaming tweets to Kafka ==

Create a Kafka topic called ''tweets'':

 $ bin/kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092

You can then watch the messages arriving on the topic with the console consumer:

 $ bin/kafka-console-consumer.sh --topic tweets --from-beginning --bootstrap-server localhost:9092

'''Task 1:''' Go back to '''tweet_harvester.py''' from Exercise 2, and change the code to write the stream of tweets to a Kafka topic instead. Here are some basic steps and code pieces you will need:

 $ pip install kafka-python

In your Python code:

 from kafka import KafkaProducer
 ...
 # connect to the Kafka broker
 producer = KafkaProducer(bootstrap_servers='localhost:9092')
 ...
 # send a bytearray to the Kafka topic 'tweets'
 producer.send('tweets', json_bytes)
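
The tweets must be serialized to bytes before they are sent. A minimal sketch of that step (the helper name tweet_to_bytes and the sample tweet are made up for illustration; the commented-out lines assume a broker running on localhost:9092):

```python
import json

def tweet_to_bytes(tweet):
    """Serialize a tweet dict to UTF-8 JSON bytes, a form KafkaProducer.send() accepts."""
    return json.dumps(tweet).encode('utf-8')

json_bytes = tweet_to_bytes({'id': 1, 'text': 'hello'})

# With a broker running, the bytes could then be sent like this:
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers='localhost:9092')
# producer.send('tweets', json_bytes)
# producer.flush()  # block until the message has actually been delivered
```

Calling flush() before the program exits matters here, because send() only queues the message in a background buffer.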

Step 5 in [https://kafka.apache.org/quickstart this guide] shows how you can read the messages from the command line to check that the tweets arrive.

This guide presents more [https://towardsdatascience.com/3-libraries-you-should-know-to-master-apache-kafka-in-python-c95fdf8700f2 examples of using kafka-python].

== Receive tweets from Kafka with streaming Spark ==

'''Task 2:''' Go back to the '''spark_receiver.py''' from Exercise 2, and change the code to receive the stream of tweets from a Kafka topic instead.

This time you do not need to import kafka-python, but you need a package called spark-sql-kafka to run Kafka with streaming Spark. The easiest way to get started is something like:

 import findspark
 ...
 findspark.init()
 findspark.add_packages('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')

''It is important to get the versions right!''
* 0-10 is the version of ''spark-sql-kafka''
* 2.12 is the version of Scala you are running
* 3.3.0 is your version of Spark
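
Put together, the argument is just these version strings glued into a Maven coordinate. A small sketch (the version values shown are examples; substitute the ones from your own installation):

```python
scala_version = '2.12'   # the Scala build your Spark distribution uses
spark_version = '3.3.0'  # your installed Spark version
package = 'org.apache.spark:spark-sql-kafka-0-10_{}:{}'.format(scala_version, spark_version)
# package now holds the string passed to findspark.add_packages(...)
```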

It is important to get the Scala and Spark versions right. One way to check is to look at the Java packages you already have installed in venv/lib/python3.8/site-packages/pyspark/jars/.

findspark.add_packages(...) downloads spark-sql-kafka and its dependencies from scratch every time you start your application. When you are sure you have got the versions right, you can look for ways to install them permanently. Note that you most likely cannot use the Java packages you installed along with Kafka, because they may not have the right version.

Here is generic code to read from the ''tweets'' topic with Kafka:

 incoming_df = spark.readStream \
                .format('kafka') \
                .option('kafka.bootstrap.servers', 'localhost:9092') \
                .option('subscribe', 'tweets') \
                .option('startingOffsets', 'earliest') \
                .load()

 from pyspark.sql.functions import decode
 streaming_df = incoming_df \
                .select(decode(incoming_df.value, 'utf-8').alias('text'))
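
For a single message, the decoding step above corresponds to this plain-Python round trip (the sample payload is made up; in the exercise, the bytes are the tweets your harvester sent to the topic):

```python
import json

# Kafka delivers each record's value as raw bytes; decode(..., 'utf-8') in Spark
# turns it back into a string per row, which can then be parsed as JSON.
raw_value = b'{"id": 1, "text": "hello"}'
text = raw_value.decode('utf-8')
tweet = json.loads(text)
```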