Implement Kafka with Java:
Apache Kafka is the buzzword today. Everyone talks about it and writes about it. So I have also decided to dive into it and understand it. I will first lay out some basic concepts of Apache Kafka, and then we will walk through a running example.
Is Kafka similar to a traditional message broker? The answer is both yes and no. "Yes" because it gives you functionality similar to traditional brokers. "No" because it offers much more functionality than traditional brokers do.
Apache Kafka Concepts
When we talk about Kafka, we need to have a few things clear.
Cluster:
Kafka always runs as a cluster. A cluster is a group of one or more Kafka broker instances, which can run on the same or different machines. You can specify the protocol and port on which each broker runs in its respective properties file.
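For reference, the broker's identity and listener are set in config/server.properties. A minimal sketch with the default values (adjust host and port as needed):

```properties
# Unique id for each broker in the cluster
broker.id=0
# Protocol, host, and port the broker listens on
listeners=PLAINTEXT://localhost:9092
# Where the broker finds Zookeeper
zookeeper.connect=localhost:2181
```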
Topics:
Kafka treats topics as categories or feed names to which messages are published. The core concept here is similar to a traditional broker. On top of that,
- Kafka topics are always multi-subscriber.
- Topics can have one or more partitions, which store messages with unique offset numbers.
- Kafka topics retain all published messages whether or not they have been consumed. Records are freed based on a configurable retention period.
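The retention period mentioned above is controlled by broker (or per-topic) properties in server.properties. A sketch showing the defaults:

```properties
# Keep records for 7 days (168 hours) regardless of consumption
log.retention.hours=168
# Optionally cap retention by total size per partition (-1 = no limit)
log.retention.bytes=-1
```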
(Image courtesy: kafka.apache.org)
Zookeeper:
ZooKeeper is a centralized service for maintaining configuration information and providing distributed synchronization and group services. As Kafka runs as a distributed cluster, it depends heavily on ZooKeeper to keep its brokers in sync.
Now let's see how we can actually get some hands-on experience with Kafka.
Download and Installation
You can download the Kafka distribution from this link. Once downloaded, untar the archive to the folder of your choice. The distribution has scripts for both Linux and Windows environments. We are using Windows scripts here and default ports and directories. As mentioned earlier Kafka uses Zookeeper, so you have to first start the Zookeeper server.
Start Zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
After Zookeeper is started, it's time to start Kafka. For demo purposes, we will use a single broker.
Start Kafka
bin\windows\kafka-server-start.bat config\server.properties
For our testing, we will create a topic named "test". The command for the same is:
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
We are ready to connect to this newly created Kafka topic and publish and consume some messages. I am using a simple Spring Boot project with the Kafka dependencies included. Below are a few major dependencies from my pom.xml:
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
In this example, I have written simple Consumer and Producer classes. These classes implement the Runnable interface, and they produce to or consume from the topic that they receive as a command-line parameter.
Read here on How to pass command line parameters with Spring Boot
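The option names are collected in a Constants class that the main application uses but the post does not show. A minimal sketch, assuming option names consistent with the commands used later in this article (--start.as, --topic, --message); the exact values in the original project may differ:

```java
// Hypothetical reconstruction of com.opencodez.util.Constants,
// holding the command-line option names used by the main application.
public final class Constants {

    // Option keys (passed as --start.as=..., --topic=..., --message=...)
    public static final String OPTION_START_AS = "start.as";
    public static final String OPTION_TOPIC = "topic";
    public static final String OPTION_MESSAGE = "message";

    // Accepted values for the start.as option
    public static final String OPTION_CONSUMER = "consumer";
    public static final String OPTION_PRODUCER = "producer";

    private Constants() {
        // utility class, no instances
    }
}
```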
The Producer
package com.opencodez.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Producer implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    private String message;
    private String topic;

    public Producer(String topic, String message) {
        this.message = message;
        this.topic = topic;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>(this.topic, this.message));
        logger.info("Message sent to topic: {}", this.topic);
        producer.close();
    }
}
The Consumer
package com.opencodez.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    private String topic;

    public Consumer(String topic) {
        this.topic = topic;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(this.topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                logger.info("offset = {}, value = {}", record.offset(), record.value());
                System.out.println("offset = " + record.offset() + ", value = " + record.value());
            }
        }
    }
}
In the main application, I have injected a SimpleAsyncTaskExecutor. This executor will start either the producer or the consumer based on the parameters we pass.
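SimpleAsyncTaskExecutor starts a new thread for every task it is given, so the main thread is free while the consumer loops forever. To see this behavior in isolation, here is a small sketch that mimics it with a plain java.util.concurrent.Executor and a stand-in Runnable instead of the real Producer/Consumer (no Spring or Kafka needed):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

public class ExecutorSketch {

    // Mimics SimpleAsyncTaskExecutor's behavior: a new thread per submitted task
    static final Executor taskExecutor = task -> new Thread(task).start();

    static String runTask() throws InterruptedException {
        StringBuilder result = new StringBuilder();
        CountDownLatch done = new CountDownLatch(1);

        // Stand-in for taskExecutor.execute(new Consumer(topic)) or new Producer(topic, msg)
        taskExecutor.execute(() -> {
            result.append("done");   // the "work" runs on its own thread
            done.countDown();
        });

        done.await();                // wait here only so we can observe the result
        return result.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTask());
    }
}
```

In the real application there is no latch: the executor hands the Runnable to a fresh thread and returns immediately, which is what lets the consumer's infinite poll loop run without blocking application startup.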
The Main Application
package com.opencodez;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

import com.opencodez.kafka.Consumer;
import com.opencodez.kafka.Producer;
import com.opencodez.util.Constants;

@SpringBootApplication
public class KafkaDemoApplication implements ApplicationRunner {

    @Autowired
    private TaskExecutor taskExecutor;

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }

    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        String start_as;
        String topic;
        String msg;

        if (args.containsOption(Constants.OPTION_START_AS)) {
            start_as = args.getOptionValues(Constants.OPTION_START_AS).get(0);
            if (args.containsOption(Constants.OPTION_TOPIC)) {
                topic = args.getOptionValues(Constants.OPTION_TOPIC).get(0);
                if (Constants.OPTION_CONSUMER.equalsIgnoreCase(start_as)) {
                    taskExecutor.execute(new Consumer(topic));
                } else if (Constants.OPTION_PRODUCER.equalsIgnoreCase(start_as)) {
                    msg = args.getOptionValues(Constants.OPTION_MESSAGE).get(0);
                    if (null != msg) {
                        taskExecutor.execute(new Producer(topic, msg));
                    }
                }
            }
        }
    }
}
As this is a threaded example, I have added log file generation using Logback. So when we run our consumer, the log file will be appended with the messages read from the topic.
Start Java Consumer
This will run the jar and create the myapplication.log file:
java -jar kafka-demo-0.0.1-SNAPSHOT.jar --start.as=consumer --topic=test
After this, our consumer thread is started, and it will write a line to the log file whenever it receives a message.
Send Message using Producer
java -jar kafka-demo-0.0.1-SNAPSHOT.jar --start.as=producer --topic=test --message="Hello Opencodez.com"
The above command sends a message to our test topic. If you now check your log file, you will see the message in it, logged by our consumer:
2017-12-29 17:16:58.085 INFO 120452 --- [SimpleAsyncTaskExecutor-1] com.opencodez.kafka.Consumer : offset = 1, value = Hello Opencodez.com
To confirm the same, let's try running the official console consumer script that is distributed with Apache Kafka.
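For example, the bundled console consumer can replay everything published to the topic so far (this assumes the broker from the earlier steps is still running on localhost:9092, so it is shown for reference only):

```shell
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
```

You should see "Hello Opencodez.com" printed, confirming the message is retained in the topic even after our Java consumer read it.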
Summary
I found Apache Kafka simple to implement. I will try my hand at some more aspects of Apache Kafka and share them with readers. Meanwhile, you can download the source code from our repository.
Please feel free to comment or ask questions.
Download from Git