Install Kafka on the cluster

== Install Zookeeper ==

On each instance, go to https://zookeeper.apache.org/releases.html#download . Download and unpack a recent binary distribution to ~/volume. For example:

cd ~/volume
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz
tar zxvf apache-zookeeper-3.8.0-bin.tar.gz
rm apache-zookeeper-3.8.0-bin.tar.gz
ln -fns apache-zookeeper-3.8.0-bin zookeeper

== Configure Zookeeper ==

On your local computer, create the file zookeeper.properties:

dataDir=/home/ubuntu/volume/zookeeper/data
clientPort=2181
maxClientCnxns=200
tickTime=2000
initLimit=20
syncLimit=10

From your local computer, upload:

scp zookeeper.properties spark-driver:
scp zookeeper.properties spark-worker-1:
scp zookeeper.properties spark-worker-2:

We will not run zookeeper on spark-worker-3, because the number of hosts must be odd to allow majority voting.

You can also run the upload as a bash loop:

for host in spark-{driver,worker-{1,2}}: ; do
    scp zookeeper.properties $host: ;
done

In any case, you need to run this loop on your local computer to set the "id" of each zookeeper (and later kafka) node:

i=1
for host in spark-{driver,worker-{1,2,3}}: ; 
    do echo $i > myid; scp myid $host; ((i++));
done
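
If you want to verify the uploads, a quick check from your local computer (note: no trailing colon this time, since ssh takes a plain host name) should print the numbers 1 to 4:

for host in spark-{driver,worker-{1,2,3}}; do ssh $host cat myid; done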

On each instance except spark-worker-3, add addresses from /etc/hosts to zookeeper.properties:

mv ~/zookeeper.properties ~/volume/zookeeper/conf/
i=1
for line in $(grep spark- /etc/hosts | cut -d" " -f1 | head -3); do
    echo "server.$i=$line:2888:3888"; ((i++));
done >> ~/volume/zookeeper/conf/zookeeper.properties
mkdir -p /home/ubuntu/volume/zookeeper/data
mv ~/myid /home/ubuntu/volume/zookeeper/data
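
After this step, zookeeper.properties on each of the three nodes should look roughly like this (the server IP addresses below are examples; yours come from /etc/hosts):

dataDir=/home/ubuntu/volume/zookeeper/data
clientPort=2181
maxClientCnxns=200
tickTime=2000
initLimit=20
syncLimit=10
server.1=10.0.0.1:2888:3888
server.2=10.0.0.2:2888:3888
server.3=10.0.0.3:2888:3888

Port 2888 is used by followers to connect to the leader and port 3888 for leader election; the number in server.N must match the myid of that node.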

Set ZOOKEEPER_HOME and add bin/ to PATH:

export ZOOKEEPER_HOME=/home/ubuntu/volume/zookeeper
export PATH=${PATH}:${ZOOKEEPER_HOME}/bin
cp ~/.bashrc ~/.bashrc-bkp-$(date --iso-8601=minutes --utc)
echo "export ZOOKEEPER_HOME=/home/ubuntu/volume/zookeeper" >>  ~/.bashrc
echo "export PATH=\${PATH}:\${ZOOKEEPER_HOME}/bin" >>  ~/.bashrc

== Test run zookeeper ==

On spark-driver and -worker-1 and -2:

zkServer.sh start ${ZOOKEEPER_HOME}/conf/zookeeper.properties

This will start zookeeper in the background on each host. Afterwards, check its status:

zkServer.sh status ${ZOOKEEPER_HOME}/conf/zookeeper.properties

One of the zookeepers is the leader and the other two are followers. In case of trouble, check ${ZOOKEEPER_HOME}/logs/.
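
On a follower node, the status output looks roughly like this (details vary with the zookeeper version):

ZooKeeper JMX enabled by default
Using config: /home/ubuntu/volume/zookeeper/conf/zookeeper.properties
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower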

To stop zookeeper, do

zkServer.sh stop ${ZOOKEEPER_HOME}/conf/zookeeper.properties

on each host.

== Install Kafka ==

On each instance (including spark-worker-3), go to http://kafka.apache.org/downloads . Download and unpack a recent binary distribution to ~/volume. It is practical, but not critical, to install a Kafka that runs on the same Scala version as your Spark. If you want to know the Scala version of your installed Spark, you can check this file:

ls $SPARK_HOME/jars/spark-sql*

For example, .../spark-sql_2.12-3.3.0.jar means that you run Scala 2.12 and Spark 3.3.0.

On each instance:

cd ~/volume
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar xzvf kafka_2.12-3.3.1.tgz
rm kafka_2.12-3.3.1.tgz
ln -fns kafka_2.12-3.3.1 kafka
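
To check the unpacked distribution, you can ask one of the scripts for its version (bin/ is not on PATH yet, so spell out the path); it should print something like 3.3.1:

~/volume/kafka/bin/kafka-topics.sh --version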

== Configure Kafka ==

On each instance, configure Kafka:

cp ~/volume/kafka/config/server.properties ~/volume/kafka/config/server.properties.original
sed -i "s/broker.id=/broker.id=$(cat /tmp/zookeeper/myid)/g" ~/volume/kafka/config/server.properties
export ZOOKEEPER_HOSTLIST=$(grep spark- /etc/hosts | cut -d" " -f1 | head -3 | paste -sd,)
echo "export ZOOKEEPER_HOSTLIST=\$(grep spark- /etc/hosts | cut -d\" \" -f1 | head -3 | paste -sd,)" >> ~/.bashrc
sed -i "s/zookeeper.connect=localhost:2181/zookeeper.connect=$ZOOKEEPER_HOSTLIST/g" ~/volume/kafka/config/server.properties
# advertised.host.name was removed in Kafka 3.x, so we set advertised.listeners instead
local_ip=$(ip -4 address | grep -o "^ *inet \(.\+\)\/.\+global.*$" | grep -o "[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+" | head -1) ; \
echo "advertised.listeners=PLAINTEXT://$local_ip:9092" >> ~/volume/kafka/config/server.properties

Set KAFKA_HOME and add bin/ to PATH:

export KAFKA_HOME=/home/ubuntu/volume/kafka
export PATH=${PATH}:${KAFKA_HOME}/bin
cp ~/.bashrc ~/.bashrc-bkp-$(date --iso-8601=minutes --utc)
echo "export KAFKA_HOME=/home/ubuntu/volume/kafka" >>  ~/.bashrc
echo "export PATH=${PATH}:${KAFKA_HOME}/bin" >>  ~/.bashrc

== Test run Kafka ==

Ensure Zookeeper is still running (or restart it) on the three nodes. On all four nodes:

kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties

(It is a good sign if nothing happens.)
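
To check that a broker actually came up, look at the end of its log file (the path assumes Kafka's default log4j settings); you should see a line like "[KafkaServer id=1] started (kafka.server.KafkaServer)":

tail -n 5 ${KAFKA_HOME}/logs/server.log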

Here are some test commands you can run on the different instances:

kafka-topics.sh --create --topic test --replication-factor 2 --partitions 3 --bootstrap-server ${MASTER_NODE}:9092
kafka-topics.sh --list --bootstrap-server ${MASTER_NODE}:9092
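
To see how the partitions and replicas were placed on the brokers, each of the three partitions should report a leader broker and two in-sync replicas:

kafka-topics.sh --describe --topic test --bootstrap-server ${MASTER_NODE}:9092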

Run these two commands on different instances to see that lines you type into the producer console show up in the consumer:

kafka-console-producer.sh --topic test --bootstrap-server ${MASTER_NODE}:9092
kafka-console-consumer.sh --topic test --bootstrap-server ${MASTER_NODE}:9092 --from-beginning
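
When you are done testing, stop the broker on each node with:

kafka-server-stop.sh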

'''Task 2:''' Run the full Twitter pipeline from Exercise 3. Tips:

* You may want to create a virtual environment to install Python packages such as tweepy, kafka-python, and afinn (a setup sketch follows at the end of this section).
* You need to create keys/bearer_token with restricted access.
* With the default settings, it can take time before the final join is written to console. Be patient...
* Start-up sequence from spark-driver:
# assuming zookeeper and kafka run already
# create kafka topics
kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092
kafka-topics.sh --create --topic media --bootstrap-server localhost:9092
kafka-topics.sh --create --topic photos --bootstrap-server localhost:9092
# optional monitoring
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tweets --from-beginning &
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic media --from-beginning &
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic photos --from-beginning &
# main programs
python twitterpipe_tweet_harvester.py  # run this repeatedly or modify to reconnect after sleeping
python twitterpipe_media_harvester.py &  # this one can run in background
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 twitterpipe_tweet_pipeline.py &

Because you no longer use findspark, you need to use the --packages option with spark-submit. Make sure you use exactly the same package as you did with findspark.add_packages(...).
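
As promised in the first tip, a minimal virtual-environment setup could look like this (the environment name is just an example; assumes the python3-venv package is installed):

python3 -m venv ~/twitterpipe-venv
source ~/twitterpipe-venv/bin/activate
pip install tweepy kafka-python afinn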

== Web UIs ==

Assuming that 158.39.77.227 is the IPv4 address of spark-driver, you can access Zookeeper's very simple web UI at http://158.39.77.227:8080/commands/stat .
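
The same statistics are available on the command line, for example on spark-driver itself:

curl http://localhost:8080/commands/stat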

Kafka has no built-in web UI, but third-party UIs are available.
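
One option is Kafdrop, which you can run with Docker on spark-driver. A sketch (the host port 9001 is an arbitrary choice):

sudo apt install docker.io
sudo groupadd docker
sudo usermod -aG docker ${USER}
# log out and ssh in again so the group change takes effect, then:
docker run -it -p 9001:9000 \
    -e KAFKA_BROKERCONNECT=${MASTER_NODE}:9092 \
    obsidiandynamics/kafdrop

While the container runs, you can browse the Kafka UI at http://158.39.77.227:9001 (as before, assuming 158.39.77.227 is the IPv4 address of spark-driver).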