rodolfo bandeira

software, electronics, security, devops, thoughts

Zookeeper and Apache Kafka with Ruby

Time to try out the idea of a distributed queue system using Zookeeper and Apache Kafka. Let me guide you from installing them on Ubuntu 16.04 to testing a producer and a consumer using Ruby.

Here we go:

1) Install Zookeeper: sudo apt-get install zookeeper

2) Downloading Apache Kafka:


3) Installing Kafka:

sudo mkdir /usr/local/kafka

sudo tar -xvf kafka_2.11- -C /usr/local/kafka/

4) Special config to make it work on a single node:

sudo vim /usr/local/kafka/kafka_2.11-

port = 9092 = localhost

5) Starting Apache Kafka in background. Notice that you DON’T need sudo.

nohup /usr/local/kafka/kafka_2.11- /usr/local/kafka/kafka_2.11- &

6) Let’s create a simple iptables firewall allowing only localhost to connect to our Zookeeper and Kafka. Remember that the idea of Zookeeper and Kafka is to have a distributed queue system where you can spread your queue across multiple machines (huge stuff, bro). Since the idea here is just to study using one single machine on DigitalOcean, here is how to block h4ck3rs on the internet from connecting to your toys.



echo "Closing Kafka and Zookeeper ports to the outside world. Allowing local connections only."

# The source address was missing from these rules; 127.0.0.1 is assumed,
# based on the "localhost only" goal described above.
iptables -A INPUT -p tcp --dport 2181 -s 127.0.0.1 -j ACCEPT
iptables -A INPUT -p tcp --dport 44337 -s 127.0.0.1 -j ACCEPT
iptables -A INPUT -p tcp --dport 34232 -s 127.0.0.1 -j ACCEPT
iptables -A INPUT -p tcp --dport 9092 -s 127.0.0.1 -j ACCEPT
# Everything else that targets these ports is dropped.
iptables -A INPUT -p tcp --dport 2181 -j DROP
iptables -A INPUT -p tcp --dport 44337 -j DROP
iptables -A INPUT -p tcp --dport 34232 -j DROP
iptables -A INPUT -p tcp --dport 9092 -j DROP

Flush iptables before applying our rules. Careful if you have existing rules, because the flush removes all of them! You can check the current rules with sudo iptables -L

sudo iptables -F

sudo ./
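To confirm that the firewall still lets local clients through, a quick port check from the same machine can help. This is just a minimal sketch using Ruby's standard library; the helper name port_open? is mine, and ports 2181 and 9092 are the Zookeeper and Kafka ports used in the rules above.

```ruby
require 'socket'

# Returns true if a TCP connection to host:port succeeds within `timeout` seconds.
# (port_open? is a hypothetical helper name, not part of any library.)
def port_open?(host, port, timeout: 2)
  Socket.tcp(host, port, connect_timeout: timeout) { true }
rescue SystemCallError, SocketError, IOError
  # Connection refused, timed out, or host not resolvable.
  false
end

# Check the two main ports from the local machine.
{ 'Zookeeper' => 2181, 'Kafka' => 9092 }.each do |name, port|
  state = port_open?('127.0.0.1', port) ? 'open' : 'closed'
  puts "#{name} (#{port}): #{state}"
end
```

Run locally, both should report open once the services are up. Running the same check from another machine against the server's public address should report closed if the DROP rules are in place.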

Now let’s test it using Ruby:

gem install ruby-kafka

git clone

cd ruby-kafka

ruby producer.rb (Repeat 3 times)

ruby consumer.rb (Yeahh)

Hello, World! 617f4877-e34d-4bab-9993-4f95da626549

Hello, World! f14e791a-092f-4100-b35d-c51d716e5e57

Hello, World! 87c302aa-df8e-41fa-88b5-657148f029d1


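# producer.rb
require 'kafka'
require 'securerandom'

# Connect to the single local broker (same address the consumer uses).
kafka = Kafka.new(
  seed_brokers: ['localhost:9092'],
  client_id: 'my-application'
)

# Send one message with a random UUID to the 'greetings' topic.
kafka.deliver_message("Hello, World! #{SecureRandom.uuid}", topic: 'greetings')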
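Instead of running the producer three times by hand, the same call can be wrapped in a small loop. The sketch below is only an illustration: send_greetings and greeting are names I made up, and the client can be any object that responds to deliver_message(value, topic:), which is the same ruby-kafka call used in the producer.

```ruby
require 'securerandom'

# Builds the same kind of payload the producer sends.
def greeting
  "Hello, World! #{SecureRandom.uuid}"
end

# Sends `count` greetings through `client` and returns the messages sent.
# `client` is anything responding to deliver_message(value, topic:),
# for example a ruby-kafka client. (send_greetings is a hypothetical helper.)
def send_greetings(client, count:, topic: 'greetings')
  count.times.map do
    message = greeting
    client.deliver_message(message, topic: topic)
    message
  end
end

# Usage with a real broker (assumes the ruby-kafka gem and a broker on localhost:9092):
#   require 'kafka'
#   kafka = Kafka.new(seed_brokers: ['localhost:9092'], client_id: 'my-application')
#   send_greetings(kafka, count: 3).each { |m| puts m }
```

Passing the client in as an argument keeps the helper easy to try without a running broker, since any stand-in object with a deliver_message method will do.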


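# consumer.rb
require 'kafka'

kafka = Kafka.new(seed_brokers: ['localhost:9092'])

consumer = kafka.consumer(
  group_id: 'my-consumer',

  # Increase offset commit frequency to once every 5 seconds.
  offset_commit_interval: 5,

  # Commit offsets when 100 messages have been processed.
  offset_commit_threshold: 100,

  # Increase the length of time that committed offsets are kept.
  offset_retention_time: 7 * 60 * 60
)

# Subscribe to the topic that producer.rb writes to.
consumer.subscribe('greetings')

# Stop the consumer loop cleanly when the process receives SIGTERM.
trap("TERM") { consumer.stop }

consumer.each_message(automatically_mark_as_processed: false) do |message|
  puts message.offset, message.key, message.value

  # Automatic marking is disabled above, so mark the message manually
  # once it has been handled.
  consumer.mark_message_as_processed(message)
end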
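The two settings offset_commit_interval and offset_commit_threshold work together: as far as I understand it, offsets get committed when either the interval has elapsed or the threshold has been reached, whichever comes first. Here is a toy model of that rule in plain Ruby. It is not ruby-kafka's code, and the CommitPolicy class is a made-up name for illustration only.

```ruby
# Toy model of the two commit triggers (time interval OR message count).
# This is an illustration only, not how ruby-kafka is implemented.
class CommitPolicy
  def initialize(interval:, threshold:)
    @interval = interval     # seconds allowed between commits
    @threshold = threshold   # processed messages allowed between commits
    @uncommitted = 0
    @last_commit = 0.0
  end

  # Records one processed message at time `now` (in seconds).
  # Returns true if this message would trigger a commit.
  def processed(now)
    @uncommitted += 1
    return false unless due?(now)

    @uncommitted = 0
    @last_commit = now
    true
  end

  private

  def due?(now)
    (now - @last_commit) >= @interval || @uncommitted >= @threshold
  end
end

policy = CommitPolicy.new(interval: 5, threshold: 100)
early = 99.times.map { policy.processed(1.0) }
puts early.any?              # no commit yet: under 5 seconds, under 100 messages
puts policy.processed(1.0)   # 100th message reaches the threshold
puts policy.processed(6.0)   # 5 seconds since the last commit
```

With the values from the consumer above, a slow topic commits roughly every 5 seconds, while a busy one commits every 100 messages.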


Hope you enjoy!