Streaming tweets with Kafka and Spark

From info319
 
Latest revision as of 15:22, 13 October 2022

This and later exercises assume you are running Ubuntu Linux, either natively or through WSL 2.

Download and install Kafka

You need to install Kafka "by hand" outside the usual package managers apt, yum, homebrew, etc. It is good practice to have a separate folder for such software, for example /opt:

$ sudo mkdir -p /opt
$ sudo chown your_username:your_username /opt
$ cd /opt

Download and install a recent Kafka, for example:

$ wget https://dlcdn.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz  # to download
$ tar xzf kafka_2.13-3.2.1.tgz  # to unzip and unpack
$ cd kafka_2.13-3.2.1  # to test

To start Kafka, run the following two commands in sequence in two separate terminal windows (as for Spark, you need to have Java 8 or later on your machine):

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

(You can also run them in the same window by adding & to the end of each line.)

Steps 2-5 in this guide show how you can test basic Kafka from the command line.

(If you want, you can now add /opt/kafka_2.13-3.2.1/bin to the end of your PATH and put it in your .bashrc file. To restart ZooKeeper and Kafka after you reboot your machine, you can add the appropriate commands to your .profile file, each ending with &.)

Streaming tweets to Kafka

Create a Kafka topic called tweets:

$ bin/kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092

Task 1: Go back to tweet_harvester.py from Exercise 2, and change the code to write the stream of tweets to a Kafka topic instead of a socket. Here are some basic steps and code pieces you will need:

$ pip install kafka-python

In your Python code:

from kafka import KafkaProducer
...
# connect to the Kafka broker
producer = KafkaProducer(bootstrap_servers='localhost:9092')
...
# send a bytes object to the Kafka topic 'tweets'
producer.send('tweets', json_bytes)
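
As a minimal sketch of how these pieces can fit together, the helper below serialises one tweet to JSON bytes and hands it to a producer. The function names tweet_to_bytes and send_tweet are made up for this illustration. The producer is passed in as an argument, so the same code should work both with a real KafkaProducer and with a stand-in object while testing.

```python
import json


def tweet_to_bytes(tweet: dict) -> bytes:
    """Serialise one tweet (a dict) as a UTF-8 encoded JSON line."""
    return (json.dumps(tweet) + '\r\n').encode('utf-8')


def send_tweet(producer, tweet: dict, topic: str = 'tweets') -> bytes:
    """Send one tweet to the given topic and return the bytes that were sent.

    `producer` is any object with a send(topic, value) method,
    for example kafka.KafkaProducer(bootstrap_servers='localhost:9092').
    """
    json_bytes = tweet_to_bytes(tweet)
    producer.send(topic, json_bytes)
    return json_bytes
```

With a real KafkaProducer, it is usually a good idea to call producer.flush() before the program exits, so that buffered messages are delivered.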

Step 5 in this guide shows how you can receive Kafka messages from the command line:

$ bin/kafka-console-consumer.sh --topic tweets --from-beginning --bootstrap-server localhost:9092

This guide presents more examples of using kafka-python.

Building a distributed architecture

The following illustration shows one way to organise the solution to this exercise:

[Figure: Twitterpipe.png]

Receive tweets from Kafka with streaming Spark

Task 2: Go back to the spark_receiver.py from Exercise 2, and change the code to receive tweets from a Kafka topic instead of a socket.

This time you do not need to import kafka-python, but you need a package called spark-sql-kafka to run Kafka with structured streaming Spark. The easiest way to get started is something like:

import findspark
...
findspark.init()
findspark.add_packages('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')

It is important to get the Scala and Spark versions right! In the example above:

  • 0-10 refers to the Kafka broker version (0.10 or later) that the spark-sql-kafka connector supports
  • 2.12 is the version of Scala you are running
  • 3.3.0 is your version of Spark

One way to find out which Scala and Spark versions you need is to look at the names of the JAR files installed in venv/lib/python3.x/site-packages/pyspark/jars/.

The line findspark.add_packages(...) downloads spark-sql-kafka and its dependencies anew each time your application starts. Note that you most likely cannot use the Java packages you installed along with Kafka because they may be a different version.
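
The version lookup can also be automated. The following is a sketch only: it assumes that the jars folder contains a file named like spark-sql_2.12-3.3.0.jar, and the helper name kafka_package_for is made up for this example.

```python
import re

# Matches file names such as 'spark-sql_2.12-3.3.0.jar' (assumed naming scheme).
JAR_PATTERN = re.compile(
    r'^spark-sql_(?P<scala>\d+\.\d+)-(?P<spark>\d+\.\d+\.\d+)\.jar$')


def kafka_package_for(jar_names):
    """Return the spark-sql-kafka coordinate matching the bundled spark-sql jar."""
    for name in jar_names:
        match = JAR_PATTERN.match(name)
        if match:
            return ('org.apache.spark:spark-sql-kafka-0-10_'
                    f"{match['scala']}:{match['spark']}")
    raise ValueError('no spark-sql_<scala>-<spark>.jar found')
```

You could call it with os.listdir(...) on the jars folder and pass the result to findspark.add_packages(...).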

Here is generic Python code to read a structured stream from Kafka:

incoming_df = spark.readStream \
                .format('kafka') \
                .option('kafka.bootstrap.servers', 'localhost:9092') \
                .option('subscribe', 'tweets') \
                .option('startingOffsets', 'earliest') \
                .load()

You can use the same .writeStream as before to print the stream to console.

This transformation decodes the tweet into a string:

from pyspark.sql.functions import decode
streaming_df = incoming_df \
                .select(decode(incoming_df.value, 'utf-8').alias('tweet'))

Sentiment analysis of tweets

Afinn is a widely used Python library for sentiment analysis:

(venv) $ pip install afinn

Task 3: Create a Spark user-defined function (UDF) that takes a text as input and returns its Afinn sentiment score. (The slides and notebook from Session 3 contain a UDF example.)

Task 4: Create a Spark pipeline that reads the tweet topic produced by tweet_harvester.py as before, and uses the new UDF to add an afinn_score to each tweet.

Twitter language metadata

Task 5: Use Twitter's language detector to filter out languages you do not want to analyse.

To get the language code along with the id and text for each tweet, you must add lang to the tweet_fields option when you call StreamingTwitter.sample() in tweet_harvester.py.

For more details, see the Twitter API v2 data dictionary and tweepy's StreamingClient documentation. There are many other interesting fields available, for example context_annotations and entities. Unfortunately, the geo field is often null...

The next few tasks are examples of further things you can do with Spark and Kafka. All of them are highly relevant for your programming projects! At the end you will also find some general advice on how to proceed.

Twitter media metadata

Task 6: Get information about included media for each tweet.

To achieve this, you must set the expansions option to ['attachments.media_keys'] when you call StreamingTwitter.sample() and also add fields like media_key,type,url to the media_fields option.

In tweet_harvester.py, when you receive the streaming tweets with media fields, code like the one suggested earlier will not quite do the job:

def on_data(json_data):
    json_obj = json.loads(json_data.decode())
    json_str = json.dumps(json_obj['data'])+'\r\n'
    json_bytes = json_str.encode()
    producer.send('tweets', json_bytes)

The reason is that the media fields are not in json_obj['data'], but in json_obj['includes']['media']. To handle this, you can either

  1. pick out the JSON fields you need already in the tweet harvester and receive them in a "flat" JSON object in your Spark pipeline, or
  2. pass all the json_data on as they are and use the Spark pipeline to pick out the fields you want.
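
For the first option, a flattening step in the tweet harvester could look roughly like the sketch below. The function name flatten_tweet and the layout of the flat object are assumptions made for this example; adapt the selected fields to what your pipeline needs.

```python
import json


def flatten_tweet(json_data: bytes) -> bytes:
    """Return a flat JSON object with the tweet id, text and attached media."""
    json_obj = json.loads(json_data.decode())
    data = json_obj['data']
    # the media objects live under 'includes', not under 'data'
    media = json_obj.get('includes', {}).get('media', [])
    flat = {
        'id': data['id'],
        'text': data.get('text'),
        'media': [{'type': m.get('type'), 'url': m.get('url')} for m in media],
    }
    return (json.dumps(flat) + '\r\n').encode()
```

Tweets without media get an empty media list, so the Spark pipeline can treat all messages the same way.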

If you choose the latter option, here is how you can pass all the json_data on unprocessed:

def on_data(json_data):
    producer.send('tweets', json_data)

If you pass all the json_data on unprocessed, here is how you can pick out (a few of the) nested JSON fields in Spark:

from pyspark.sql.functions import decode, get_json_object
tweet_sdf = spark.readStream. (...as before...) .load()
selected_tweet_sdf = tweet_sdf \
               .select('*', decode(tweet_sdf.value, 'utf-8').alias('json_str')) \
               .select('*', get_json_object('json_str', '$.data.id').alias('id')) \
               .select('*', get_json_object('json_str', '$.includes.media').alias('media'))

The new media field is a JSON string that represents a list of JSON objects, which must be unpacked further. Tips on how to do this:

from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import from_json

list_schema = ArrayType(StringType())

...
               .select('*', from_json('media', list_schema).alias('media_list')) \
...

You can then choose to unpack the media_list further, for example with the explode function from pyspark.sql.functions.

Spark output stream to Kafka

In a terminal window, create a new media topic. As before, you can use a command like this:

$ bin/kafka-topics.sh --create --topic media --bootstrap-server localhost:9092

(To remove all messages from a Kafka topic, you can use --delete instead of --create in the above command, and then use --create to recreate the topic.)

Task 7: To your Spark pipeline, add a new output stream to the new Kafka media topic. For each tweet with an id, a (media) type and a url, the output stream should receive at least these three fields.
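
Here is a hedged sketch of what such an output stream could look like. Spark's Kafka sink expects the payload in a column called value and needs a checkpointLocation. The function names, the default checkpoint folder and the assumption that media_sdf already has the columns id, type and url are all made up for this illustration.

```python
def kafka_sink_options(topic, checkpoint_dir, servers='localhost:9092'):
    """Options for a Spark structured-streaming Kafka sink."""
    return {
        'kafka.bootstrap.servers': servers,
        'topic': topic,
        # the Kafka sink needs a checkpoint location to track its progress
        'checkpointLocation': checkpoint_dir,
    }


def start_media_stream(media_sdf, checkpoint_dir='checkpoint/media'):
    """Write the id, type and url columns of media_sdf to the media topic."""
    # imported here so that the options helper can be used without Spark
    from pyspark.sql.functions import struct, to_json

    return media_sdf \
        .select(to_json(struct('id', 'type', 'url')).alias('value')) \
        .writeStream \
        .format('kafka') \
        .options(**kafka_sink_options('media', checkpoint_dir)) \
        .start()
```

Packing the three fields into one JSON string keeps the messages easy to parse again on the consumer side.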

Media downloader

Task 8: Create a new Python program media_downloader.py that consumes the media topic. Here are some useful pieces of code:

from kafka import KafkaConsumer, KafkaProducer

...

        self.consumer = KafkaConsumer(
            'media',
            bootstrap_servers='localhost:9092',
            client_id='media-downloader',
            group_id='media-downloader-group',
            enable_auto_commit=False)

...

        try:
            for message in self.consumer:
                ...handle each message...
        except KeyboardInterrupt:
            ...shut down...
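
Because the consumer above is created with enable_auto_commit=False, offsets are only stored when you commit them yourself. One possible shape for the message loop is sketched below; the function name consume and the handle callback are hypothetical, while commit() and close() are methods of kafka-python's KafkaConsumer.

```python
def consume(consumer, handle):
    """Pass each message value to handle() and commit after it succeeds.

    Returns the number of messages handled.
    """
    handled = 0
    try:
        for message in consumer:
            handle(message.value)
            # with enable_auto_commit=False, offsets are only stored on commit()
            consumer.commit()
            handled += 1
    except KeyboardInterrupt:
        pass  # Ctrl-C ends the loop; clean-up happens below
    finally:
        consumer.close()
    return handled
```

Committing after each handled message means that a message that crashes the handler is delivered again after a restart.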

The first version of the program can do something really simple, like using Python's requests library to download media with type photo, and perhaps return the photo size in bytes.
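
A minimal sketch of such a first version follows. It assumes that each message from the media topic has been parsed into a dict with the keys id, type and url; the function names and the get parameter (which lets you swap in a stand-in for requests.get) are made up for this example.

```python
def photo_size(url, get=None):
    """Download url and return the size of the response body in bytes."""
    if get is None:
        # imported lazily so that a stand-in can be injected when testing
        import requests
        get = requests.get
    response = get(url, timeout=10)
    response.raise_for_status()
    return len(response.content)


def handle_media(media, get=None):
    """Return {'id', 'size'} for photos and None for other media types."""
    if media.get('type') != 'photo' or not media.get('url'):
        return None
    return {'id': media['id'], 'size': photo_size(media['url'], get)}
```

The timeout keeps a slow media server from blocking the consumer indefinitely.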

Task 9: Create a new Kafka topic, for example photos. Extend media_downloader.py so that, for each photo, it writes (at least) the tweet 'id' and photo 'size' back to Kafka.

Spark join

Task 10: Add a Spark pipeline to read tweet id and photo size from the photos topic. Perform an inner join on the id column to combine the results of sentiment analysis and external photo analysis.

Task 11: The inner join in task 10 only returns tweets with photos. A left outer join of the sentiment analysis frame (left) and photo analysis frame (right) will return a Row for every tweet, but with some null values for the tweets without photos.

Perform such a leftOuter join on the tweet ids. To achieve this, you need to:

  1. pick out the timestamps in addition to the values from the incoming Kafka streams of tweets and photos,
  2. rename the id and timestamp columns in the streaming photo DataFrame,
  3. define watermarks on the timestamp and (renamed) photo_timestamp columns, for example with a 10-minute threshold, and
  4. add an explicit leftOuter join expression over the ids and timestamps.

Tips:

  • to pick out timestamps from incoming Kafka streams, for example:
                .select(photo_sdf.timestamp,
                        decode(photo_sdf.value, 'utf-8').alias('json_str')) \
  • to rename columns, for example:
                .withColumnRenamed('timestamp', 'photo_timestamp') \
  • to add a watermark, for example:
                .withWatermark('photo_timestamp', '10 minutes')
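
Putting the steps together, a left outer join with watermarks could be sketched as follows. This is an illustration under assumptions, not a finished solution: it assumes the photo columns have already been renamed to photo_id and photo_timestamp, the function names are made up, and the time-range condition follows the pattern shown in the Structured Streaming Programming Guide. It uses 'left_outer', one of the join type names Spark accepts for a left outer join.

```python
def join_condition(max_delay='10 minutes'):
    """SQL join condition: same tweet id, photo at most max_delay after the tweet."""
    return ('id = photo_id AND '
            'photo_timestamp >= timestamp AND '
            f'photo_timestamp <= timestamp + interval {max_delay}')


def left_outer_join(tweet_sdf, photo_sdf, threshold='10 minutes'):
    """Left outer join of two watermarked streaming DataFrames."""
    # imported here so that join_condition() can be checked without Spark
    from pyspark.sql.functions import expr

    tweets = tweet_sdf.withWatermark('timestamp', threshold)
    photos = photo_sdf.withWatermark('photo_timestamp', threshold)
    return tweets.join(photos, expr(join_condition(threshold)), 'left_outer')
```

The time-range condition is what allows Spark to decide when a tweet can no longer get a matching photo, so that it can emit the row with null values.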

For more information, see Handling Late Data and Watermarking and Outer Joins with Watermarking in the Structured Streaming Programming Guide.

Some practical advice

  • Do all of this incrementally, making sure that each step works completely before you start the next. Keep the whole pipeline running as you proceed.
  • It is easiest to write this as separate Python programs that you run in separate windows, for example: tweet_harvester.py, spark_pipeline.py, and media_downloader.py. It is possible to run everything as a single complete program, but you risk race conditions that can be hard to debug.
  • Also start a Kafka console consumer for each topic in separate windows, and keep them running as you develop. That way you have more control of what happens, e.g.:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic media --from-beginning
  • You do not need to flood Kafka by sampling tweets continuously. Instead you can configure tweet_harvester.py to harvest a small number of tweets (e.g., 3) each time you want to test something new.
  • When a Spark producer crashes, and in a few other cases, a Kafka topic can be left in an inconsistent state. If that happens, you can try to delete your checkpoint folder.
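
To harvest only a handful of tweets per test run, as suggested above, one option is a small counter that forwards a fixed number of tweets and then stops the stream. The class below is a sketch; the name TweetBudget and its callbacks are made up for this example, and with tweepy the stop callback could be the disconnect method of your streaming client.

```python
class TweetBudget:
    """Forward at most `limit` tweets, then ask the stream to stop."""

    def __init__(self, limit, forward, stop):
        self.remaining = limit
        self.forward = forward   # e.g. a function that sends bytes to Kafka
        self.stop = stop         # e.g. the disconnect method of the stream

    def on_data(self, json_data):
        if self.remaining <= 0:
            return  # ignore tweets that arrive after the budget is used up
        self.forward(json_data)
        self.remaining -= 1
        if self.remaining == 0:
            self.stop()
```

You can call budget.on_data(json_data) from the on_data handler in tweet_harvester.py.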