Install Kafka on the cluster

Revision as of 12:20, 31 October 2022

Install Zookeeper

On each instance, go to https://zookeeper.apache.org/releases.html#download . Download and unpack a recent binary distribution to ~/volume. For example:

cd ~/volume
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz
tar zxvf apache-zookeeper-3.8.0-bin.tar.gz
rm apache-zookeeper-3.8.0-bin.tar.gz
ln -fns apache-zookeeper-3.8.0-bin zookeeper

Configure Zookeeper

On your local computer, create the file zookeeper.properties:

dataDir=/home/ubuntu/volume/zookeeper/data
clientPort=2181
maxClientCnxns=200
tickTime=2000
initLimit=20
syncLimit=10
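
As a sanity check on these numbers: tickTime is Zookeeper's basic time unit in milliseconds, and initLimit and syncLimit are multiples of it. A small Python sketch (property values copied from the file above) shows the timeouts they imply:

```python
# Parse a java-style .properties string and derive the Zookeeper timeouts
# implied by tickTime, initLimit and syncLimit.
ZOOKEEPER_PROPERTIES = """\
dataDir=/home/ubuntu/volume/zookeeper/data
clientPort=2181
maxClientCnxns=200
tickTime=2000
initLimit=20
syncLimit=10
"""

def parse_properties(text):
    """Return the key=value pairs from a .properties-style string."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

props = parse_properties(ZOOKEEPER_PROPERTIES)
tick = int(props["tickTime"])                  # 2000 ms per tick
init_timeout = tick * int(props["initLimit"])  # time followers get to connect and sync at startup
sync_timeout = tick * int(props["syncLimit"])  # max lag allowed between leader and follower afterwards
print(init_timeout, sync_timeout)  # 40000 20000 (ms)
```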

From your local computer, upload the file to spark-driver, spark-worker-1 and spark-worker-2:

scp zookeeper.properties spark-driver:
scp zookeeper.properties spark-worker-1:
scp zookeeper.properties spark-worker-2:

We will not run zookeeper on spark-worker-3, because the number of hosts must be odd to allow majority voting.

You can also run this as a bash loop:

for host in spark-{driver,worker-{1,2}}; do
    scp zookeeper.properties $host: ;
done

In any case, you need to run this loop on your local computer, to set the "id" of each zookeeper (and later kafka) node:

i=1
for host in spark-{driver,worker-{1,2,3}}; do
    echo $i > myid; scp myid $host: ; ((i++));
done
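
The loop above assigns id 1 to spark-driver and ids 2-4 to the workers, in the order the brace expansion produces them. The resulting mapping, sketched in Python (hostnames as used throughout this guide):

```python
# Sketch of the id assignment done by the bash loop above:
# spark-driver gets id 1, spark-worker-N gets id N+1.
hosts = ["spark-driver"] + [f"spark-worker-{n}" for n in (1, 2, 3)]
myids = {host: i for i, host in enumerate(hosts, start=1)}
for host, i in myids.items():
    # the bash loop writes this number into ~/myid on each host
    print(f"{host}: myid={i}")
```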

On each instance except spark-worker-3, add addresses from /etc/hosts to zookeeper.properties:

mv ~/zookeeper.properties ~/volume/zookeeper/conf/
i=1
for line in $(grep spark- /etc/hosts | cut -d" " -f1 | head -3); do
    echo "server.$i=$line:2888:3888"; ((i++));
done >> ~/volume/zookeeper/conf/zookeeper.properties
mkdir -p /home/ubuntu/volume/zookeeper/data
mv ~/myid /home/ubuntu/volume/zookeeper/data
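
The grep/cut/head pipeline above can be mimicked in Python to preview what gets appended to zookeeper.properties. The /etc/hosts excerpt below is invented for illustration; your addresses will differ:

```python
# Hypothetical /etc/hosts excerpt; real addresses will differ.
ETC_HOSTS = """\
127.0.0.1 localhost
10.0.0.10 spark-driver
10.0.0.11 spark-worker-1
10.0.0.12 spark-worker-2
10.0.0.13 spark-worker-3
"""

def zookeeper_server_lines(hosts_text, num_servers=3):
    """Emulate: grep spark- /etc/hosts | cut -d' ' -f1 | head -3."""
    ips = [line.split(" ")[0]
           for line in hosts_text.splitlines() if "spark-" in line]
    # 2888 is the peer-connection port, 3888 the leader-election port
    return [f"server.{i}={ip}:2888:3888"
            for i, ip in enumerate(ips[:num_servers], start=1)]

for line in zookeeper_server_lines(ETC_HOSTS):
    print(line)  # e.g. server.1=10.0.0.10:2888:3888
```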

Set ZOOKEEPER_HOME and add bin/ to PATH:

export ZOOKEEPER_HOME=/home/ubuntu/volume/zookeeper
export PATH=${PATH}:${ZOOKEEPER_HOME}/bin
cp ~/.bashrc ~/.bashrc-bkp-$(date --iso-8601=minutes --utc)
echo "export ZOOKEEPER_HOME=/home/ubuntu/volume/zookeeper" >>  ~/.bashrc
echo "export PATH=\${PATH}:\${ZOOKEEPER_HOME}/bin" >>  ~/.bashrc

Test run Zookeeper

On spark-driver and -worker-1 and -2:

zkServer.sh start ${ZOOKEEPER_HOME}/conf/zookeeper.properties

This will start zookeeper in the background on each host. Afterwards, check its status:

zkServer.sh status ${ZOOKEEPER_HOME}/conf/zookeeper.properties

One of the zookeepers is the leader; the other two are followers. In case of trouble, check ${ZOOKEEPER_HOME}/logs/ .

To stop zookeeper, do

zkServer.sh stop ${ZOOKEEPER_HOME}/conf/zookeeper.properties

on each host.

Install Kafka

On each instance (including spark-worker-3), go to http://kafka.apache.org/downloads . Download and unpack a recent binary distribution to ~/volume. It is practical, but not critical, to install a Kafka that runs on the same Scala version as your Spark. If you want to know the Scala version of your installed Spark, you can check this file:

ls $SPARK_HOME/jars/spark-sql*

For example, .../spark-sql_2.12-3.3.0.jar means that you run Scala 2.12 and Spark 3.3.0.
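The jar name encodes both versions by convention. A small Python helper (the regex assumes the standard spark-sql jar naming shown above) makes the convention explicit:

```python
import re

def spark_scala_versions(jar_name):
    """Extract (scala_version, spark_version) from a spark-sql jar name,
    e.g. 'spark-sql_2.12-3.3.0.jar' -> ('2.12', '3.3.0')."""
    m = re.search(r"spark-sql_(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar$", jar_name)
    if not m:
        raise ValueError(f"unexpected jar name: {jar_name}")
    return m.group(1), m.group(2)

scala, spark = spark_scala_versions("spark-sql_2.12-3.3.0.jar")
print(scala, spark)  # 2.12 3.3.0
```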

On each instance:

cd ~/volume
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar xzvf kafka_2.12-3.3.1.tgz
rm kafka_2.12-3.3.1.tgz
ln -fns kafka_2.12-3.3.1 kafka

On each instance, configure Kafka:

cp ~/volume/kafka/config/server.properties ~/volume/kafka/config/server.properties.original
# myid is in the zookeeper data dir on the zookeeper nodes, and still in ~ on spark-worker-3
sed -i "s/broker.id=0/broker.id=$(cat ~/volume/zookeeper/data/myid 2>/dev/null || cat ~/myid)/g" ~/volume/kafka/config/server.properties
export ZOOKEEPER_HOSTLIST=$(grep spark- /etc/hosts | cut -d" " -f1 | head -3 | paste -sd,)
echo "export ZOOKEEPER_HOSTLIST=\$(grep spark- /etc/hosts | cut -d\" \" -f1 | head -3 | paste -sd,)" >> ~/.bashrc
sed -i "s/zookeeper.connect=localhost:2181/zookeeper.connect=$ZOOKEEPER_HOSTLIST/g" ~/volume/kafka/config/server.properties
local_ip=$(ip -4 address | grep -o "^ *inet \(.\+\)\/.\+global.*$" | grep -o "[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+" | head -1) ; \
echo "advertised.listeners=PLAINTEXT://$local_ip:9092" >> ~/volume/kafka/config/server.properties
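
Taken together, the sed/echo commands above amount to a small rewrite of server.properties. A Python sketch of the same transformation (sample values are invented; advertised.listeners is written here as the current name for the old advertised.host.name setting):

```python
def configure_kafka(properties_text, broker_id, zookeeper_hostlist, local_ip):
    """Apply the same three edits as the sed/echo commands above."""
    out = []
    for line in properties_text.splitlines():
        if line.startswith("broker.id="):
            line = f"broker.id={broker_id}"          # unique id per broker, reusing the zookeeper myid
        elif line.startswith("zookeeper.connect="):
            line = f"zookeeper.connect={zookeeper_hostlist}"
        out.append(line)
    # advertise this instance's own address to clients
    out.append(f"advertised.listeners=PLAINTEXT://{local_ip}:9092")
    return "\n".join(out)

sample = "broker.id=0\nzookeeper.connect=localhost:2181"
print(configure_kafka(sample, 2, "10.0.0.10,10.0.0.11,10.0.0.12", "10.0.0.12"))
```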

Set KAFKA_HOME and add bin/ to PATH:

export KAFKA_HOME=/home/ubuntu/volume/kafka
export PATH=${PATH}:${KAFKA_HOME}/bin
cp ~/.bashrc ~/.bashrc-bkp-$(date --iso-8601=minutes --utc)
echo "export KAFKA_HOME=/home/ubuntu/volume/kafka" >>  ~/.bashrc
echo "export PATH=\${PATH}:\${KAFKA_HOME}/bin" >>  ~/.bashrc

Test run Kafka

Ensure Zookeeper is still running (or restart it) on the three nodes. On all four nodes:

kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties

(It is a good sign if nothing happens.)

Test commands to run on the different instances:

kafka-topics.sh --create --topic test --replication-factor 2 --partitions 3 --bootstrap-server ${MASTER_NODE}:9092
kafka-topics.sh --list --bootstrap-server ${MASTER_NODE}:9092

Run these two commands on different instances:

kafka-console-producer.sh --topic test --bootstrap-server ${MASTER_NODE}:9092
kafka-console-consumer.sh --topic test --bootstrap-server ${MASTER_NODE}:9092 --from-beginning
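
The same produce/consume round trip can be scripted with the kafka-python package used later in Exercise 3. This is a sketch, assuming a reachable broker; the bootstrap address and helper names are illustrative:

```python
import json

def encode_message(record):
    """Serialize a record as the UTF-8 JSON bytes Kafka expects."""
    return json.dumps(record).encode("utf-8")

def round_trip(bootstrap="spark-driver:9092", topic="test"):
    """Send one message to the topic and read the topic back.
    Requires a running broker and: pip install kafka-python"""
    from kafka import KafkaProducer, KafkaConsumer
    producer = KafkaProducer(bootstrap_servers=bootstrap)
    producer.send(topic, encode_message({"hello": "kafka"}))
    producer.flush()
    consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap,
                             auto_offset_reset="earliest",
                             consumer_timeout_ms=5000)
    for msg in consumer:
        print(msg.value.decode("utf-8"))

# round_trip()  # run this where a broker on port 9092 is reachable
```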

Task 2: Run the full Twitter pipeline from Exercise 3. Tips:

  • You need to create a virtual environment to install the required packages, specifically tweepy, kafka-python, and afinn.
  • You need to create the file keys/bearer_token with restricted access permissions.
  • With the default settings, it can take time before the final join is written to console. Be patient...
  • Start-up sequence from spark-driver:
# assuming zookeeper and kafka run already
# create kafka topics
kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092
kafka-topics.sh --create --topic media --bootstrap-server localhost:9092
kafka-topics.sh --create --topic photos --bootstrap-server localhost:9092
# optional monitoring
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tweets --from-beginning &
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic media --from-beginning &
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic photos --from-beginning &
# main programs
python twitterpipe_tweet_harvester.py  # run this repeatedly or modify to reconnect after sleeping
python twitterpipe_media_harvester.py &  # this one can run in background
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 twitterpipe_tweet_pipeline.py &

Because you no longer use findspark, you need to use the --packages option with spark-submit. Make sure you use exactly the same package as you did with findspark.add_packages(...).

Web UIs

Assuming that 158.39.77.227 is the IPv4 address of spark-driver, you can access zookeeper's very simple web UI at http://158.39.77.227:8080/commands/stat .

Kafka has no built-in web UI, but third-party UIs are available.