Simple Way to Implement Kafka with Java – Source Code on GitHub
Implement Kafka with Java:
Apache Kafka is a buzzword today: everyone talks about it and writes about it. So I decided to dive in and understand it too. I will first cover some basic Apache Kafka concepts, and then we will go through a running example.
Is Kafka similar to a traditional message broker? The answer is both yes and no. “Yes” because it gives you functionality similar to traditional brokers. “No” because it offers much more, such as multi-subscriber topics, partitioning, and retention of messages after they have been consumed.
Apache Kafka – Concepts
When we talk about Kafka, a few concepts need to be clear.
Cluster:
Kafka always runs as a cluster of one or more servers, called brokers. A broker is one instance of the Kafka server running on a machine, and you can run several brokers on the same machine or on different machines. The protocol and port on which each broker listens are set in its properties file (the listeners setting in config/server.properties).
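To make the protocol/port point concrete, here is a minimal sketch that reads the listeners value from server.properties-style text and joins one host:port per broker into a string usable as a client's bootstrap.servers. The class BrokerListeners and the two broker configs are hypothetical; the only assumptions about Kafka are the listeners format PROTOCOL://host:port and that an empty host binds to the default interface.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class BrokerListeners {

    /** One listener entry such as PLAINTEXT://localhost:9092. */
    static final class Listener {
        final String protocol;
        final String host;
        final int port;

        Listener(String protocol, String host, int port) {
            this.protocol = protocol;
            this.host = host;
            this.port = port;
        }
    }

    /** Reads the comma-separated "listeners" value from server.properties-style text. */
    static List<Listener> parse(String serverProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<Listener> result = new ArrayList<>();
        for (String entry : props.getProperty("listeners", "").split(",")) {
            entry = entry.trim();
            if (entry.isEmpty()) {
                continue;
            }
            int scheme = entry.indexOf("://");
            String protocol = entry.substring(0, scheme);
            String hostPort = entry.substring(scheme + 3);
            int colon = hostPort.lastIndexOf(':');
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            // An empty host binds to the default interface; a local client can reach it via localhost
            result.add(new Listener(protocol, host.isEmpty() ? "localhost" : host, port));
        }
        return result;
    }

    /** Joins the first listener of each broker into a bootstrap.servers value. */
    static String bootstrapServers(List<String> brokerConfigs) {
        List<String> hostPorts = new ArrayList<>();
        for (String config : brokerConfigs) {
            List<Listener> listeners = parse(config);
            if (!listeners.isEmpty()) {
                hostPorts.add(listeners.get(0).host + ":" + listeners.get(0).port);
            }
        }
        return String.join(",", hostPorts);
    }

    public static void main(String[] args) {
        // Two hypothetical brokers on one machine, each with its own broker.id and port
        String broker0 = "broker.id=0\nlisteners=PLAINTEXT://:9092\n";
        String broker1 = "broker.id=1\nlisteners=PLAINTEXT://:9093\n";
        System.out.println(bootstrapServers(Arrays.asList(broker0, broker1))); // localhost:9092,localhost:9093
    }
}
```

Listing more than one broker in bootstrap.servers only matters for the initial connection: a client needs one reachable broker to discover the rest of the cluster.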
Topics:
Kafka treats a topic as a category or feed name to which messages are published. The core concept is similar to a topic in a traditional broker. On top of that,
- Kafka topics are always multi-subscriber: a topic can have zero, one, or many consumers reading its data.
- a topic can have one or more partitions. Each partition stores messages in order and gives each one an offset that uniquely identifies it within that partition.
- Kafka retains all published messages whether or not they have been consumed. Records are deleted only when the configurable retention period expires.
(Image courtesy: kafka.apache.org)
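The three topic properties above can be made concrete with a toy, in-memory model. ToyTopic is a hypothetical class, not part of the Kafka API: each partition is an append-only list whose offsets are counted per partition, every subscriber (consumer group) keeps its own read position so reading never removes data, and a time-based retention pass deletes old records whether or not anyone read them. Real Kafka stores partitions as segment files on disk and deletes whole segments, so treat this only as an illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a Kafka topic (not the Kafka API). */
public class ToyTopic {

    /** A stored record: its offset within the partition, its value, and its append time. */
    static final class Record {
        final long offset;
        final String value;
        final long timestampMs;

        Record(long offset, String value, long timestampMs) {
            this.offset = offset;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** A partition is an append-only log; offsets keep growing even after old records are deleted. */
    static final class Partition {
        final List<Record> log = new ArrayList<>();
        long nextOffset = 0;

        long append(String value, long nowMs) {
            long offset = nextOffset++;
            log.add(new Record(offset, value, nowMs));
            return offset;
        }

        /** Time-based retention: drops old records whether or not they were consumed.
         *  (Real Kafka deletes whole log segments, not individual records.) */
        void applyRetention(long nowMs, long retentionMs) {
            log.removeIf(r -> nowMs - r.timestampMs > retentionMs);
        }
    }

    final List<Partition> partitions = new ArrayList<>();
    /** Each subscriber keeps its own next offset per partition. */
    private final Map<String, long[]> positions = new HashMap<>();

    ToyTopic(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new Partition());
        }
    }

    long append(int partition, String value, long nowMs) {
        return partitions.get(partition).append(value, nowMs);
    }

    /** Returns values this group has not read yet and advances only this group's position. */
    List<String> poll(String group, int partition) {
        long[] next = positions.computeIfAbsent(group, g -> new long[partitions.size()]);
        List<String> values = new ArrayList<>();
        for (Record r : partitions.get(partition).log) {
            if (r.offset >= next[partition]) {
                values.add(r.value);
                next[partition] = r.offset + 1;
            }
        }
        return values;
    }
}
```

Two groups polling the same partition each receive every record, and a record that one group has already read is still there for the other until retention removes it.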
ZooKeeper:
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. Because Kafka runs as a distributed cluster, it depends heavily on ZooKeeper to store cluster metadata and keep its brokers in sync.
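To make “keep its brokers in sync” more concrete: in ZooKeeper-based deployments, as I understand them, each broker registers an ephemeral znode under /brokers/ids, and one broker becomes the controller by being the first to create the ephemeral /controller znode. Ephemeral znodes disappear when the session that created them ends, which is how the cluster notices a dead broker. ToyZooKeeper below is a hypothetical, simplified in-memory imitation of that mechanism, not the ZooKeeper client API; real znodes hold JSON metadata, and brokers react through watches rather than explicit calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of ephemeral znodes as ZooKeeper-mode Kafka uses them. Not the ZooKeeper API. */
public class ToyZooKeeper {

    private final Map<String, String> nodes = new TreeMap<>();        // path -> data
    private final Map<String, Long> ephemeralOwner = new HashMap<>(); // path -> creating session

    /** Creates an ephemeral node; like ZooKeeper, creation fails if the path already exists. */
    boolean createEphemeral(String path, String data, long sessionId) {
        if (nodes.containsKey(path)) {
            return false;
        }
        nodes.put(path, data);
        ephemeralOwner.put(path, sessionId);
        return true;
    }

    /** When a session ends (for example, a broker crashes), all its ephemeral nodes vanish. */
    void expireSession(long sessionId) {
        ephemeralOwner.entrySet().removeIf(e -> {
            if (e.getValue() == sessionId) {
                nodes.remove(e.getKey());
                return true;
            }
            return false;
        });
    }

    String get(String path) {
        return nodes.get(path);
    }

    /** Live brokers are the children of /brokers/ids. */
    List<String> liveBrokerIds() {
        List<String> ids = new ArrayList<>();
        for (String path : nodes.keySet()) {
            if (path.startsWith("/brokers/ids/")) {
                ids.add(path.substring("/brokers/ids/".length()));
            }
        }
        return ids;
    }

    /** A starting broker registers itself, then tries to become controller. */
    boolean startBroker(int brokerId, long sessionId) {
        createEphemeral("/brokers/ids/" + brokerId, "broker-" + brokerId, sessionId);
        return tryBecomeController(brokerId, sessionId);
    }

    /** Whoever creates /controller first wins; everyone else gets false. */
    boolean tryBecomeController(int brokerId, long sessionId) {
        return createEphemeral("/controller", String.valueOf(brokerId), sessionId);
    }
}
```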
Now let’s get some hands-on experience with Kafka.
Download and Installation
You can download the Kafka distribution from the Apache Kafka downloads page. Once downloaded, untar the archive into a folder of your choice. The distribution ships scripts for both Linux and Windows; here we use the Windows scripts with the default ports and directories. As mentioned earlier, Kafka uses ZooKeeper, so you must start the ZooKeeper server first.
Start Zookeeper
```
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
```
Once ZooKeeper is running, it’s time to start Kafka. For demo purposes, we will run a single broker.
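A broker fails to start if it cannot connect to ZooKeeper in time, so when scripting these steps it helps to wait until ZooKeeper is accepting connections. Below is a minimal plain-Java sketch (hypothetical class PortCheck), assuming the default clientPort 2181 from config/zookeeper.properties. An open port only shows that something is listening, not that ZooKeeper is fully healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** True if something accepts a TCP connection on host:port within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host unreachable
        }
    }

    /** Retries until the port accepts connections or maxWaitMs has passed. */
    static boolean waitForPort(String host, int port, long maxWaitMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (System.currentTimeMillis() < deadline) {
            if (isListening(host, port, 500)) {
                return true;
            }
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // 2181 is the clientPort in the default config/zookeeper.properties
        boolean up = waitForPort("localhost", 2181, 10_000);
        System.out.println(up ? "ZooKeeper is accepting connections" : "ZooKeeper is not reachable yet");
    }
}
```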
Start Kafka
```
bin\windows\kafka-server-start.bat config\server.properties
```
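The start commands in this walkthrough use the Windows scripts. On Linux and macOS the equivalents live directly under bin/ with a .sh extension (for example, bin/kafka-server-start.sh config/server.properties). If you want to launch either from Java, a small helper can pick the right script for the current OS. This is a sketch with a hypothetical KafkaScripts.command method; it assumes the process will run with the Kafka installation directory as its working directory, and it wraps the .bat file in cmd /c on Windows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class KafkaScripts {

    /**
     * Builds the command for a Kafka script such as "kafka-server-start" plus a file under config/.
     * Windows uses bin\windows\*.bat (run through cmd /c); other systems use bin/*.sh.
     */
    static List<String> command(String osName, String script, String configFile) {
        List<String> cmd = new ArrayList<>();
        if (osName.toLowerCase(Locale.ROOT).startsWith("windows")) {
            cmd.add("cmd");
            cmd.add("/c");
            cmd.add("bin\\windows\\" + script + ".bat");
            cmd.add("config\\" + configFile);
        } else {
            cmd.add("bin/" + script + ".sh");
            cmd.add("config/" + configFile);
        }
        return cmd;
    }
}
```

For example, new ProcessBuilder(KafkaScripts.command(System.getProperty("os.name"), "kafka-server-start", "server.properties")).directory(kafkaHome).inheritIO().start() would start the broker, where kafkaHome is a File pointing at your Kafka folder.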
For testing, we will create a topic named “test” with the following command:
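```
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
```
On Kafka 2.2 and later you can pass --bootstrap-server localhost:9092 instead of --zookeeper localhost:2181; Kafka 3.0 removed the --zookeeper option from this tool.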
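Topic creation can be rejected up front. The sketch below (hypothetical TopicSpec.validate) checks the rules I am aware of: a topic name of at most 249 characters drawn from ASCII letters, digits, '.', '_' and '-', not equal to '.' or '..', at least one partition, and a replication factor no larger than the number of live brokers. With the single broker started here, that last rule is why the command uses --replication-factor 1. Treat it as an illustration, not the broker's exact validation code.

```java
import java.util.Optional;
import java.util.regex.Pattern;

public class TopicSpec {

    // Characters allowed in topic names: ASCII letters, digits, '.', '_' and '-'
    private static final Pattern LEGAL_NAME = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_NAME_LENGTH = 249;

    /** Returns a reason the topic would be rejected, or empty if the request looks valid. */
    static Optional<String> validate(String name, int partitions, int replicationFactor, int liveBrokers) {
        if (name == null || name.isEmpty()) {
            return Optional.of("topic name is empty");
        }
        if (name.equals(".") || name.equals("..")) {
            return Optional.of("topic name cannot be '.' or '..'");
        }
        if (name.length() > MAX_NAME_LENGTH) {
            return Optional.of("topic name is longer than " + MAX_NAME_LENGTH + " characters");
        }
        if (!LEGAL_NAME.matcher(name).matches()) {
            return Optional.of("topic name may only contain letters, digits, '.', '_' and '-'");
        }
        if (partitions < 1) {
            return Optional.of("partitions must be at least 1");
        }
        if (replicationFactor < 1) {
            return Optional.of("replication factor must be at least 1");
        }
        // Each replica of a partition must live on a different broker
        if (replicationFactor > liveBrokers) {
            return Optional.of("replication factor " + replicationFactor
                    + " is larger than the " + liveBrokers + " available broker(s)");
        }
        return Optional.empty();
    }
}
```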
We are now ready to connect to the newly created topic and publish and consume some messages. I am using a simple Spring Boot project with the Kafka dependencies included. Below are the main dependencies from my pom.xml:
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```
In this example, I have written simple Consumer and Producer classes. Both implement the Runnable interface and produce to, or consume from, the topic they receive as a command-line parameter.
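Both classes get their topic (and, for the producer, the message) from options of the form --name=value, which Spring Boot exposes through ApplicationArguments.containsOption and getOptionValues. To make that convention concrete without Spring, here is a minimal stdlib-only sketch of a parser with the same semantics as I understand them: "--name=value" adds a value, a bare "--name" is present with no values, and getOptionValues returns null for an absent option. OptionArgs is a hypothetical class, not Spring's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in mirroring how Spring Boot's ApplicationArguments treats "--name=value". */
public class OptionArgs {

    private final Map<String, List<String>> options = new LinkedHashMap<>();
    private final List<String> nonOptionArgs = new ArrayList<>();

    public OptionArgs(String... args) {
        for (String arg : args) {
            if (arg.startsWith("--")) {
                String text = arg.substring(2);
                int eq = text.indexOf('=');
                String name = eq >= 0 ? text.substring(0, eq) : text;
                List<String> values = options.computeIfAbsent(name, k -> new ArrayList<>());
                if (eq >= 0) {
                    values.add(text.substring(eq + 1)); // a bare "--flag" is recorded with no value
                }
            } else {
                nonOptionArgs.add(arg);
            }
        }
    }

    public boolean containsOption(String name) {
        return options.containsKey(name);
    }

    /** Returns null (not an empty list) when the option is absent. */
    public List<String> getOptionValues(String name) {
        List<String> values = options.get(name);
        return values == null ? null : Collections.unmodifiableList(values);
    }

    public List<String> getNonOptionArgs() {
        return Collections.unmodifiableList(nonOptionArgs);
    }
}
```

Because an absent option yields null rather than an empty list, code should call containsOption before getOptionValues(...).get(0).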
The Producer
```java
package com.opencodez.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Producer implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    private final String topic;
    private final String message;

    public Producer(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // broker(s) to contact first
        props.put("acks", "all");                         // wait for all in-sync replicas to acknowledge
        props.put("retries", 0);                          // do not retry failed sends
        props.put("batch.size", 16384);                   // max bytes per partition batch
        props.put("linger.ms", 1);                        // wait up to 1 ms to fill a batch
        props.put("buffer.memory", 33554432);             // total memory for records not yet sent
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // try-with-resources closes the producer, which flushes any buffered records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous: log from the callback, once the broker has answered
            producer.send(new ProducerRecord<>(this.topic, this.message), (metadata, exception) -> {
                if (exception != null) {
                    logger.error("Failed to send message to topic: {}", this.topic, exception);
                } else {
                    logger.info("Message sent to topic: {}, partition: {}, offset: {}",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        }
    }
}
```
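The producer sends each record without a key, so the client is free to choose the partition (round-robin in older Java clients, “sticky” batching per partition since Kafka 2.4). If you use the ProducerRecord(topic, key, value) constructor instead, my understanding is that the default partitioner hashes the serialized key with murmur2, clears the sign bit, and takes the result modulo the partition count, so all records with the same key land on the same partition and keep their order. The sketch below (hypothetical class KeyPartitioner) reproduces that rule for String keys; check it against your client version before relying on exact partition numbers.

```java
import java.nio.charset.StandardCharsets;

/** Sketch of the default rule for keyed records: murmur2(key bytes), made positive, modulo partitions. */
public class KeyPartitioner {

    /** 32-bit murmur2 hash with the seed used by Kafka's Java client. */
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1 to 3 bytes (intentional fall-through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Partition for a String key serialized as UTF-8 (what StringSerializer produces by default). */
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Clear the sign bit instead of Math.abs, which stays negative for Integer.MIN_VALUE
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```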
The Consumer
```java
package com.opencodez.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    private final String topic;

    public Consumer(String topic) {
        this.topic = topic;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");                // consumers sharing a group.id split the partitions
        props.put("enable.auto.commit", "true");      // commit consumed offsets automatically...
        props.put("auto.commit.interval.ms", "1000"); // ...every second
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // try-with-resources closes the consumer if the loop exits with an exception
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(this.topic));
            while (true) {
                // poll(Duration) replaces poll(long), which is deprecated since Kafka clients 2.0
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("offset = {}, value = {}", record.offset(), record.value());
                }
            }
        }
    }
}
```
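The consumer uses group.id "test" with auto-commit. One detail worth knowing: when a group has no committed offset for a partition, the client falls back to auto.offset.reset, whose default is "latest", so a brand-new group only sees messages produced after it joins. That is one reason to start the consumer before the producer in this walkthrough; adding props.put("auto.offset.reset", "earliest") would make a new group start from the oldest retained record instead. The toy model below (hypothetical class StartingOffset, not the client API) captures that decision, plus the fact that a committed offset means “the next offset to read”.

```java
import java.util.OptionalLong;

/** Toy model (not the Kafka client API) of where a consumer group starts reading one partition. */
public class StartingOffset {

    /** Mirrors the two common values of the auto.offset.reset setting. */
    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * @param committed offset the group last committed for this partition, i.e. the NEXT offset to read
     * @param logStart  oldest offset still retained in the partition
     * @param logEnd    offset the next appended record will receive
     */
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, ResetPolicy reset) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c; // resume exactly where the group left off
            }
        }
        // No commit yet, or the committed record was already deleted by retention: apply the reset policy
        return reset == ResetPolicy.EARLIEST ? logStart : logEnd;
    }

    /** What gets committed after processing a record: the offset AFTER it. */
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    public static void main(String[] args) {
        // A brand-new group (nothing committed) on a partition holding offsets 0..9
        System.out.println(startingOffset(OptionalLong.empty(), 0, 10, ResetPolicy.LATEST));   // 10
        System.out.println(startingOffset(OptionalLong.empty(), 0, 10, ResetPolicy.EARLIEST)); // 0
    }
}
```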
In the main application, I have injected a SimpleAsyncTaskExecutor. Depending on the parameter we pass, this executor starts either the producer or the consumer.
The Main Application
```java
package com.opencodez;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

import com.opencodez.kafka.Consumer;
import com.opencodez.kafka.Producer;
import com.opencodez.util.Constants;

@SpringBootApplication
public class KafkaDemoApplication implements ApplicationRunner {

    @Autowired
    private TaskExecutor taskExecutor;

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }

    @Bean
    public TaskExecutor taskExecutor() {
        // Starts a new thread for every task it executes
        return new SimpleAsyncTaskExecutor();
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Both --start.as and --topic are required
        if (!args.containsOption(Constants.OPTION_START_AS) || !args.containsOption(Constants.OPTION_TOPIC)) {
            return;
        }
        String startAs = args.getOptionValues(Constants.OPTION_START_AS).get(0);
        String topic = args.getOptionValues(Constants.OPTION_TOPIC).get(0);

        if (Constants.OPTION_CONSUMER.equalsIgnoreCase(startAs)) {
            taskExecutor.execute(new Consumer(topic));
        } else if (Constants.OPTION_PRODUCER.equalsIgnoreCase(startAs)
                && args.containsOption(Constants.OPTION_MESSAGE)) {
            // getOptionValues() returns null for a missing option, so check containsOption() first
            String msg = args.getOptionValues(Constants.OPTION_MESSAGE).get(0);
            taskExecutor.execute(new Producer(topic, msg));
        }
    }
}
```
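SimpleAsyncTaskExecutor does not pool threads: it starts a new thread for each task, which is why the consumer's log line shows the thread name [SimpleAsyncTaskExecutor-1]. To show the idea without Spring, here is a stdlib-only sketch of a new-thread-per-task java.util.concurrent.Executor with the same kind of numbered thread names. ThreadPerTaskExecutor is a hypothetical class for illustration, not Spring's implementation.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a "new thread per task" Executor; not Spring's SimpleAsyncTaskExecutor itself. */
public class ThreadPerTaskExecutor implements Executor {

    private final String threadNamePrefix;
    private final AtomicInteger threadCount = new AtomicInteger();

    public ThreadPerTaskExecutor(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }

    @Override
    public void execute(Runnable task) {
        // No pooling: every task gets a brand-new thread named prefix + running number
        Thread thread = new Thread(task, threadNamePrefix + threadCount.incrementAndGet());
        thread.start();
    }

    public static void main(String[] args) {
        Executor executor = new ThreadPerTaskExecutor("SimpleAsyncTaskExecutor-");
        executor.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        // prints "running on SimpleAsyncTaskExecutor-1"
    }
}
```

A thread per task suits this demo because each consumer runs for the life of the application; for many short-lived tasks, a pooled executor such as Spring's ThreadPoolTaskExecutor avoids creating threads repeatedly.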
Because the example runs on background threads, I have added log-file output using Logback. When the consumer runs, each message it reads from the topic is appended to the log file.
Start Java Consumer
The following command runs the jar as a consumer and creates the myapplication.log file:
```
java -jar kafka-demo-0.0.1-SNAPSHOT.jar --start.as=consumer --topic=test
```
Our consumer thread is now running, and it will log each message to the log file as it arrives.
Send Message using Producer
```
java -jar kafka-demo-0.0.1-SNAPSHOT.jar --start.as=producer --topic=test --message="Hello Opencodez.com"
```
The above command sends a message to our test topic. If you now check the log file, you will see the message, logged by our consumer:
```
2017-12-29 17:16:58.085 INFO 120452 --- [SimpleAsyncTaskExecutor-1] com.opencodez.kafka.Consumer : offset = 1, value = Hello Opencodez.com
```
To confirm this, let’s also run the official test script that is distributed with Apache Kafka.
Summary
I found Apache Kafka simple to implement. I will try my hand at more aspects of Apache Kafka and share them with readers. Meanwhile, you can download the source code from our repository.
Please feel free to comment or ask questions.