Friday, August 21, 2020

Processing Large Messages with Apache Kafka

Kafka was not built for large messages. Period. Nevertheless, more and more projects send and process 1 MB, 10 MB, and even much bigger files and other large payloads via Kafka. One reason is that Kafka was designed for large volume/throughput - which is required for large messages. This article covers the use cases, architectures, and trade-offs for handling large messages with Kafka.

Use Cases for Large (Kafka) Message Payloads

Various use cases for large message payloads exist: Image recognition, video analytics, audio analytics, and file processing are widespread examples.

Image Recognition and Video Analytics

Image recognition and video analytics (also known as computer vision) is probably the number one use case. Many examples require the analysis of videos in real-time, including:

  • Security and surveillance (access control, intrusion detection, motion detection)
  • Transport monitoring systems (vehicle traffic detection, incident detection, pedestrian monitoring)
  • Healthcare (health status monitoring, telemedicine, surgical video analysis)
  • Manufacturing (machine vision for quality assurance, augmented support and training)

Using image and video processing via concepts such as Computer Vision (e.g., OpenCV) or Deep Learning / Neural Networks (e.g., TensorFlow) reduces time, cost, and human effort, and makes these industries more secure, reliable, and consistent.

Audio Analytics

Audio analytics is an interesting use case, coming up more and more:

  • In conjunction with video analytics: See the use cases above. Often video and audio need to be processed together.
  • Consumer IoT (CIoT): Alerting, informing, advising people, e.g., using Audio Analytic.
  • Industrial IoT (IIoT): Machine diagnostics and predictive maintenance using advanced sound analysis, e.g., using Neuron Soundware
  • Natural Language Processing (NLP): Chatbots and other modern systems use text and speech translation, e.g., using the fully-managed services from the major cloud providers

Big Data File Processing

Last but not least, the processing of big files received in batch mode will not go away any time soon. But big files can be incorporated into a modern event streaming workflow to gain decoupling/separation of concerns and connectivity to various sinks, and to process data in real-time and batch simultaneously.

Legacy systems will provide data sources like big CSV or proprietary files or snapshots/exports from databases that need to be integrated. Data processing includes streaming applications (such as Kafka Streams, ksqlDB, or Apache Flink) to continuously process, correlate, and analyze events from different data sources. Sinks such as Hadoop or Spark process the incoming data in batch mode (e.g., map/reduce, shuffling). Other sinks such as a data warehouse (e.g., Snowflake) or text search (e.g., Elasticsearch) ingest data in near real-time.

What Kafka is NOT

After exploring use cases for large message payloads, let's clarify what Kafka is not:

Kafka is usually not the right technology to store and process large files (images, videos, proprietary files, etc.) as a whole. Products were built specifically for these use cases.

For instance, a Content Delivery Network (CDN) such as Akamai, Limelight Networks, or Amazon CloudFront distributes video streams and other software downloads across the globe. Or think of "big file editing and processing": video editing tools from Adobe, Autodesk, Camtasia, and many other vendors are used to structure and present video content, including films and television shows, video advertisements, and video essays.

Let's take a look at one example which combines Kafka and these other tools:

Netflix processes over 6 Petabytes per day with Kafka. However, this is "just" for message orchestration, coordination, data integration, data preprocessing, ingestion into data lakes, building stateless and stateful business applications, and other use cases. But Kafka is not used to store and share all the shows and movies you watch on your TV or tablet. A Content Delivery Network (CDN) like Akamai is used in conjunction with other tools and products to provide the excellent video streaming experience you know.

Okay, Kafka is not the right tool to store and process large files as a whole, like a CDN or video editing tool. Why, when, and how should you handle large message payloads with Kafka then? And what is a "large message" in Kafka terms?

Features and Limitations of using Kafka for Large Messages

Originally, Kafka was not built for processing large messages and files. This does not mean that you cannot do it!

Kafka limits the max size of messages. The default value of the broker configuration 'message.max.bytes' is 1 MB.

Why does Kafka limit the message size by default?

  • Different sizing, configuration, and tuning are required for large message handling compared to a mission-critical, low-latency real-time cluster.
  • Large messages increase the memory pressure on the broker JVM.
  • Large messages are expensive to handle and could slow down the brokers.
  • A reasonable message size limit can meet the requirements of most use cases.
  • Good workarounds exist if you need to handle large messages.
  • Most cloud offerings don't allow large messages.

There are noticeable performance impacts from increasing the allowable message size.

Hence, understand all alternatives discussed below before sending messages >1 MB through your Kafka cluster. Depending on your SLAs for uptime and latency, a separate Kafka cluster should be considered for processing large messages.
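If you do decide to raise the limit, several settings have to be aligned across the broker, topic, producer, and consumer. The following Java sketch is just an illustration (the 10 MB value, class name, and broker address are assumptions, not a recommendation); the broker-side settings are noted in the comments:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class LargeMessageConfig {

    // Hypothetical 10 MB limit, just for illustration
    private static final int TEN_MB = 10 * 1024 * 1024;

    public static KafkaProducer<String, byte[]> largeMessageProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        // Producer side: allow requests bigger than the 1 MB default
        props.put("max.request.size", String.valueOf(TEN_MB));
        return new KafkaProducer<>(props);
    }

    public static KafkaConsumer<String, byte[]> largeMessageConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "large-message-demo");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        // Consumer side: make sure a single large record still fits into one fetch
        props.put("max.partition.fetch.bytes", String.valueOf(TEN_MB));
        props.put("fetch.max.bytes", String.valueOf(TEN_MB));
        return new KafkaConsumer<>(props);
    }

    // Broker/topic side (not shown here): 'message.max.bytes' (broker) or
    // 'max.message.bytes' (per topic) and 'replica.fetch.max.bytes' have to be
    // raised as well, otherwise the broker rejects the large records.
}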

Having said this, I have seen customers processing messages far bigger than 10 MB with Kafka. It is valid to evaluate Kafka for processing large messages instead of using another tool for that (often in conjunction with Kafka).

LinkedIn talked a long time ago about the pros and cons of two different approaches: using 'Kafka only' vs. 'Kafka in conjunction with another data store'. Especially outside the public cloud, most enterprises cannot simply use an S3 object store for big data. Therefore, the question comes up whether one system (Kafka) is good enough, or whether you should invest in two systems (Kafka and external storage).

Let's take a look at the trade-offs for using Kafka for large messages.

Kafka for Large Messages – Alternatives and Trade-Offs

There is no single best solution. The decision on how to handle large messages with Kafka depends on your use cases, SLAs, and already existing infrastructure.

The following three available alternatives exist to handle large messages with Kafka:

  • Reference-based messaging in Kafka and external storage
  • In-line large message support in Kafka without external storage
  • In-line large message support and tiered storage in Kafka

Here are the characteristics and pros/cons of each approach (this is an extension from a LinkedIn presentation in 2016):

Apache Kafka for large message payloads and files - Alternatives and Trade-offs

Also, don't underestimate the power of compression for large messages. Some big files like CSV or XML can be reduced in size significantly just by setting the producer's compression parameter to GZIP, Snappy, or LZ4.
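For example, enabling compression is a single producer setting. Here is a minimal Java sketch (class name and broker address are illustrative assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompressedProducerConfig {
    public static KafkaProducer<String, String> gzipProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // Compress batches on the producer; 'gzip', 'snappy', and 'lz4'
        // (plus 'zstd' in newer Kafka versions) are valid values
        props.put("compression.type", "gzip");
        return new KafkaProducer<>(props);
    }
}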

Even a 1 GB file could be sent via Kafka, but this is undoubtedly not what Kafka was designed for. In both the client and the broker, a 1 GB chunk of memory needs to be allocated in the JVM for every 1 GB message. Hence, in most cases, for really large files, it is better to externalize them into an object store and use Kafka just for the metadata.

You need to define for yourself what 'a large message' is and when to use which of the design patterns discussed in this blog post. That's why I am writing this up here... :-)

The following sections explore these alternatives in more detail. Before we start, let's explain the general concept of Tiered Storage for Kafka mentioned in the above table. Many readers might not be aware of this yet.

Tiered Storage for Kafka

Kafka data is mostly consumed in a streaming fashion using tail reads. Tail reads leverage the OS's page cache to serve the data instead of disk reads. Older data is typically read from disk for backfill or failure recovery purposes, and such reads are infrequent.

In the tiered storage approach, the Kafka cluster is configured with two tiers of storage: local and remote. The local tier is the same as today's Kafka and uses the local disks on the Kafka brokers to store the log segments. The new remote tier uses an external storage system such as AWS S3, GCS, or MinIO to store the completed log segments. Two separate retention periods are defined, one per tier.

With the remote tier enabled, the retention period for the local tier can be significantly reduced from days to a few hours. The retention period for the remote tier can be much longer: months or even years.

Tiered Storage for Kafka allows scaling storage independent of memory and CPUs in a Kafka cluster, enabling Kafka to be a long-term storage solution. This also reduces the amount of data stored locally on Kafka brokers and hence the amount of data that needs to be copied during recovery and rebalancing.

The consumer API does not change at all. Kafka applications consume data as before. They don't even know if Tiered Storage is used under the hood.

Confluent Tiered Storage

Confluent Tiered Storage is available today in Confluent Platform and used under the hood in Confluent Cloud:

Confluent Tiered Storage for Kafka

From an infrastructure perspective, Confluent Tiered Storage requires an external object store like AWS S3, GCS, or MinIO. But from an operations and development perspective, the complexity of the end-to-end communication and the separation of messages and files is handled out-of-the-box under the hood.

KIP-405 - Add Tiered Storage to Kafka

KIP-405 –  Add Tiered Storage Support to Kafka is also in the works. Confluent is actively working on this with the open-source community. Uber is leading this initiative.

Kafka + Tiered Storage is an exciting option (in some use cases) for handling large messages. It provides not just a single infrastructure for the operator, but also cost savings and better elasticity.

We now understand the technical feasibility of handling large message payloads with Kafka. Let's now discuss the different use cases and architectures in more detail.

Use Cases and Architectures using Kafka for Large Message Payloads

The processing of the content of your large message payload depends on the technical use case. Do you want to

  • Send an image to analyze or enhance it?
  • Stream a video to a remote consumer application?
  • Analyze audio noise in real-time?
  • Process a structured (i.e., splittable) file line-by-line?
  • Send an unstructured (i.e., non-splittable) file to a consumer tool to process it?

I cover a few use cases for handling large messages:

  • Manufacturing: Quality assurance in production lines deployed at the edge in the factory
  • Retailing: Augmented reality for better customer experience and cross/up-selling
  • Pharma and Life Sciences: Image processing and machine learning for drug discovery
  • Public sector: Security and surveillance
  • Media: Content delivery of large video files
  • Banking: Attachments in a chat application for customer service

The following sections explore these use cases with different architectural approaches for processing large message payloads with Apache Kafka and discuss their pros and cons:

  1. Kafka-native payload processing
  2. Chunk and re-assemble
  3. Metadata in Kafka and linking to external storage
  4. Externalizing large payloads on-the-fly

Kafka for Large Message Payloads – Image Processing

Computer vision and image recognition are used in many industries, including automotive, manufacturing, healthcare, retailing, and innovative "silicon valley use cases". Image processing includes tools such as OpenCV but also technologies implementing deep learning algorithms such as Convolutional Neural Networks (CNN).

Let's take a look at a few examples from different industries.

Kafka-native Image Processing for Machine Vision in Manufacturing

Machine Vision is the technology and methods used to provide imaging-based automatic inspection and analysis for such applications as automated inspection, process control, and robot guidance, usually in industry.

A Kafka-native machine vision implementation sends images from cameras to Kafka. Preprocessing adds metadata and correlates it with data from other backend systems. The message is then consumed by one or more applications:

Kafka-native Image Processing
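A minimal sketch of the producer side could look like the following (topic name, camera id, and file path are hypothetical): the raw image bytes are sent with a byte[] serializer, keyed by camera id so all frames of one camera stay in order on one partition.

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class CameraImageProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        props.put("compression.type", "lz4"); // compression often still helps a bit

        Path image = Paths.get("camera-42/frame-0001.png"); // hypothetical capture
        byte[] payload = Files.readAllBytes(image);

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // Key by camera id so all frames from one camera land on one partition
            producer.send(new ProducerRecord<>("machine-vision-images", "camera-42", payload));
            producer.flush();
        }
    }
}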

Image processing and machine learning for drug discovery in Pharma and Life Sciences

"On average, it takes at least ten years for a new medicine to complete the journey from initial discovery to the marketplace," said PhRMA.

Here is one example where event streaming at scale in real-time speeds up this process significantly.

Recursion had several technical challenges. Their drug discovery process was manual, slow, running in bursty batch mode, and not scalable:

Drug Discovery in manual and slow, bursty batch mode, not scalable

To solve these challenges, Recursion leveraged Kafka and its ecosystem to build a massively parallel system that combines experimental biology, artificial intelligence, automation, and real-time event streaming to accelerate drug discovery:

Drug Discovery in automated, scalable, reliable real time Mode

Check out Recursion's Kafka Summit talk to learn more details.

I see plenty of customers in various industries implementing scalable real-time machine learning infrastructures with the Kafka ecosystem. Related to the above use case, I explored more details in the blog post "Apache Kafka and Event Streaming in Pharma and Life Sciences". The following shows a potential ML infrastructure:

Streaming Analytics for Drug Discovery in Real Time at Scale with Apache Kafka

Kafka-native Image Recognition for Augmented Reality in Retailing

Augmented reality (AR) is an interactive experience of a real-world environment where the objects that reside in the real world are enhanced by computer-generated perceptual information. AR applications are usually built with engines such as Unity or Unreal. Use cases exist in various industries. Industry 4.0 is the most present one today. But other industries start building fascinating applications. Just think about Pokemon Go from Nintendo for your smartphone.

The following shows an example of AR in the telco industry providing an innovative retailing service. The customer takes a picture of their home, sends the picture to an OTT service of the telco provider, and receives the enhanced picture (e.g., with a new couch to buy for their home):

Augmented Reality with Apache Kafka and Deep Learning for Picture Enhancement

Kafka is used for orchestration, integration with backend services, and sending the original and enhanced image between the smartphone and the OTT Telco service.

Machine Vision at the Edge in Industrial IoT (IIoT) with Confluent and Hivecell

Kafka comes up at the edge more and more. Here is an example of machine vision at the edge with Kafka in Industrial IoT (IIoT) / Industry 4.0 (I4):

Hivecell and Confluent Platform for Image Processing at the Edge

A Hivecell node is equipped with:

  • Confluent MQTT Proxy: Integration with the cameras
  • Kafka Broker and ZooKeeper: Event streaming platform
  • Kafka Streams: Data processing, such as filtering, transformations, aggregations, etc.
  • Nvidia's Triton Inference server: Image recognition using trained analytic models
  • Kafka Connect and Confluent Replicator: Replication of the machine vision results to the cloud

Video Streaming with Apache Kafka

Streaming media is the process of delivering and obtaining media. Data is continuously received by and presented to one or more consumers while being delivered by a provider. Buffering the split-up data packets of a video on the consumer side ensures a continuous flow.

The implementation of video streaming with Kafka-native technologies is pretty straightforward:

Video Streaming with Apache Kafka via Chunk + Re-Assemble Large Kafka Message Payloads

This architecture leverages the Composed Message Processor Enterprise Integration Pattern (EIP):

Composed Message Processor Enterprise Integration Pattern

Our use case is even simpler, as we don't need a content-based router. We just combine the Splitter and Aggregator EIPs.
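The following Java sketch illustrates the Splitter side under these assumptions (chunk size, topic name, and video id are made up): the file is cut into fixed-size chunks, keyed by the video id so all chunks stay ordered on one partition, and a chunk index header lets the consumer-side Aggregator re-assemble the stream or detect missing chunks.

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class VideoChunkSplitter {

    private static final int CHUNK_SIZE = 512 * 1024; // 512 KB chunks (assumption)

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        String videoId = "video-123"; // hypothetical id, also used as message key
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
             InputStream in = Files.newInputStream(Paths.get("movie.mp4"))) {
            byte[] buffer = new byte[CHUNK_SIZE];
            int chunkIndex = 0;
            int read;
            while ((read = in.read(buffer)) > 0) {
                byte[] chunk = Arrays.copyOf(buffer, read);
                ProducerRecord<String, byte[]> record =
                        new ProducerRecord<>("video-chunks", videoId, chunk);
                // Carry the chunk index as a header so the consumer-side
                // aggregator can re-assemble the original stream
                record.headers().add("chunkIndex",
                        String.valueOf(chunkIndex++).getBytes());
                producer.send(record);
            }
            producer.flush();
        }
    }
}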

Split and Aggregate Video Streams for Security and Surveillance in the Public Sector

The following shows a use case for video streaming with Kafka for security and surveillance:

Video Streaming with Apache Kafka for Security and Surveillance as part of Modernized SIEM

In this case, video streaming is part of a modernized SIEM (security information and event management). Audio streaming works in a very similar way. Hence, I will not cover it separately.

A smart city is another example where video, image, and audio processing with Kafka come into play.

Kafka for Large Message Payloads – Big Data Files (CSV, Video, Proprietary)

Up above, we have seen examples of processing specific large messages: images, video, and audio. In many use cases, other kinds of files need to be processed. Large files include:

  • Structured data, e.g., big CSV files
  • Unstructured data, e.g., complete videos (not continuous video streaming) or other binary files such as an analytic model

As I said before, Kafka is not the right technology to store big files. Specific tools were built for this, including object stores such as AWS S3 or MinIO.

The Claim Check EIP is the perfect solution for this problem:

 

Claim Check Pattern EIP

Metadata in Kafka and Linking to External Storage for Content Delivery of Large Video Files in the Media Industry

Many large video files are produced in the media industry. Specific storage and video editing tools are used. Kafka does not send these big files. But it controls the orchestration in a flexible, decoupled real-time architecture:

Metadata in Kafka + Link to Object Storage for Large Files
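A minimal Java sketch of this reference-based approach (the ObjectStore interface, topic name, and JSON structure are illustrative assumptions; in practice, you would use the real S3/GCS/MinIO SDK and a proper Avro/Protobuf schema) could look like this:

import java.nio.file.Path;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ClaimCheckProducer {

    /** Hypothetical abstraction over S3 / GCS / MinIO - replace with the real SDK. */
    interface ObjectStore {
        /** Uploads the file and returns its URI, e.g. s3://bucket/key. */
        String upload(Path file);
    }

    public static void sendVideoReference(ObjectStore store, Path videoFile) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // 1. Put the large payload into the object store ("check the luggage")
        String uri = store.upload(videoFile);

        // 2. Send only the claim check (metadata + reference) through Kafka
        String metadataEvent = String.format(
                "{\"title\":\"%s\",\"sizeBytes\":%d,\"location\":\"%s\"}",
                videoFile.getFileName(), videoFile.toFile().length(), uri);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("video-metadata", uri, metadataEvent));
        }
        // Consumers read the small event and fetch the file from the object
        // store only if and when they really need the content.
    }
}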

Externalizing Large Payloads on-the-fly for Legacy Integration from Proprietary Systems in Financial Services

Big files have to be processed in many industries. In financial services, I saw several use cases where large proprietary files have to be shared between different legacy applications.

Similar to the Claim Check EIP used above, you can also leverage Kafka Connect and its Single Message Transformations (SMT) feature:

Externalizing Large Payloads on the fly with Kafka Connect SMT
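To give an idea of what such a transformation looks like, here is a hedged skeleton of a custom SMT (class name, threshold, and the externalize() helper are assumptions; a production implementation would use a real object store client and expose proper configuration). Such a transformation would then be plugged into a connector via its 'transforms' configuration.

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Skeleton of a custom Single Message Transformation that replaces a large
 * payload with a reference to an external store (sketch, not production code).
 */
public class ExternalizeLargePayload<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final int THRESHOLD_BYTES = 1024 * 1024; // 1 MB (assumption)

    @Override
    public R apply(R record) {
        Object value = record.value();
        if (value instanceof byte[] && ((byte[]) value).length > THRESHOLD_BYTES) {
            // Upload the payload to S3/MinIO/... and keep only the reference
            String reference = externalize((byte[]) value);
            return record.newRecord(record.topic(), record.kafkaPartition(),
                    record.keySchema(), record.key(),
                    Schema.STRING_SCHEMA, reference, record.timestamp());
        }
        return record; // small payloads pass through unchanged
    }

    private String externalize(byte[] payload) {
        // Hypothetical: write to an object store and return its URI
        return "s3://large-payloads/" + payload.hashCode();
    }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}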

Natural Language Processing (NLP) using Kafka and Machine Learning for Large Text Files

Machine Learning and Kafka are a perfect fit. I covered this topic in many articles and talks in the past. Just google or start with this blog post to get an idea about this approach: "Using Apache Kafka to Drive Cutting-Edge Machine Learning".

Natural Language Processing (NLP) using Kafka and machine learning for large text files is a great example. "Continuous NLP Pipelines with Python, Java, and Apache Kafka" shows how to implement the above design pattern using Kafka Streams, Kafka Connect, and an S3 Serializer / Deserializer.

I like this example because it also solves the impedance mismatch between the data scientist (who loves Python) and the production engineer (who loves Java). "Machine Learning with Python, Jupyter, KSQL, and TensorFlow" explores this challenge in more detail.

Large Messages in a Chat Application for Customer Service in Banking

You just learned how to handle large files with Kafka by externalizing them into an object store and only sending the metadata via Kafka. In some use cases, this is too much effort or cost. Sending large files directly via Kafka is possible and sometimes easier to implement. The architecture is much simpler and more cost-effective.

I already discussed the trade-offs above. But here is an excellent use case of sending large files natively with Kafka: Attachments in a chat application for customer service.  

An example of a financial firm using Kafka for a chat system is Goldman Sachs. They led the development of Symphony, an industry initiative to build a cloud-based platform for instant communication and content sharing that securely connects market participants. Symphony is based on an open-source business model that is cost-effective, extensible, and customizable to suit end-user needs. Many other FinServ companies invested in Symphony, including Bank of America, BNY Mellon, BlackRock, Citadel, Citi, Credit Suisse, Deutsche Bank, Goldman Sachs, HSBC, Jefferies, JPMorgan, Maverick, Morgan Stanley, Nomura, and Wells Fargo.

Kafka is a perfect fit for chat applications. The broker storage and decoupling are perfect for multi-platform and multi-technology infrastructure. Offline capabilities and consuming old messages are built into Kafka, too. Here is an example from a chat platform in the gaming industry:

Real-time Chat function at scale within games and cross-platform using Apache Kafka

Attachments like files, images, or any other binary content can be part of this implementation. Different architectures are possible. For instance, you could use dedicated Kafka topics for handling large messages. Or you just put them into your 'chat message' event. With Confluent Schema Registry, the schema could have an attribute 'attachment'. Or you externalize the attachment using the Claim Check EIP discussed above.
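As a simple illustration, such an event could look roughly like this (a sketch with made-up field names, not a real Symphony or Schema Registry schema):

/**
 * Sketch of a 'chat message' event with an optional attachment.
 * In practice this would be an Avro/Protobuf schema registered in Schema Registry.
 */
public class ChatMessageEvent {
    public String conversationId;   // used as the Kafka message key
    public String sender;
    public String text;

    // Option 1: small attachments in-line as bytes (subject to the max message size)
    public byte[] attachment;

    // Option 2: large attachments externalized via the Claim Check pattern
    public String attachmentReference; // e.g. a reference like s3://chat-attachments/<id>
}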

Kafka-native Handling of Large Messages Has Its Use Cases!

As you learned in this post, plenty of use cases exist for handling large messages and files with Apache Kafka and its ecosystem. Kafka was built for large volume/throughput - which is required for large messages. 'Scaling Apache Kafka to 10+ GB Per Second in Confluent Cloud' is an impressive example.

However, not all large messages should be processed with Kafka. Often you should use the right storage system and just leverage Kafka for the orchestration. Know the different design patterns and choose the right technology for each problem.

A common scenario for Kafka-native processing of large messages is at the edge, where other data stores are often not available or would increase the cost and complexity of provisioning the infrastructure.

What are your experiences with handling large messages with the Kafka ecosystem? Did you or do you plan to use Apache Kafka and its ecosystem? What is your strategy? Let's connect on LinkedIn and discuss it!

Wednesday, August 5, 2020

Understanding Kafka Failover for Brokers and Consumers


In this tutorial, we are going to run many Kafka nodes on our development laptop, so you will need at least 16 GB of RAM on your local dev machine. You can run just two servers if you have less than 16 GB of memory. We are going to create a replicated topic. We then demonstrate consumer failover and broker failover. We also demonstrate load balancing Kafka consumers. We show how, with many groups, Kafka acts like Publish/Subscribe. But when we put all of our consumers in the same group, Kafka will load share the messages among the consumers in that group (more like a queue than a topic in a traditional MOM sense).

If not already running, then start up ZooKeeper (./run-zookeeper.sh from the first tutorial). Also, shut down Kafka from the first tutorial.

Next, you need to copy the server properties for three brokers (detailed instructions to follow). Then we will modify these Kafka server properties to add unique Kafka ports, Kafka log locations, and unique broker ids. Then we will create three scripts to start these servers using these properties, and then start the servers. Lastly, we create a replicated topic and use it to demonstrate Kafka consumer failover and Kafka broker failover.

Create Three New Kafka server-n.properties Files

In this section, we will copy the existing Kafka server.properties to server-0.properties, server-1.properties, and server-2.properties. Then we change server-0.properties to set log.dirs to ./logs/kafka-0. Then we modify server-1.properties to set the port to 9093, the broker id to 1, and log.dirs to ./logs/kafka-1. Lastly, we modify server-2.properties to use port 9094, broker id 2, and log.dirs ./logs/kafka-2.

Copy Server Properties File:

~/kafka-training
$ mkdir -p lab2/config
$ cp kafka/config/server.properties lab2/config/server-0.properties
$ cp kafka/config/server.properties lab2/config/server-1.properties
$ cp kafka/config/server.properties lab2/config/server-2.properties

With your favorite text editor, change server-0.properties so that log.dirs is set to ./logs/kafka-0. Leave the rest of the file the same. Make sure log.dirs is only defined once.

~/kafka-training/lab2/config/server-0.properties

broker.id=0
port=9092
log.dirs=./logs/kafka-0
...

Change broker.id, port, and log.dirs of server-1.properties as follows.

~/kafka-training/lab2/config/server-1.properties

broker.id=1
port=9093
log.dirs=./logs/kafka-1
...

Change broker.id, port, and log.dirs of server-2.properties as follows.

~/kafka-training/lab2/config/server-2.properties

broker.id=2
port=9094
log.dirs=./logs/kafka-2
...

Create Startup Scripts for Three Kafka Servers

The startup scripts will just run kafka-server-start.sh with the corresponding properties file.

~/kafka-training/lab2/start-1st-server.sh

#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
    "$CONFIG/server-0.properties"

~/kafka-training/lab2/start-2nd-server.sh

#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
    "$CONFIG/server-1.properties"

~/kafka-training/lab2/start-3rd-server.sh

#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
    "$CONFIG/server-2.properties"

Notice that we are passing the Kafka server properties files that we created in the last step.

Now run all three in separate terminals/shells.

Run Kafka servers each in own terminal from ~/kafka-training/lab2

~/kafka-training/lab2
$ ./start-1st-server.sh
...
$ ./start-2nd-server.sh
...
$ ./start-3rd-server.sh

Give the servers a minute to start up and connect to ZooKeeper.

Create Kafka Replicated Topic my-failsafe-topic

Now we will create a replicated topic that the console producers and console consumers can use.

~/kafka-training/lab2/create-replicated-topic.sh

#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 3 \
    --partitions 13 \
    --topic my-failsafe-topic

Notice that the replication factor is set to 3, the topic name is my-failsafe-topic, and, like before, it has 13 partitions.

Then we just have to run the script to create the topic.

Run create-replicated-topic.sh

~/kafka-training/lab2
$ ./create-replicated-topic.sh

Start Kafka Consumer That Uses Replicated Topic

Next, create a script that starts the consumer and then start the consumer with the script.

~/kafka-training/lab2/start-consumer-console-replicated.sh

#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9094,localhost:9092 \
    --topic my-failsafe-topic \
    --from-beginning

Notice that a list of Kafka servers is passed to the --bootstrap-server parameter. Only two of the three servers we ran earlier get passed. Even though only one broker is needed, the consumer client will learn about the other brokers from just that one server. Usually, you list multiple brokers so that the client can still connect if there is an outage.

Now we just run this script to start the consumer.

Run start-consumer-console-replicated.sh

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh

Start Kafka Producer That Uses Replicated Topic

Next, we create a script that starts the producer. Then launch the producer with the script you created.

~/kafka-training/lab2/start-consumer-producer-replicated.sh

#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-console-producer.sh \
--broker-list localhost:9092,localhost:9093 \
--topic my-failsafe-topic

Notice that we start the Kafka producer and pass it a list of Kafka Brokers to use via the parameter --broker-list.

Now use the start producer script to launch the producer as follows.

Run start-consumer-producer-replicated.sh

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh

Send Messages

Now send a message from the producer to Kafka and see those messages consumed by the consumer.

Producer Console

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
Hi Mom
How are you?
How are things going?
Good!

Consumer Console

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Hi Mom
How are you?
How are things going?
Good!

Start Two More Consumers and Send More Messages

Now start two more consumers in their own terminal window and send more messages from the producer.

Producer Console

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3

Consumer Console 1st

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3

Consumer Console 2nd in new Terminal

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3

Consumer Console 3rd in new Terminal

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3

Notice that the messages are sent to all of the consumers because each consumer is in a different consumer group.

Change Consumers to Be in the Same Consumer Group

Stop the producers and the consumers from before, but leave Kafka and ZooKeeper running.

Now let’s modify the start-consumer-console-replicated.sh script to add a Kafka consumer group. We want to put all of the consumers in the same consumer group. This way the consumers will share the messages as each consumer in the consumer group will get its share of partitions.

~/kafka-training/lab2/start-consumer-console-replicated.sh

#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9094,localhost:9092 \
    --topic my-failsafe-topic \
    --consumer-property group.id=mygroup

Notice that the script is the same as before except we added --consumer-property group.id=mygroup which will put every consumer that runs with this script into the mygroup consumer group.

Now we just run the producer and three consumers.

Run this three times - start-consumer-console-replicated.sh

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh

Run Producer Console

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh

Now send seven messages from the Kafka producer console.

Producer Console

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
m1
m2
m3
m4
m5
m6
m7

Notice that the messages are spread evenly among the consumers.

1st Kafka Consumer gets m3, m5

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m3
m5

Notice the first consumer gets messages m3 and m5.

2nd Kafka Consumer gets m2, m6

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m2
m6

Notice the second consumer gets messages m2 and m6.

3rd Kafka Consumer gets m1, m4, m7

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m1
m4
m7

Notice the third consumer gets messages m1, m4 and m7.

Notice that each consumer in the group got a share of the messages.

Kafka Consumer Failover

Next, let’s demonstrate consumer failover by killing one of the consumers and sending seven more messages. Kafka should divide up the work to the consumers that are running.

First, kill the third consumer (CTRL-C in the consumer terminal does the trick).

Now send seven more messages with the Kafka console-producer.

Producer Console - send seven more messages m8 through m14

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
m1
...
m8
m9
m10
m11
m12
m13
m14

Notice that the messages are spread evenly among the remaining consumers.

1st Kafka Consumer gets m8, m9, m11, m14

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m3
m5
m8
m9
m11
m14

The first consumer got m8, m9, m11 and m14.

2nd Kafka Consumer gets m10, m12, m13

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m2
m6
m10
m12
m13

The second consumer got m10, m12, and m13.

We killed one consumer, sent seven more messages, and saw Kafka spread the load to remaining consumers. Kafka consumer failover works!

Create Kafka Describe Topic Script

You can use kafka-topics.sh to see how the Kafka topic is laid out among the Kafka brokers. The --describe option will show the partitions, ISRs, and broker partition leadership.

~/kafka-training/lab2/describe-topics.sh

#!/usr/bin/env bash
cd ~/kafka-training
# List existing topics
kafka/bin/kafka-topics.sh --describe \
    --topic my-failsafe-topic \
    --zookeeper localhost:2181

Let’s run kafka-topics.sh --describe and see the topology of our my-failsafe-topic.

Run describe-topics

We are going to list which broker owns (is the leader of) which partition, and list the replicas and ISRs of each partition. ISRs are replicas that are up to date. Remember, the topic has 13 partitions.

Topology of Kafka Topic Partition Ownership

~/kafka-training/lab2
$ ./describe-topics.sh
Topic: my-failsafe-topic    PartitionCount: 13    ReplicationFactor: 3    Configs:
    Topic: my-failsafe-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-failsafe-topic    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: my-failsafe-topic    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: my-failsafe-topic    Partition: 3    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: my-failsafe-topic    Partition: 4    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: my-failsafe-topic    Partition: 5    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: my-failsafe-topic    Partition: 6    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-failsafe-topic    Partition: 7    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: my-failsafe-topic    Partition: 8    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: my-failsafe-topic    Partition: 9    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: my-failsafe-topic    Partition: 10    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: my-failsafe-topic    Partition: 11    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: my-failsafe-topic    Partition: 12    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1

Notice how each broker gets a share of the partitions as leaders and followers. Also, see how Kafka replicates the partitions on each broker.

Test Broker Failover by Killing 1st Server

Let’s kill the first broker, and then test the failover.

Kill the first broker

 $ kill `ps aux | grep java | grep server-0.properties | tr -s " " | cut -d " " -f2`

You can stop the first broker by hitting CTRL-C in the broker terminal or by running the above command.

Now that the first Kafka broker has stopped, let’s use Kafka topics describe to see that new leaders were elected!

Run describe-topics again to see leadership change

~/kafka-training/lab2
$ ./describe-topics.sh
Topic:my-failsafe-topic    PartitionCount:13    ReplicationFactor:3    Configs:
    Topic: my-failsafe-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 1    Leader: 1    Replicas: 0,1,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 3    Leader: 2    Replicas: 2,1,0    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 4    Leader: 2    Replicas: 0,2,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 5    Leader: 1    Replicas: 1,0,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 6    Leader: 2    Replicas: 2,0,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 7    Leader: 1    Replicas: 0,1,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 8    Leader: 1    Replicas: 1,2,0    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 9    Leader: 2    Replicas: 2,1,0    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 10    Leader: 2    Replicas: 0,2,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 11    Leader: 1    Replicas: 1,0,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 12    Leader: 2    Replicas: 2,0,1    Isr: 2,1


Notice how Kafka spreads the leadership over the 2nd and 3rd Kafka brokers.

Show Broker Failover Worked

Let’s prove that failover worked by sending two more messages from the producer console.
Then notice if the consumers still get the messages.

Send the message m15 and m16.

Producer Console - send m15 and m16

~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
m1
...
m15
m16

Notice that the messages are spread evenly among the remaining live consumers.

1st Kafka Consumer gets m16

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m3
m5
m8
m9
m11
m14
...
m16

The first Kafka consumer gets m16.

2nd Kafka Consumer gets m15

~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m2
m6
m10
m12
m13
...
m15

The second Kafka consumer gets m15.

Kafka broker Failover WORKS!

Kafka Cluster Failover Review

Why did the three consumers not load share the messages at first?

They did not load share at first because they were each in a different consumer group. Consumer groups each subscribe to a topic and maintain their own offsets per partition in that topic.

How did we demonstrate failover for consumers?

We shut a consumer down. Then we sent more messages. We observed Kafka spreading the messages to the remaining consumers.

How did we show failover for producers?

We didn’t. We showed failover for Kafka brokers by shutting one down, then using the producer console to send two more messages. Then we saw that the producer used the remaining Kafka brokers. Those Kafka brokers then delivered the messages to the live consumers.

What tool and option did we use to show ownership of partitions and the ISRs?

We used kafka-topics.sh using the --describe option.