
Simple Way to Implement Kafka with Java – Source Code on GitHub

Implement Kafka with Java:

Apache Kafka is the buzzword today. Everyone talks about it and writes about it, so I decided to dive in and understand it myself. I will cover some basics of Apache Kafka and then walk through a running example.

Is Kafka similar to a traditional message broker? The answer is both yes and no. "Yes" because it gives you functionality similar to traditional brokers. "No" because it offers you much more functionality than traditional brokers.

Apache Kafka - Concepts

When we talk about Kafka, we need to have a few things clear.

Cluster:

Kafka always runs as a cluster. A cluster is made up of one or more Kafka brokers (server instances), which can run on the same machine or on different machines. You can specify the protocol and port each broker listens on in its respective properties file.

Topics:

Kafka treats topics as categories or feed names to which messages are published. The core concept here is similar to a traditional broker. On top of that,

  1. Kafka topics are always multi-subscriber.
  2. A topic can have one or more partitions, which store messages with unique offset numbers.
  3. Kafka topics retain all published messages, whether or not they have been consumed. Records are freed based on a configurable retention period.

(Image courtesy: kafka.apache.org)
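The retention mentioned in point 3 is configurable. As a rough illustration (the values below are examples, not settings from this project), retention can be set broker-wide in server.properties or overridden per topic at creation time:

```properties
# server.properties: broker-wide default retention (168 hours = 7 days)
log.retention.hours=168

# Per-topic override, passed when creating the topic, in milliseconds (1 day):
# bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 ^
#   --replication-factor 1 --partitions 1 --topic test --config retention.ms=86400000
retention.ms=86400000
```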

Zookeeper:

ZooKeeper is a centralized service for maintaining configuration information and providing distributed synchronization and group services. Since Kafka runs as a distributed, clustered framework, it depends heavily on ZooKeeper to keep its clusters in sync.

Now let's see how we can actually get some hands-on experience with Kafka.

Download and Installation

You can download the Kafka distribution from this link. Once downloaded, untar the archive to a folder of your choice. The distribution has scripts for both Linux and Windows environments. We are using the Windows scripts here, with default ports and directories. As mentioned earlier, Kafka uses Zookeeper, so you have to start the Zookeeper server first.

Start Zookeeper

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

After Zookeeper is started, it's time to start Kafka. For demo purposes, we will use a single broker.

Start Kafka

bin\windows\kafka-server-start.bat config\server.properties

For our testing, we will create a topic named "test". The command for the same is:

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

We are ready to connect to this newly created Kafka topic and publish and consume some messages. I am using a simple Spring Boot project with Kafka dependencies included. Below are a few major dependencies from my pom.xml:

<dependencies>
	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter</artifactId>
	</dependency>

	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
</dependencies>

In this example, I have written simple Consumer and Producer classes. Both implement the Runnable interface, and they produce to or consume from the topic they receive as a command-line parameter.

Read here on How to pass command line parameters with Spring Boot
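Spring Boot's ApplicationArguments handles this parsing for us, but a rough stdlib-only sketch of what "--name=value" option parsing looks like (the class name here is illustrative, not from the project) may help:

```java
import java.util.HashMap;
import java.util.Map;

public class OptionParser {

	// Minimal sketch of "--name=value" option parsing, similar in spirit
	// to what Spring Boot's ApplicationArguments does automatically.
	public static Map<String, String> parse(String[] args) {
		Map<String, String> options = new HashMap<>();
		for (String arg : args) {
			if (arg.startsWith("--") && arg.contains("=")) {
				int eq = arg.indexOf('=');
				options.put(arg.substring(2, eq), arg.substring(eq + 1));
			}
		}
		return options;
	}

	public static void main(String[] args) {
		Map<String, String> opts = parse(new String[] { "--start.as=producer", "--topic=test" });
		System.out.println(opts.get("start.as") + " -> " + opts.get("topic"));
	}
}
```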

The Producer

package com.opencodez.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Producer implements Runnable {
	
	private static final Logger logger = LoggerFactory.getLogger(Producer.class);
	
	private String message;
	private String topic;
	
	public Producer(String topic, String message) {
		this.message = message;
		this.topic = topic;
	}

	@Override
	public void run() {
		
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("batch.size", 16384);
		props.put("linger.ms", 1);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		KafkaProducer<String, String> producer = new KafkaProducer<>(props);
		
		producer.send(new ProducerRecord<String, String>(this.topic, this.message));
		
		logger.info("Message sent to topic: {}", this.topic);
		
		producer.close();
	}
}

The Consumer

package com.opencodez.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer implements Runnable {
	
	private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
	
	private String topic;
	
	public Consumer(String topic) {
		this.topic = topic;
	}

	@Override
	public void run() {
		
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");
		props.put("group.id", "test");
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList(this.topic));
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records) {
				logger.info("offset = {}, value = {}", record.offset(), record.value());
				System.out.println("offset = " + record.offset() + ", value = " + record.value());
			}
		}
		
	}
}

In the main application, I have injected a SimpleAsyncTaskExecutor. This executor starts either the producer or the consumer based on the parameter we send.
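SimpleAsyncTaskExecutor spawns a fresh thread for each submitted task rather than pooling them. A rough plain-JDK sketch of the same dispatch idea (the class and method names here are illustrative, not from the project):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class DispatchSketch {

	// Runs the task matching the --start.as value on an executor and
	// reports which one ran. A cached thread pool is the closest
	// plain-JDK analogue of SimpleAsyncTaskExecutor's thread-per-task model.
	public static String dispatch(String startAs) throws InterruptedException {
		ExecutorService executor = Executors.newCachedThreadPool();
		AtomicReference<String> ran = new AtomicReference<>();
		if ("producer".equalsIgnoreCase(startAs)) {
			executor.execute(() -> ran.set("producer"));
		} else if ("consumer".equalsIgnoreCase(startAs)) {
			executor.execute(() -> ran.set("consumer"));
		}
		executor.shutdown();
		executor.awaitTermination(5, TimeUnit.SECONDS);
		return ran.get();
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(dispatch("consumer")); // prints "consumer"
	}
}
```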

The Main Application

package com.opencodez;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

import com.opencodez.kafka.Consumer;
import com.opencodez.kafka.Producer;
import com.opencodez.util.Constants;

@SpringBootApplication
public class KafkaDemoApplication implements ApplicationRunner {

	@Autowired
	private TaskExecutor taskExecutor;

	public static void main(String[] args) {
		SpringApplication.run(KafkaDemoApplication.class, args);
	}

	@Bean
	public TaskExecutor taskExecutor() {
		return new SimpleAsyncTaskExecutor();
	}

	@Override
	public void run(ApplicationArguments args) throws Exception {

		String start_as;
		String topic;
		String msg;

		if (args.containsOption(Constants.OPTION_START_AS)) {
			
			start_as = args.getOptionValues(Constants.OPTION_START_AS).get(0);

			if (args.containsOption(Constants.OPTION_TOPIC)) {

				topic = args.getOptionValues(Constants.OPTION_TOPIC).get(0);

				if (Constants.OPTION_CONSUMER.equalsIgnoreCase(start_as)) {
					taskExecutor.execute(new Consumer(topic));
				} else if (Constants.OPTION_PRODUCER.equalsIgnoreCase(start_as)) {
					msg = args.getOptionValues(Constants.OPTION_MESSAGE).get(0);
					if (null != msg) {
						taskExecutor.execute(new Producer(topic, msg));
					}
				}
			}
		}

	}
}

As this is a threaded example, I have added log file generation using Logback. So when we run our consumer, the log file gets appended with each message read from the topic.
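A minimal logback.xml along these lines would route the log output to that file (the file name matches the myapplication.log mentioned in this article, but the pattern is an assumption; the project's actual config may differ):

```xml
<configuration>
  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
    <!-- Log file the consumer thread appends to -->
    <file>myapplication.log</file>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%thread] %logger{36} : %msg%n</pattern>
    </encoder>
  </appender>
  <root level="INFO">
    <appender-ref ref="FILE" />
  </root>
</configuration>
```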

Start Java Consumer

This will run the jar and create the myapplication.log file:

java -jar kafka-demo-0.0.1-SNAPSHOT.jar --start.as=consumer --topic=test

After this, our consumer thread is started, and it will log each message to the log file as it receives it.

Send Message using Producer

java -jar kafka-demo-0.0.1-SNAPSHOT.jar --start.as=producer --topic=test --message="Hello Opencodez.com"

The above command sends a message to our test topic. Now if you check your log file, you will see the message in it, logged by our consumer:

2017-12-29 17:16:58.085  INFO 120452 --- [SimpleAsyncTaskExecutor-1] com.opencodez.kafka.Consumer : offset = 1, value = Hello Opencodez.com

To confirm the same, let's try and run the official test script that is distributed with Apache Kafka.

Summary

I found Apache Kafka simple to implement. I will try my hand at some more aspects of Apache Kafka and share them with readers. Meanwhile, you can download the source code from our repository.

Please feel free to comment or ask questions.

Download from Git