Install Kafka on the cluster

== Install Zookeeper ==

On each instance, go to https://zookeeper.apache.org/releases.html#download . Download and unpack a recent binary distribution to ~/volume. For example:

 cd ~/volume
 wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz
 tar zxvf apache-zookeeper-3.8.0-bin.tar.gz
 rm apache-zookeeper-3.8.0-bin.tar.gz
 ln -fns apache-zookeeper-3.8.0-bin zookeeper
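
A quick sanity check that the unpacking and the symlink worked (paths as above):

 ls -l ~/volume/zookeeper/bin/zkServer.sh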

== Configure Zookeeper ==

On your local computer, create the file ''zookeeper.properties'':

 dataDir=/home/ubuntu/volume/zookeeper/data
 clientPort=2181
 maxClientCnxns=200
 tickTime=2000
 initLimit=20
 syncLimit=10
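
For reference, here is the same file with our annotations added as comment lines (Zookeeper property files accept full-line #-comments; see the Zookeeper administrator's guide for the authoritative definitions):

 # snapshot directory; Zookeeper's myid file goes here too
 dataDir=/home/ubuntu/volume/zookeeper/data
 # port that clients such as Kafka connect to
 clientPort=2181
 # maximum concurrent connections per client IP
 maxClientCnxns=200
 # basic time unit in milliseconds
 tickTime=2000
 # ticks a follower may take to connect and sync to the leader
 initLimit=20
 # ticks a follower may lag behind the leader
 syncLimit=10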

From your local computer, upload:

 scp zookeeper.properties spark-driver:
 scp zookeeper.properties spark-worker-1:
 scp zookeeper.properties spark-worker-2:

We will not run Zookeeper on ''spark-worker-3'', because the number of hosts must be odd to allow majority voting.

You can also run this as a bash loop. Note that the brace expansion appends the trailing colon to each host name, so ''$host'' already ends in a colon:

 for host in spark-{driver,worker-{1,2}}: ; do
     scp zookeeper.properties $host ;
 done

In any case, you need to run this loop on your local computer to set the "id" of each Zookeeper (and later Kafka) node:

 i=1
 for host in spark-{driver,worker-{1,2,3}}: ; do
     echo $i > myid; scp myid $host; ((i++));
 done
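
To check that every node got the right id, you can run a similar loop (a quick sanity check, assuming the same ssh access that scp uses above):

 for host in spark-{driver,worker-{1,2,3}}; do
     echo -n "$host: "; ssh $host cat myid;
 done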

On each instance (including ''spark-worker-3'', whose id Kafka will need later), create the Zookeeper data directory and move ''myid'' into it:

 mkdir -p /home/ubuntu/volume/zookeeper/data
 mv ~/myid /home/ubuntu/volume/zookeeper/data

On each instance except ''spark-worker-3'', add addresses from /etc/hosts to ''zookeeper.properties'':

 mv ~/zookeeper.properties ~/volume/zookeeper/conf/
 i=1
 for line in $(grep spark- /etc/hosts | cut -d" " -f1 | head -3); do
     echo "server.$i=$line:2888:3888"; ((i++));
 done >> ~/volume/zookeeper/conf/zookeeper.properties
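
With three Zookeeper nodes, the appended lines will look something like this (the IPv4 addresses here are placeholders for your own):

 server.1=192.168.10.1:2888:3888
 server.2=192.168.10.2:2888:3888
 server.3=192.168.10.3:2888:3888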

Set ZOOKEEPER_HOME and add bin/ to PATH:

 export ZOOKEEPER_HOME=/home/ubuntu/volume/zookeeper
 export PATH=${PATH}:${ZOOKEEPER_HOME}/bin
 cp ~/.bashrc ~/.bashrc-bkp-$(date --iso-8601=minutes --utc)
 echo "export ZOOKEEPER_HOME=/home/ubuntu/volume/zookeeper" >> ~/.bashrc
 echo "export PATH=\${PATH}:\${ZOOKEEPER_HOME}/bin" >> ~/.bashrc

== Test run Zookeeper ==

On ''spark-driver'', ''spark-worker-1'' and ''spark-worker-2'':

 zkServer.sh start ${ZOOKEEPER_HOME}/conf/zookeeper.properties

This will start Zookeeper in the background on each host. Afterwards, check its status:

 zkServer.sh status ${ZOOKEEPER_HOME}/conf/zookeeper.properties

One of the Zookeepers is the leader, the two others are followers. In case of trouble, check ${ZOOKEEPER_HOME}/logs/.
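
On a healthy ensemble, the status output ends with a line showing the node's role, roughly like this (exact wording varies between versions):

 Mode: follower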

To stop Zookeeper, run

 zkServer.sh stop ${ZOOKEEPER_HOME}/conf/zookeeper.properties

on each host.

== Install Kafka ==

On each instance (including spark-worker-3), go to http://kafka.apache.org/downloads . Download and unpack a recent binary distribution to ~/volume. It is practical, but not critical, to install a Kafka that runs on the same Scala version as your Spark. If you want to know the Scala version of your installed Spark, you can check this file:

 ls $SPARK_HOME/jars/spark-sql*

For example, .../spark-sql_2.12-3.3.0.jar means that you run Scala 2.12 and Spark 3.3.0.
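
If you want those version numbers in shell variables, for instance to build the ''--packages'' coordinate used further below, something along these lines should work (a sketch that assumes the jar name has the form spark-sql_&lt;scala&gt;-&lt;spark&gt;.jar):

 jar=$(basename $(ls ${SPARK_HOME}/jars/spark-sql_* | head -1) .jar)
 scala_version=${jar#spark-sql_}; scala_version=${scala_version%%-*}
 spark_version=${jar##*-}
 echo "Scala ${scala_version}, Spark ${spark_version}"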

On each instance:

 cd ~/volume
 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
 tar xzvf kafka_2.12-3.3.1.tgz
 rm kafka_2.12-3.3.1.tgz
 ln -fns kafka_2.12-3.3.1 kafka
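
To verify that the unpacked Kafka runs at all, you can ask one of its tools for its version (full path, since PATH is only extended in the next section):

 ~/volume/kafka/bin/kafka-topics.sh --version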

== Configure Kafka ==

On each instance, configure Kafka:

 cp ~/volume/kafka/config/server.properties ~/volume/kafka/config/server.properties.original
 sed -i "s/^broker.id=.*/broker.id=$(cat /home/ubuntu/volume/zookeeper/data/myid)/" ~/volume/kafka/config/server.properties
 export ZOOKEEPER_HOSTLIST=$(grep spark- /etc/hosts | cut -d" " -f1 | head -3 | paste -sd,)
 echo "export ZOOKEEPER_HOSTLIST=\$(grep spark- /etc/hosts | cut -d\" \" -f1 | head -3 | paste -sd,)" >> ~/.bashrc
 sed -i "s/zookeeper.connect=localhost:2181/zookeeper.connect=$ZOOKEEPER_HOSTLIST/g" ~/volume/kafka/config/server.properties
 local_ip=$(ip -4 address | grep -o "^ *inet \(.\+\)\/.\+global.*$" | grep -o "[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+" | head -1) ; \
 echo "advertised.host.name=$local_ip" >> ~/volume/kafka/config/server.properties

Set KAFKA_HOME and add bin/ to PATH:

 export KAFKA_HOME=/home/ubuntu/volume/kafka
 export PATH=${PATH}:${KAFKA_HOME}/bin
 cp ~/.bashrc ~/.bashrc-bkp-$(date --iso-8601=minutes --utc)
 echo "export KAFKA_HOME=/home/ubuntu/volume/kafka" >> ~/.bashrc
 echo "export PATH=\${PATH}:\${KAFKA_HOME}/bin" >> ~/.bashrc

== Test run Kafka ==

Ensure Zookeeper is still running (or restart it) on the three nodes. On all four nodes:

 kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties

(It is a good sign if nothing happens.)
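
If you want more certainty, check for the broker process and the end of the server log on each node (the log lands in ${KAFKA_HOME}/logs/ with the stock log4j settings):

 ps aux | grep [k]afka.Kafka
 tail ${KAFKA_HOME}/logs/server.log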

Here are some test commands you can run on the different instances:

 kafka-topics.sh --create --topic test --replication-factor 2 --partitions 3 --bootstrap-server ${MASTER_NODE}:9092
 kafka-topics.sh --list --bootstrap-server ${MASTER_NODE}:9092
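
You can also describe the topic to see how its three partitions and their replicas are spread over the brokers:

 kafka-topics.sh --describe --topic test --bootstrap-server ${MASTER_NODE}:9092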

Run these two commands on different instances to see that lines you type into the ''producer'' console show up in the ''consumer'':

 kafka-console-producer.sh --topic test --bootstrap-server ${MASTER_NODE}:9092
 kafka-console-consumer.sh --topic test --bootstrap-server ${MASTER_NODE}:9092 --from-beginning
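
When you are done testing, you can delete the topic again:

 kafka-topics.sh --delete --topic test --bootstrap-server ${MASTER_NODE}:9092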

'''Task 2:''' Run the full Twitter pipeline from Exercise 3. Tips:

* You may want to create a virtual environment to install Python packages such as ''tweepy'', ''kafka-python'', and ''afinn''.
* You need to create ''keys/bearer_token'' with restricted access.
* With the default settings, it can take time before the final join is written to console. Be patient...
* Start-up sequence from ''spark-driver'':
 # assuming zookeeper and kafka run already
 # create kafka topics
 kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092
 kafka-topics.sh --create --topic media --bootstrap-server localhost:9092
 kafka-topics.sh --create --topic photos --bootstrap-server localhost:9092
 # optional monitoring
 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tweets --from-beginning &
 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic media --from-beginning &
 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic photos --from-beginning &
 # main programs
 python twitterpipe_tweet_harvester.py  # run this repeatedly or modify to reconnect after sleeping
 python twitterpipe_media_harvester.py &  # this one can run in background
 spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 twitterpipe_tweet_pipeline.py &
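
To shut the pipeline down again, stop the brokers before the Zookeepers (a sketch; ''kafka-server-stop.sh'' stops the broker that was started with ''-daemon''):

 kafka-server-stop.sh                                          # on all four nodes
 zkServer.sh stop ${ZOOKEEPER_HOME}/conf/zookeeper.properties  # on the three Zookeeper nodes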

Because you no longer use ''findspark'', you need to use the ''--packages'' option with ''spark-submit''. Make sure you use exactly the same package as you did with ''findspark.add_packages(...)''.
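
For example, with Scala 2.12 and Spark 3.3.0 (as read off the spark-sql jar earlier), the coordinate is org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0. With the hypothetical shell variables from the jar check in the Install Kafka section, you could build it like this:

 spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_${scala_version}:${spark_version} \
     twitterpipe_tweet_pipeline.py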

== Web UIs ==

Assuming that 158.39.77.227 is the IPv4 address of spark-driver, you can access Zookeeper's very simple web UI at http://158.39.77.227:8080/commands/stat .
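
If port 8080 is not opened in your security group, you can query the same endpoint locally from ''spark-driver'' instead:

 curl -s http://localhost:8080/commands/stat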

Kafka has no built-in web UI, but third-party UIs are available.