Install Kafka on the cluster

From info319

Install Zookeeper

On each instance, go to https://zookeeper.apache.org/releases.html#download . Download and unpack a recent binary distribution to ~/volume. For example:

cd ~/volume
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz
tar zxvf apache-zookeeper-3.8.0-bin.tar.gz
rm apache-zookeeper-3.8.0-bin.tar.gz
ln -fns apache-zookeeper-3.8.0-bin zookeeper

Configure Zookeeper

On your local computer, create the file zookeeper.properties:

dataDir=/home/ubuntu/volume/zookeeper/data
clientPort=2181
maxClientCnxns=200
tickTime=2000
initLimit=20
syncLimit=10

From your local computer, upload

scp zookeeper.properties spark-driver:
scp zookeeper.properties spark-worker-1:
scp zookeeper.properties spark-worker-2:

We will not run zookeeper on spark-worker-3, because the number of hosts must be odd to allow majority voting.

You can also run this as a bash-loop:

for host in spark-{driver,worker-{1,2}}: ; do
    scp zookeeper.properties $host: ;
done
In any case, you need to run this loop on your local computer, to set the "id" of each zookeeper (and later kafka) node
i=1
for host in spark-{driver,worker-{1,2,3}}: ; 
    do echo $i > myid; scp myid $host; ((i++));
done

On each instance except spark-worker-3, add addresses from /etc/hosts to zookeeper.properties:

mv ~/zookeeper.properties ~/volume/zookeeper/conf/
i=1
for line in $(grep spark- /etc/hosts | cut -d" " -f1 | head -3); do
    echo "server.$i=$line:2888:3888"; ((i++));
done >> ~/volume/zookeeper/conf/zookeeper.properties
mkdir -p /home/ubuntu/volume/zookeeper/data
mv ~/myid /home/ubuntu/volume/zookeeper/data

Set ZOOKEEPER_HOME and add bin/ to PATH:

export ZOOKEEPER_HOME=/home/ubuntu/volume/zookeeper
export PATH=${PATH}:${ZOOKEEPER_HOME}/bin
cp ~/.bashrc ~/.bashrc-bkp-$(date --iso-8601=minutes --utc)
echo "export ZOOKEEPER_HOME=/home/ubuntu/volume/zookeeper" >>  ~/.bashrc
echo "export PATH=\${PATH}:\${ZOOKEEPER_HOME}/bin" >>  ~/.bashrc

Test run zookeeper

On spark-driver and -worker-1 and -2:

zkServer.sh start ${ZOOKEEPER_HOME}/conf/zookeeper.properties

This will start zookeeper in the background on each host. Afterwards, check its status:

zkServer.sh status ${ZOOKEEPER_HOME}/conf/zookeeper.properties

On of the zookeepers is the leader, the two others are followers. In case of trouble, check ${ZOOKEEPER_HOME}/logs/ .

To stop zookeeper, do

zkServer.sh status ${ZOOKEEPER_HOME}/conf/zookeeper.properties

on each host.

Install Kafka

On each instance (including spark-worker-3), go to http://kafka.apache.org/downloads . Download and unpack a recent binary distribution to ~/volume. It is practical, but not critical, to install a Kafka that runs on the same Scala version as your Spark. If you want to know the Scala version of your installed Spark, you can check this file:

ls $SPARK_HOME/jars/spark-sql*

For example, .../spark-sql_2.12-3.3.0.jar means that you run Scala 2.12 and Spark 3.3.0.

On each instance:

cd ~/volume
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar xzvf kafka_2.12-3.3.1.tgz
rm kafka_2.12-3.3.1.tgz
ln -fns kafka_2.12-3.3.1 kafka

On each instance, configure Kafka:

cp ~/volume/kafka/config/server.properties ~/volume/kafka/config/server.properties.original
sed -i "s/broker.id=0/broker.id=$(cat /tmp/zookeeper/myid)/g" ~/volume/kafka/config/server.properties
export ZOOKEEPER_HOSTLIST=$(grep spark- /etc/hosts | cut -d" " -f1 | head -3 | paste -sd,)
echo "export ZOOKEEPER_HOSTLIST=\$(grep spark- /etc/hosts | cut -d\" \" -f1 | head -3 | paste -sd,)" >> ~/.bashrc
sed -i "s/zookeeper.connect=localhost:2181/zookeeper.connect=$ZOOKEEPER_HOSTLIST/g" ~/volume/kafka/config/server.properties
local_ip=$(ip -4 address | grep -o "^ *inet \(.\+\)\/.\+global.*$" | grep -o "[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+" | head -1) ; \
echo "advertised.host.name=$local_ip" >> ~/volume/kafka/config/server.properties

Set KAFKA_HOME and add bin/ to PATH:

export KAFKA_HOME=/home/ubuntu/volume/kafka
export PATH=${PATH}:${KAFKA_HOME}/bin
cp ~/.bashrc ~/.bashrc-bkp-$(date --iso-8601=minutes --utc)
echo "export KAFKA_HOME=/home/ubuntu/volume/kafka" >>  ~/.bashrc
echo "export PATH=${PATH}:${KAFKA_HOME}/bin" >>  ~/.bashrc

Test run Kafka

Ensure Zookeeper is still running (or restart it) on the three nodes. On all four nodes:

kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties

(It is a good sign if nothing happens.)

Test commands to run on the different instances:

kafka-topics.sh --create --topic test --replication-factor 2 --partitions 3 --bootstrap-server ${MASTER_NODE}:9092
kafka-topics.sh --list --bootstrap-server ${MASTER_NODE}:9092

Run these two commands on different instances:

bin/kafka-console-producer.sh --topic test --bootstrap-server ${MASTER_NODE}:9092
bin/kafka-console-consumer.sh --topic test --bootstrap-server ${MASTER_NODE}:9092 --from-beginning

Task 2: Run the full Twitter pipeline from Exercise 3. Tips:

  • You need to create virtual environment to download packages, specifically tweepy, kafka-python, and afinn.
  • You need to create keys/bearer_token with restricted access.
  • With the default settings, it can take time before the final join is written to console. Be patient...
  • Start-up sequence from spark-driver:
# assuming zookeeper and kafka run already
# create kafka topics
kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092
kafka-topics.sh --create --topic media --bootstrap-server localhost:9092
kafka-topics.sh --create --topic photos --bootstrap-server localhost:9092
# optional monitoring
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tweets --from-beginning &
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic media --from-beginning &
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic photos --from-beginning &
# main programs
python twitterpipe_tweet_harvester.py  # run this repeatedly or modify to reconnect after sleeping
python twitterpipe_media_harvester.py &  # this one can run in background
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 twitterpipe_tweet_pipeline.py &

Because you no longer use findspark, you need to use the --packages option with spark-submit. Make sure you use exactly the same package as you did with findspark.add_packages(...).

Web UIs

Assuming that 158.39.77.227 is the IPv4 address of spark-driver, you can access zookeeper's very simple web UI at http://158.39.77.227:8080/commands/stat .

Kafka has no built in web UI, but third-party UIs are available.