Using racecar to Consume Kafka Messages in Rails

In the bundle console, run the WaterDrop setup:

WaterDrop.setup do |config|
  config.deliver = true
  config.kafka.seed_brokers = %w[kafka://localhost:9092]
end

You can see that WaterDrop ships with default settings for the Kafka configuration:

{:client_id=>"waterdrop", :logger=>#<NullLogger:0x0055731e74f9c8>, :monitor=>#<WaterDrop::Instrumentation::Monitor:0x0055731e74e8c0 @id=:waterdrop, @clock=#<Dry::Monitor::Clock:0x0055731f08b370>, @__bus__=#<Dry::Events::Bus:0x0055731e74cd68 @listeners=#<Concurrent::Map:0x0055731e74d2e0 entries=0 default_proc=#<Proc:0x0055731f0cac50@/var/lib/gems/2.3.0/gems/dry-events-0.1.0/lib/dry/events/constants.rb:8>>, @events=#<Concurrent::Map:0x0055731e74d740 entries=4 default_proc=nil>>>, :deliver=>true, :raise_on_buffer_overflow=>true, :kafka=>{:seed_brokers=>["kafka://localhost:9092"], :connect_timeout=>10, :socket_timeout=>30, :max_buffer_bytesize=>10000000, :max_buffer_size=>1000, :max_queue_size=>1000, :ack_timeout=>5, :delivery_interval=>10, :delivery_threshold=>100, :max_retries=>2, :required_acks=>-1, :retry_backoff=>1, :compression_threshold=>1, :compression_codec=>nil, :ssl_ca_cert=>nil, :ssl_ca_cert_file_path=>nil, :ssl_ca_certs_from_system=>false, :ssl_client_cert=>nil, :ssl_client_cert_key=>nil, :sasl_gssapi_principal=>nil, :sasl_gssapi_keytab=>nil, :sasl_plain_authzid=>"", :sasl_plain_username=>nil, :sasl_plain_password=>nil, :sasl_scram_username=>nil, :sasl_scram_password=>nil, :sasl_scram_mechanism=>nil}}

There are a lot of Kafka configuration parameters here. You can see that required_acks defaults to -1, which means a message is only considered delivered after the leader partition has received it and it has been replicated to the in-sync followers. This setting results in higher latency than 0 or 1: with 0, a message is considered successful as soon as it is sent, and with 1, only the leader has to receive it. You can also see that we are not using SSL or SASL to secure the connection to Kafka. We can now send a message:

WaterDrop::AsyncProducer.call('Hi water drop', topic: 'test-top')
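
If lower latency matters more than durability for a particular setup, these defaults can be overridden in the same setup block. This is a minimal sketch, assuming the nested kafka settings shown in the dump above are writable the same way seed_brokers is:

WaterDrop.setup do |config|
  config.deliver = true
  config.kafka.seed_brokers = %w[kafka://localhost:9092]
  # Assumption: required_acks can be overridden here just like seed_brokers.
  # 1 = only the leader has to acknowledge, trading durability for latency.
  config.kafka.required_acks = 1
end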

To set up a Rails app, you have to install the development tools first. On my Ubuntu VM:

sudo apt-get install git-core curl zlib1g-dev build-essential libssl-dev libreadline-dev libyaml-dev libsqlite3-dev sqlite3 libxml2-dev libxslt1-dev libcurl4-openssl-dev software-properties-common libffi-dev nodejs yarn

We can now install the rails gem. Add the racecar gem to the Gemfile and run bundle install. Then create the racecar configuration file:

bundle exec rails g racecar:install

The generated config file:

# These config values will be shared by all environments but can be overridden.
common: &common
  client_id: "rafk"

development:
  <<: *common
  brokers:
    - localhost:9092

test:
  <<: *common
  brokers:
    - localhost:9092

production:
  <<: *common
  brokers:
    - kafka1.myapp.com:9092
    - kafka2.myapp.com:9092
    - kafka3.myapp.com:9092

One interesting thing is that you can change localhost to kafka and it will still work. You can generate a consumer class by using the racecar generator:

bundle exec rails g racecar:consumer Print

This will generate print_consumer.rb, which extends the Racecar::Consumer class:

class PrintConsumer < Racecar::Consumer
  subscribes_to "test-top"

  def process(message)
    puts "Received message: #{message.value}"
  end
end
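
The message object carries more than just the payload. Here is a small variation of the generated consumer that also prints the key, partition, and offset; this assumes the fetched message responds to those methods, as ruby-kafka's fetched messages do:

class PrintConsumer < Racecar::Consumer
  subscribes_to "test-top"

  def process(message)
    # value is the payload; key, partition and offset come from the Kafka record
    puts "Received #{message.value} (key=#{message.key.inspect}, partition=#{message.partition}, offset=#{message.offset})"
  end
end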

I have changed the topic name to test-top. We can read the message from the topic using this consumer:

bundle exec racecar PrintConsumer

This will output:

=> Starting Racecar consumer PrintConsumer...
=> Detected Rails, booting application...
=> Wrooooom!
=> Ctrl-C to shutdown consumer
I, [2019-02-04T01:53:37.132672 #14065]  INFO -- : New topics added to target list: test-top
I, [2019-02-04T01:53:37.132820 #14065]  INFO -- : Fetching cluster metadata from kafka://localhost:9092
D, [2019-02-04T01:53:37.132963 #14065] DEBUG -- : Opening connection to localhost:9092 with client id rafk...
D, [2019-02-04T01:53:37.133904 #14065] DEBUG -- : Sending topic_metadata API request 1 to localhost:9092
D, [2019-02-04T01:53:37.134239 #14065] DEBUG -- : Waiting for response 1 from localhost:9092
D, [2019-02-04T01:53:37.145499 #14065] DEBUG -- : Received response 1 from localhost:9092
I, [2019-02-04T01:53:37.145709 #14065]  INFO -- : Discovered cluster metadata; nodes: localhost:9092 (node_id=0)
D, [2019-02-04T01:53:37.145799 #14065] DEBUG -- : Closing socket to localhost:9092
I, [2019-02-04T01:53:37.146486 #14065]  INFO -- : Joining group `print-consumer`

Received message: Hi water drop

The output shows the single message we published to the test-top topic. We can also see that the consumer joins a consumer group named print-consumer. We can run many instances of our consumer and they will all join this consumer group. In a production environment, they would run on different hosts to achieve failover.
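
Racecar derives the group name from the consumer class name (PrintConsumer becomes print-consumer). If you want to pin the group explicitly, the consumer class can set it; this is a small sketch, assuming racecar's group_id class setting:

class PrintConsumer < Racecar::Consumer
  subscribes_to "test-top"

  # Assumption: group_id pins the consumer group name instead of deriving it
  # from the class name. Every instance using this id joins the same group.
  self.group_id = "print-consumer"

  def process(message)
    puts "Received message: #{message.value}"
  end
end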

Running Multiple Consumers in a Group

We can publish multiple messages and see what happens when we run multiple consumers:

10.times do |index|
  WaterDrop::AsyncProducer.call("Counting #{index}", topic: 'test-top')
end

You can run multiple consumers by running the command:

bundle exec racecar PrintConsumer

multiple times. I ran two print consumers, but I only saw messages being consumed in one of them; the other consumer was always idle. To troubleshoot this problem, let's install kafkacat:

apt-get install kafkacat

We can now get the metadata for all topics:

kafkacat -L -b localhost
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 1 brokers:
  broker 0 at localhost:9092
 6 topics:
  topic "test-topic" with 1 partitions:

As you can see from this output, the topic has only one partition. Remember that, within a consumer group, we cannot have more active consumers than there are partitions in the topic. This is why we did not see any messages being consumed in the second print consumer. The topic was created automatically when we published the first message to it with waterdrop, and by default the number of partitions is one. We can always increase the number of partitions, but we can never reduce it.
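
Since the partition count can only grow, the fix is to give the topic at least as many partitions as consumers we want to keep busy. Here is a minimal sketch using the ruby-kafka gem that racecar and waterdrop build on; create_partitions_for is assumed to be available in the installed ruby-kafka version:

require "kafka"

# Connect to the local broker and grow test-top to two partitions.
# Kafka only allows increasing the partition count, never decreasing it.
kafka = Kafka.new(["localhost:9092"], client_id: "rafk")
kafka.create_partitions_for("test-top", num_partitions: 2)

With two partitions, a second bundle exec racecar PrintConsumer instance should be assigned one of them and start receiving a share of the messages.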
