Local Kafka Development - Part 2
2019-09-12

Overview#

In the previous post I showed how to set up Kafka on your local machine. In this article, I will walk through a 'hello world' Kafka publisher/subscriber implemented in Java/Spring Boot. All of the code for this example can be found here.

Prereqs#

Step 1: Maven dependencies - pom.xml#

All of the necessary dependencies are declared in the pom.xml:

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
</dependencies>

The spring-kafka library is worth a special mention: it provides a nice abstraction for implementing publishers, consumers, etc., which we will go through in this article. The rest of the libraries are needed to implement and test a web application/API. lombok helps reduce boilerplate code in Java.

Step 2: Main Application#

Typical of a Spring Boot application, there is a main Java class that acts as the entry point. It carries the @SpringBootApplication annotation at the class level and implements a main() method.

@SpringBootApplication
public class SpringBootWithKafkaApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringBootWithKafkaApplication.class, args);
	}
}

Step 3: Configure Kafka in application.yaml#

Here's a snippet of the application.yaml file:

server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties.spring.json.trusted.packages: com.chitamoor.domain
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

bootstrap-servers lists the broker hosts and ports. In our case, the broker is running locally at localhost:9092. key-deserializer and value-deserializer define the classes used to deserialize the key and the value of each message. Kafka stores every message as bytes in its logs, so the producer has to serialize objects before writing them (key-serializer and value-serializer) and the consumer has to deserialize them after reading. The spring.json.trusted.packages property tells the JsonDeserializer which packages it is allowed to create objects from. For more info on data types and the serialization/deserialization process, please see Data Types and Serialization.
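To make the serializer/deserializer roles concrete, here is a minimal, library-free sketch of the round trip. It assumes plain UTF-8 strings, and the names `SerdeSketch`, `serialize` and `deserialize` are hypothetical; the real StringSerializer and JsonDeserializer classes implement Kafka's own interfaces, which this sketch does not try to reproduce.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only: this is not the Kafka Serializer/Deserializer API.
class SerdeSketch {

    // Producer side: turn the value into the bytes that get written to the log.
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the stored bytes back into a usable value.
    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] stored = serialize("{\"stringAttribtuteTwo\":\"hello\"}");
        System.out.println(stored.length + " bytes stored");
        System.out.println(deserialize(stored));
    }
}
```

The point of the sketch is only that both sides must agree on the format: if the producer writes JSON, the consumer needs a JSON-aware deserializer to read it back.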

Step 4: Producer (Publisher)#

The Producer/Publisher is responsible for writing (publishing) messages into Kafka.

@Service
public class Producer {
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, Message> kafkaTemplate;

    public void sendMessage(Message message, String topic) {
        logger.info("#### -> Producing message -> {}", message);
        // Publish the message passed in by the caller to the given topic.
        this.kafkaTemplate.send(topic, message);
    }
}

The KafkaTemplate class provides a nice abstraction for publishing messages to a Kafka topic. It takes care of all the details of interacting with the broker(s) to produce messages. Notice that the KafkaTemplate declaration takes two type parameters, String and Message, which are the types of the message key and value. The actual publishing happens in the call to kafkaTemplate.send(), which takes a topic name and an instance of the Message class. Message is the object written into the Kafka topic:

public class Message {
    Integer integerAttribtuteOne;
    String  stringAttribtuteTwo;
}
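The snippet above shows only the fields. Since lombok is among the dependencies, the full class presumably relies on Lombok annotations to generate constructors and accessors, but that is an assumption on my part. As a rough sketch of what such a class looks like when written by hand (the name `MessageSketch` and everything except the two field names are hypothetical):

```java
// Hypothetical plain-Java stand-in for Message; only the two field names come from the article.
class MessageSketch {
    private Integer integerAttribtuteOne;
    private String stringAttribtuteTwo;

    // No-arg constructor, typically needed when an object is rebuilt from JSON.
    MessageSketch() {
    }

    MessageSketch(Integer integerAttribtuteOne, String stringAttribtuteTwo) {
        this.integerAttribtuteOne = integerAttribtuteOne;
        this.stringAttribtuteTwo = stringAttribtuteTwo;
    }

    public Integer getIntegerAttribtuteOne() { return integerAttribtuteOne; }
    public void setIntegerAttribtuteOne(Integer value) { this.integerAttribtuteOne = value; }
    public String getStringAttribtuteTwo() { return stringAttribtuteTwo; }
    public void setStringAttribtuteTwo(String value) { this.stringAttribtuteTwo = value; }

    // Readable form for log lines such as the one in Producer.sendMessage().
    @Override
    public String toString() {
        return "Message(integerAttribtuteOne=" + integerAttribtuteOne
                + ", stringAttribtuteTwo=" + stringAttribtuteTwo + ")";
    }
}
```

The getters and setters matter because JSON mapping libraries generally discover properties through them, and toString() is what makes the logged message readable.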

That's all there is to publishing a message to Kafka. Now let's look at how to consume these messages.

Step 5: Consumer#

The Consumer is where one defines the message consumption and processing logic.

@Service
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    // The topic name is resolved from the kafka.topic.name property.
    @KafkaListener(topics = "${kafka.topic.name}")
    public void consume(Message message) {
        logger.info("#### -> Consumed message from topic_one -> {}", message);
    }
}

The important bit to note here is the @KafkaListener annotation, which does the work of listening to a Kafka topic. We pass @KafkaListener one or more topic names; here the name is resolved from the kafka.topic.name property, which has to be defined in the application configuration (it is not part of the snippet shown in Step 3). @KafkaListener also lets you specify the consumer group that a given consumer belongs to. Though we haven't set it in the code, we have specified the consumer group using the spring.kafka.consumer.group-id property in the application.yaml file. @KafkaListener supports more options; refer to @KafkaListener for more info. The actual consumption logic is implemented in the consume() method. It takes an object of type Message, which matches the value type passed to kafkaTemplate.send() in the Producer class.
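If consumer groups are new to you, the following toy model may help. It is an in-memory sketch under heavy simplifying assumptions (one topic, one partition, no broker), not how Kafka is implemented, and `ConsumerGroupSketch`, `publish` and `poll` are hypothetical names. It shows the two ideas behind the group-id and auto-offset-reset settings: each group tracks its own position in the log, and a group with no recorded position starts from the earliest message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model, NOT Kafka's implementation: one log, one committed offset per group.
class ConsumerGroupSketch {
    private final List<String> log = new ArrayList<>();
    private final Map<String, Integer> committedOffsets = new HashMap<>();

    // Append a message to the end of the log.
    void publish(String message) {
        log.add(message);
    }

    // Return the messages this group has not seen yet and advance its offset.
    // A group without a committed offset starts at 0, mirroring auto-offset-reset: earliest.
    List<String> poll(String groupId) {
        int from = committedOffsets.getOrDefault(groupId, 0);
        List<String> unseen = new ArrayList<>(log.subList(from, log.size()));
        committedOffsets.put(groupId, log.size());
        return unseen;
    }
}
```

In this model, polling twice with the same group name returns each message only once, while a second group polling later still receives everything from the start of the log.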

Step 6: Give it a whirl using the KafkaController#

The KafkaController provides a REST endpoint to publish a message to the kafka topic.

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

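    private final Producer producer;

    // Topic to publish to; assumed here to come from the same kafka.topic.name
    // property that the Consumer listens on.
    @Value("${kafka.topic.name}")
    private String topic;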

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(
            @RequestBody Message message) {
        this.producer.sendMessage(message, topic);
    }
}

The sendMessageToKafkaTopic() method is where the main action takes place. It receives the HTTP POST payload (which is automatically converted to a Message object) and invokes the sendMessage() method on the Producer, which publishes the message to the Kafka topic.

You can invoke the REST endpoint using the following curl call:

curl -d '{"integerAttribtuteOne":"123", "stringAttribtuteTwo":"value_for_stringAttribtuteTwo"}' -H "Content-Type: application/json" -X POST http://localhost:9000/kafka/publish
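If you would rather call the endpoint from Java, the same request can be built with the JDK's built-in HTTP client (Java 11 or later). This is a sketch, not part of the original project: `PublishRequestSketch` and its methods are hypothetical names, the JSON is assembled by hand to stay dependency-free, and the integer is sent as a JSON number rather than the quoted string used in the curl call.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; only the URL, header and JSON field names come from the curl call.
class PublishRequestSketch {

    // Hand-built JSON body; assumes stringValue contains nothing that needs escaping.
    static String jsonBody(int integerValue, String stringValue) {
        return "{\"integerAttribtuteOne\":" + integerValue
                + ", \"stringAttribtuteTwo\":\"" + stringValue + "\"}";
    }

    // Build (but do not send) the POST request for the /kafka/publish endpoint.
    static HttpRequest build(String json) {
        return HttpRequest.newBuilder(URI.create("http://localhost:9000/kafka/publish"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }
}
```

With the application running, the request can be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding())`.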

Step 7: Verify the consumption of the Message#

Verify that the message was written to the topic and consumed by the consumer using the kafka-topics-ui at http://localhost:8000/. The consumer's "Consumed message" log line should also show up in the application's output.

Code#

The code mentioned in this article can be found here

Where to go from here#