Install Kafka on the cluster
Install Zookeeper
On each instance, go to https://zookeeper.apache.org/releases.html#download . Download and unpack a recent binary distribution to ~/volume. For example:
cd ~/volume
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz
tar zxvf apache-zookeeper-3.8.0-bin.tar.gz
rm apache-zookeeper-3.8.0-bin.tar.gz
ln -fns apache-zookeeper-3.8.0-bin zookeeper
Configure Zookeeper
On your local computer, create the file zookeeper.properties:
dataDir=/home/ubuntu/volume/zookeeper/data
clientPort=2181
maxClientCnxns=200
tickTime=2000
initLimit=20
syncLimit=10
From your local computer, upload:
scp zookeeper.properties spark-driver:
scp zookeeper.properties spark-worker-1:
scp zookeeper.properties spark-worker-2:
We will not run zookeeper on spark-worker-3, because the number of hosts must be odd to allow majority voting. With three servers a majority is two, so the ensemble tolerates one failure; a fourth server would raise the majority to three and still tolerate only one failure.
You can also run this as a bash loop (note that the trailing colon belongs in the scp destination, not the hostname):
for host in spark-{driver,worker-{1,2}} ; do scp zookeeper.properties $host: ; done
In any case, you need to run this loop on your local computer to set the "id" of each zookeeper (and later kafka) node:
i=1
for host in spark-{driver,worker-{1,2,3}}: ; do
    echo $i > myid; scp myid $host; ((i++));
done
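To verify that every host got the right id, you can loop over the hosts once more (a quick check, assuming ssh to the instances works just like the scp above):
for host in spark-{driver,worker-{1,2,3}} ; do
    echo -n "$host: " ; ssh $host cat myid
done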
On each instance except spark-worker-3, add addresses from /etc/hosts to zookeeper.properties:
mv ~/zookeeper.properties ~/volume/zookeeper/conf/
i=1
for line in $(grep spark- /etc/hosts | cut -d" " -f1 | head -3); do
    echo "server.$i=$line:2888:3888"; ((i++));
done >> ~/volume/zookeeper/conf/zookeeper.properties
mkdir -p /home/ubuntu/volume/zookeeper/data
mv ~/myid /home/ubuntu/volume/zookeeper/data
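Before starting anything, it is worth a quick sanity check that the generated files look right:
tail -3 ~/volume/zookeeper/conf/zookeeper.properties   # should end with the three server.N=...:2888:3888 lines
cat /home/ubuntu/volume/zookeeper/data/myid            # should print this host's number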
Set ZOOKEEPER_HOME and add bin/ to PATH:
export ZOOKEEPER_HOME=/home/ubuntu/volume/zookeeper
export PATH=${PATH}:${ZOOKEEPER_HOME}/bin
cp ~/.bashrc ~/.bashrc-bkp-$(date --iso-8601=minutes --utc)
echo "export ZOOKEEPER_HOME=/home/ubuntu/volume/zookeeper" >> ~/.bashrc
echo "export PATH=\${PATH}:\${ZOOKEEPER_HOME}/bin" >> ~/.bashrc
Test run zookeeper
On spark-driver, spark-worker-1, and spark-worker-2:
zkServer.sh start ${ZOOKEEPER_HOME}/conf/zookeeper.properties
This will start zookeeper in the background on each host. Afterwards, check its status:
zkServer.sh status ${ZOOKEEPER_HOME}/conf/zookeeper.properties
One of the zookeepers is the leader, the two others are followers. In case of trouble, check ${ZOOKEEPER_HOME}/logs/ .
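You can also talk to the ensemble with the command-line client that ships with Zookeeper, for example:
zkCli.sh -server localhost:2181 ls /
If the server answers, it lists the root znodes (at this point just [zookeeper]).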
To stop zookeeper, do
zkServer.sh stop ${ZOOKEEPER_HOME}/conf/zookeeper.properties
on each host.
Install Kafka
On each instance (including spark-worker-3), go to http://kafka.apache.org/downloads . Download and unpack a recent binary distribution to ~/volume. It is practical, but not critical, to install a Kafka that runs on the same Scala version as your Spark. If you want to know the Scala version of your installed Spark, you can check this file:
ls $SPARK_HOME/jars/spark-sql*
For example, .../spark-sql_2.12-3.3.0.jar means that you run Scala 2.12 and Spark 3.3.0.
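If you want that version string in a shell variable, a one-liner along these lines (assuming a single spark-sql jar, named as above) does the trick:
scala_version=$(basename $SPARK_HOME/jars/spark-sql_2.* | cut -d_ -f2 | cut -d- -f1)
echo $scala_version   # e.g. 2.12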
On each instance:
cd ~/volume
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar xzvf kafka_2.12-3.3.1.tgz
rm kafka_2.12-3.3.1.tgz
ln -fns kafka_2.12-3.3.1 kafka
Configure Kafka
On each instance, configure Kafka:
cp ~/volume/kafka/config/server.properties ~/volume/kafka/config/server.properties.original
# set this host's broker id from the myid file; match the whole default "broker.id=0" line,
# and fall back to ~/myid on spark-worker-3, where the file was never moved into a data dir
sed -i "s/broker.id=.*/broker.id=$(cat /home/ubuntu/volume/zookeeper/data/myid 2>/dev/null || cat ~/myid)/g" ~/volume/kafka/config/server.properties
export ZOOKEEPER_HOSTLIST=$(grep spark- /etc/hosts | cut -d" " -f1 | head -3 | paste -sd,)
echo "export ZOOKEEPER_HOSTLIST=\$(grep spark- /etc/hosts | cut -d\" \" -f1 | head -3 | paste -sd,)" >> ~/.bashrc
sed -i "s/zookeeper.connect=localhost:2181/zookeeper.connect=$ZOOKEEPER_HOSTLIST/g" ~/volume/kafka/config/server.properties
# advertised.host.name was removed in Kafka 3.x; advertise the local address via advertised.listeners instead
local_ip=$(ip -4 address | grep -o "^ *inet \(.\+\)\/.\+global.*$" | grep -o "[0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+" | head -1) ; \
echo "advertised.listeners=PLAINTEXT://$local_ip:9092" >> ~/volume/kafka/config/server.properties
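Afterwards, confirm that the settings ended up in the file as intended:
grep -E "^(broker\.id|zookeeper\.connect|advertised)" ~/volume/kafka/config/server.properties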
Set KAFKA_HOME and add bin/ to PATH:
export KAFKA_HOME=/home/ubuntu/volume/kafka
export PATH=${PATH}:${KAFKA_HOME}/bin
cp ~/.bashrc ~/.bashrc-bkp-$(date --iso-8601=minutes --utc)
echo "export KAFKA_HOME=/home/ubuntu/volume/kafka" >> ~/.bashrc
echo "export PATH=\${PATH}:\${KAFKA_HOME}/bin" >> ~/.bashrc
Test run Kafka
Ensure Zookeeper is still running (or restart it) on the three nodes. On all four nodes:
kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
(It is a good sign if nothing happens.)
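To convince yourself that the brokers really came up, check the server log, and list the registered broker ids in Zookeeper (run the zkCli.sh command on one of the three Zookeeper hosts):
tail ~/volume/kafka/logs/server.log               # look for "started (kafka.server.KafkaServer)"
zkCli.sh -server localhost:2181 ls /brokers/ids   # should list all four broker ids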
Here are some test commands you can run on the different instances:
kafka-topics.sh --create --topic test --replication-factor 2 --partitions 3 --bootstrap-server ${MASTER_NODE}:9092
kafka-topics.sh --list --bootstrap-server ${MASTER_NODE}:9092
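To see how the partitions and replicas were spread over the brokers, you can also describe the topic:
kafka-topics.sh --describe --topic test --bootstrap-server ${MASTER_NODE}:9092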
Run these two commands on different instances to see that lines you type into the producer console show up in the consumer:
kafka-console-producer.sh --topic test --bootstrap-server ${MASTER_NODE}:9092
kafka-console-consumer.sh --topic test --bootstrap-server ${MASTER_NODE}:9092 --from-beginning
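If you want to experiment with message keys too (the key determines which partition a message goes to), the console tools can parse and print them:
kafka-console-producer.sh --topic test --bootstrap-server ${MASTER_NODE}:9092 --property parse.key=true --property key.separator=:
kafka-console-consumer.sh --topic test --bootstrap-server ${MASTER_NODE}:9092 --from-beginning --property print.key=true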
Task 2: Run the full Twitter pipeline from Exercise 3. Tips:
- You may want to create a virtual environment to install Python packages such as tweepy, kafka-python, and afinn.
- You need to create keys/bearer_token with restricted access.
- With the default settings, it can take time before the final join is written to console. Be patient...
- Start-up sequence from spark-driver:
# assuming zookeeper and kafka run already
# create kafka topics
kafka-topics.sh --create --topic tweets --bootstrap-server localhost:9092
kafka-topics.sh --create --topic media --bootstrap-server localhost:9092
kafka-topics.sh --create --topic photos --bootstrap-server localhost:9092
# optional monitoring
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tweets --from-beginning &
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic media --from-beginning &
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic photos --from-beginning &
# main programs
python twitterpipe_tweet_harvester.py     # run this repeatedly or modify to reconnect after sleeping
python twitterpipe_media_harvester.py &   # this one can run in background
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 twitterpipe_tweet_pipeline.py &
Because you no longer use findspark, you need to use the --packages option with spark-submit. Make sure you use exactly the same package as you did with findspark.add_packages(...).
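When you are done, a teardown along these lines (a sketch; adapt the job numbers to the output of jobs) stops the pipeline and the services:
# stop the background consumers and harvesters started from this shell
jobs -l
kill %1 %2 %3 %4
# stop kafka on all four nodes, then zookeeper on the three zookeeper nodes
kafka-server-stop.sh
zkServer.sh stop ${ZOOKEEPER_HOME}/conf/zookeeper.properties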
Web UIs
Assuming that 158.39.77.227 is the IPv4 address of spark-driver, you can access Zookeeper's very simple web UI at http://158.39.77.227:8080/commands/stat .
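The stat page is served by Zookeeper's built-in AdminServer, which also lists its other commands, so you can explore a little, e.g.:
curl http://158.39.77.227:8080/commands        # list the available admin commands
curl http://158.39.77.227:8080/commands/ruok   # liveness check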
Kafka has no built-in web UI, but third-party UIs are available.