Overview
In the previous post I showed how to set up Kafka on your local machine. In this article, I will walk through a "hello world" Kafka publisher/subscriber implemented in Java with Spring Boot. All of the code for this example can be found here.
Prereqs
- Please read my post on the basic Kafka setup
- Java 8 (or above)
- maven
Step 1: Maven dependencies - pom.xml
All of the necessary dependencies are included in the pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.7.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
The spring-kafka library is worth a special mention - it provides a nice abstraction for implementing publishers, consumers, etc., which we will go through in this article. The rest of the libraries are needed to implement the web application/API and its tests; lombok helps reduce boilerplate code in Java.
Step 2: Main Application
Typical of a Spring Boot application, there is a main Java class that acts as the entry point: it implements a main() method and carries the @SpringBootApplication annotation at the class level.
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootWithKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootWithKafkaApplication.class, args);
    }
}
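Assuming the project is built with the Spring Boot Maven plugin (which projects based on the Spring Boot starter parent get by default), you can start the application with:

mvn spring-boot:run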
Step 3: Configure Kafka in application.yaml
Here's a snippet of the application.yaml file -
server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties.spring.json.trusted.packages: com.chitamoor.domain
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
bootstrap-servers lists the broker hosts and ports; in our case, a single broker running locally at localhost:9092. key-deserializer and value-deserializer define the deserialization classes for the message key and value. Since all messages are stored as binary in the Kafka logs, they need to be deserialized on the way out - that's where these classes come into play. Here the key is deserialized as a plain string, while the value is deserialized from JSON into an object; the spring.json.trusted.packages property tells the JsonDeserializer which packages it is allowed to deserialize into. For more info on data types and the serialization/deserialization process, please see Data Types and Serialization.
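As an aside, the same consumer settings can be expressed programmatically instead of in YAML. Below is a minimal sketch (not part of the example project - the KafkaConsumerConfig class name is my own) showing the YAML properties mapped onto a ConsumerFactory bean, including the trusted-packages setting for the JsonDeserializer:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Message> consumerFactory() {
        // mirrors the spring.kafka.consumer.* properties from application.yaml
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // deserialize the value as JSON into Message; trust only our domain package
        JsonDeserializer<Message> valueDeserializer = new JsonDeserializer<>(Message.class);
        valueDeserializer.addTrustedPackages("com.chitamoor.domain");

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }
}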
Step 4: Producer (Publisher)
The Producer/Publisher is responsible for writing (publishing) messages into Kafka.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, Message> kafkaTemplate;

    public void sendMessage(Message message, String topic) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        // publish the message that was passed in to the given topic
        this.kafkaTemplate.send(topic, message);
    }
}
The KafkaTemplate class provides a nice abstraction for publishing messages to a Kafka topic - it takes care of all the details of interacting with the broker(s) to produce messages. Notice that the KafkaTemplate declaration takes two type parameters, String (the key type) and Message (the value type). The actual publishing happens in the kafkaTemplate.send() call, which takes a topic name and the Message instance to publish. Message is the object written into a Kafka topic
import lombok.Data;

@Data // Lombok generates the getters/setters Jackson needs for JSON (de)serialization
public class Message {
    private Integer integerAttribtuteOne;
    private String stringAttribtuteTwo;
}
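The send() call above is fire-and-forget. If you want delivery confirmation, send() returns a ListenableFuture you can attach a callback to (spring-kafka 2.2.x API). Here's a minimal sketch of such a variant - the sendMessageWithCallback method name is my own, and it assumes the kafkaTemplate and logger fields from the Producer class above:

import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

public void sendMessageWithCallback(Message message, String topic) {
    ListenableFuture<SendResult<String, Message>> future =
            this.kafkaTemplate.send(topic, message);
    future.addCallback(
            // success: log where the broker placed the record
            result -> logger.info("Published to partition {} at offset {}",
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset()),
            // failure: the send could not be completed
            ex -> logger.error("Failed to publish message", ex));
}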
That's all there is to publishing a message into Kafka. Now let's look at how to consume these messages.
Step 5: Consumer
The Consumer is where one defines the message consumption and processing logic.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer {
    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(topics = "${kafka.topic.name}")
    public void consume(Message message) {
        logger.info(String.format("#### -> Consumed message from topic_one -> %s", message));
    }
}
The important bit to note here is the @KafkaListener annotation, which does the work of listening to a Kafka topic. We pass @KafkaListener one or more topic names. It also lets you specify the consumer group that a given consumer belongs to; though we haven't defined it in the code, we have specified the consumer group using the spring.kafka.consumer.group-id property in the application.yaml file. @KafkaListener supports more options - refer to @KafkaListener for more info. The actual consumption logic is implemented in the consume() method, whose signature takes an object of type Message, matching the value type of kafkaTemplate.send() in the Producer class.
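As a quick illustration of those extra options (this listener is a sketch, not part of the example project), a @KafkaListener method can also receive record metadata via @Header parameters, and the groupId attribute can set the consumer group directly in code:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

// inside the Consumer class above; groupId here overrides the YAML setting
@KafkaListener(topics = "${kafka.topic.name}", groupId = "group_id")
public void consumeWithMetadata(@Payload Message message,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    logger.info(String.format("Consumed %s from topic %s, partition %d", message, topic, partition));
}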
Step 6: Give it a whirl using the KafkaController
The KafkaController provides a REST endpoint to publish a message to the Kafka topic.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    // the topic to publish to; reuses the property the consumer listens on
    @Value("${kafka.topic.name}")
    private String topic;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestBody Message message) {
        this.producer.sendMessage(message, topic);
    }
}
The sendMessageToKafkaTopic() method is where the main action takes place. It consumes the HTTP POST payload (which is automatically converted to a Message object) and invokes sendMessage() on the Producer, which publishes the message to the Kafka topic.
You can invoke the REST endpoint using the following curl call -
curl -d '{"integerAttribtuteOne":"123", "stringAttribtuteTwo":"value_for_stringAttribtuteTwo"}' -H "Content-Type: application/json" -X POST http://localhost:9000/kafka/publish
Step 7: Verify the consumption of the Message
Verify that the message was written to the topic and was consumed by the consumer using the kafka-topics-ui at http://localhost:8000/
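If you don't have kafka-topics-ui running, the console consumer that ships with Kafka works as well. The topic name below is an assumption based on the consumer's log message - substitute whatever kafka.topic.name resolves to in your setup:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_one --from-beginning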
Code
The code mentioned in this article can be found here
Where to go from here
- My resource recommendations for Kafka