In our last article on implementing Apache Kafka, we saw a basic Java client that produces and consumes messages. To continue our learning, let's see how we can send custom objects to a Kafka topic. For that, we need to implement a custom value serializer for Kafka.
Implement Custom Value Serializer for Kafka:
You can send messages with different data types to Kafka topics. Kafka deals with messages, or records, in the form of byte arrays. As you may have observed, both KafkaProducer and KafkaConsumer need a key and a value serializer. The default configuration for the Producer is
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Similarly, the default for the Consumer is
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
So one thing is clear: Kafka depends on a Serializer and a Deserializer so that the Producer and the Consumer both know how to encode and decode messages.
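To make this concrete, here is a minimal sketch (a helper class of our own, not part of the demo project) that builds the default producer configuration shown above using plain java.util.Properties:

```java
import java.util.Properties;

public class DefaultSerializerConfig {

    // Builds the default producer configuration: both key and value
    // are serialized as plain strings before they hit the wire.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("value.serializer"));
    }
}
```

Replacing the `value.serializer` entry with our own class name is all it takes to plug in a custom serializer, as we will see below.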
Now we will see how we can implement our own custom value serializer and deserializer to send and receive custom Java objects from Kafka topics.
The easiest way to serialize your custom object is to convert it to JSON format. To achieve this, we will add the Jackson dependency to our Kafka project.
POM
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
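The serializer we are about to write relies on two steps: object to JSON string (via Jackson), then JSON string to byte[]. The second step is plain Java; here is a minimal stdlib-only sketch of that byte round trip (the JSON literal is hand-written here just for illustration):

```java
import java.nio.charset.StandardCharsets;

public class JsonBytesDemo {
    public static void main(String[] args) {
        // A hand-written JSON payload standing in for what Jackson would produce
        String json = "{\"id\":1,\"name\":\"Pavan\",\"salary\":1000}";
        // What the serializer hands to Kafka
        byte[] payload = json.getBytes(StandardCharsets.UTF_8);
        // What the deserializer receives and turns back into an object
        String restored = new String(payload, StandardCharsets.UTF_8);
        System.out.println(restored.equals(json)); // prints "true"
    }
}
```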
The Domain Objects
We have defined simple domain objects that we are going to send over the Kafka topic.
Developer.class
import java.math.BigDecimal;

public class Developer {
    private Long id;
    private String name;
    private BigDecimal salary;
    private Address address;
    //Getters and Setters beyond this point
}
Address.class
public class Address {
    private String state;
    private String country;
    private String zipcode;
    //Getters and Setters beyond this point
}
Every custom serializer must implement Kafka's Serializer interface. Below is ours.
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class DeveloperSerializer implements Serializer<Developer> {

    @Override
    public byte[] serialize(String topic, Developer developer) {
        byte[] serializedBytes = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            // Convert the Developer object to a JSON string, then to bytes
            serializedBytes = objectMapper.writeValueAsString(developer).getBytes();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return serializedBytes;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }
}
Above, we have implemented the serialize method, which converts our custom object to a JSON string and returns the bytes to be sent to the topic.
In a similar fashion, every custom deserializer needs to implement Kafka's Deserializer interface. Jackson's ObjectMapper provides a convenient method to convert a byte[] back into our custom object. Below is our deserializer.
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class DeveloperDeserializer implements Deserializer<Developer> {

    @Override
    public Developer deserialize(String topic, byte[] devBytes) {
        ObjectMapper mapper = new ObjectMapper();
        Developer developer = null;
        try {
            // Convert the JSON bytes back into a Developer object
            developer = mapper.readValue(devBytes, Developer.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return developer;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }
}
So far we have defined all the required components for our custom value serializer for Apache Kafka. Let's put them to use. As we are using the same project from our previous article, I have added a few more command line parameters to consider.
Start the custom consumer
In our consumer code, we have to specify the custom deserializer. We do that in the property map we set for KafkaConsumer:
props.put("value.deserializer", "com.opencodez.serializer.DeveloperDeserializer");
The command to start our custom consumer thread is
java -jar kafka-demo-0.0.1-SNAPSHOT.jar --start.as=custom-consumer --topic=test
This will keep our consumer polling the topic test; as soon as a message is posted, it will be printed in our log file.
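For readers who want to see what such a consumer thread looks like inside, here is a hedged sketch of a poll loop wired to our DeveloperDeserializer. The broker address, group id, and the getName() getter are assumptions; the actual demo project may differ, and this needs a running Kafka broker to do anything:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("group.id", "custom-consumer");         // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Our custom deserializer turns the JSON bytes back into Developer objects
        props.put("value.deserializer", "com.opencodez.serializer.DeveloperDeserializer");

        try (KafkaConsumer<String, Developer> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // poll() hands us fully deserialized Developer values
                ConsumerRecords<String, Developer> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, Developer> record : records) {
                    System.out.println(record.value().getName()); // getter assumed on Developer
                }
            }
        }
    }
}
```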
Start the Custom Producer
In our producer code, we have to specify the custom serializer. We do that in the property map we set for KafkaProducer:
props.put("value.serializer", "com.opencodez.serializer.DeveloperSerializer");
The command to start the producer is
java -jar kafka-demo-0.0.1-SNAPSHOT.jar --start.as=custom-producer --topic=test --message="Pavan Solapure"
The above command will create a dummy developer object and post it to the test topic.
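Internally, the producer side boils down to something like the following sketch. The broker address, the dummy field values, and the setter names are assumptions standing in for what the demo builds from --message; this also needs a running broker:

```java
import java.math.BigDecimal;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CustomProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Our custom serializer converts the Developer into JSON bytes before sending
        props.put("value.serializer", "com.opencodez.serializer.DeveloperSerializer");

        // Dummy developer object, analogous to what the demo builds from --message
        Developer developer = new Developer();       // setters assumed on Developer
        developer.setId(1L);
        developer.setName("Pavan Solapure");
        developer.setSalary(new BigDecimal("1000"));

        try (KafkaProducer<String, Developer> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", developer));
        }
    }
}
```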
The Output
In the log file snippet above, you can see that our consumer was able to read the custom object successfully.
You can download the latest code from our git repository.
Download from Git