Spring Boot + Apache Kafka

Prerequisite:

Apache Kafka must be installed and running. For help installing and starting Kafka, see this post:

https://technopractic.com/apache-kafka-installation/

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

In this post, let's see how to create Kafka producers and consumers and build an API to send and receive events. Producers are client applications that publish (write) events to Kafka, and consumers are applications that subscribe to (read and process) those events.

To use Spring for Apache Kafka, add the following dependency. No version is needed because Spring Boot's dependency management supplies it.

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

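Next, configure Kafka in the application properties file. Message values are serialized to and from JSON, and the consumer trusts only the package that contains the model classes. The kafka.topic entry is an application-specific property that holds the topic name.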

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.demo.model

kafka.topic=sampletopic

Define a MessagesController that exposes the API to send and receive events:

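import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;

@RestController
@RequestMapping("/messages")
public class MessagesController {

 @Autowired
 MessageService svc;

 // Publishes the order and returns the partition and offset it was written to.
 @PostMapping("/send")
 public ProducerMetadata sendMessage(@RequestBody Order order) {
  try {
   return svc.sendMessage(order);
  } catch (Exception e) {
   // Report the failure to the caller instead of returning an empty 200 response.
   throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to send message", e);
  }
 }

 // Reads back the single event stored at the given partition and offset.
 @GetMapping("/receive/{partition}/{offset}")
 public Order receiveOrder(@PathVariable String partition, @PathVariable String offset) {
  return svc.retrieveMessage(partition, offset);
 }

}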
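The controller refers to Order and ProducerMetadata, which the post does not list. A minimal sketch of the two classes might look like the following. The Order fields (orderId, item, quantity) and the ModelDemo class are hypothetical; only setOffset and setPartition on ProducerMetadata are implied by the service code. Getters, setters and a no-argument constructor are assumed here because Jackson's default bean deserialization relies on them.

```java
// Hypothetical payload: the post does not show Order's fields, so these are assumptions.
class Order {
    private String orderId;
    private String item;
    private int quantity;

    // No-argument constructor used by Jackson's default bean deserialization.
    public Order() {
    }

    public Order(String orderId, String item, int quantity) {
        this.orderId = orderId;
        this.item = item;
        this.quantity = quantity;
    }

    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }

    public String getItem() { return item; }
    public void setItem(String item) { this.item = item; }

    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }
}

// Carries the partition and offset reported back after a send.
class ProducerMetadata {
    private int partition;
    private long offset;

    public int getPartition() { return partition; }
    public void setPartition(int partition) { this.partition = partition; }

    public long getOffset() { return offset; }
    public void setOffset(long offset) { this.offset = offset; }
}

// Small demo so the sketch can be run on its own.
class ModelDemo {
    public static void main(String[] args) {
        Order order = new Order("o-1", "book", 2);
        ProducerMetadata metadata = new ProducerMetadata();
        metadata.setPartition(0);
        metadata.setOffset(42L);
        System.out.println(order.getOrderId() + " -> partition " + metadata.getPartition()
                + ", offset " + metadata.getOffset());
    }
}
```

In the real project these classes would sit in the package named by spring.json.trusted.packages so that the JsonDeserializer accepts them.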

The service uses Spring's KafkaTemplate to send and receive events. KafkaTemplate wraps a producer and provides convenience methods for sending data to Kafka topics. The result of a send includes the partition and offset to which the event was written.

 public ProducerMetadata sendMessage(Order order) throws Exception {
  // send() is asynchronous. It returns a ListenableFuture in Spring for Apache Kafka 2.x
  // and a CompletableFuture from version 3.0 onward.
  ListenableFuture<SendResult<String, Order>> result = kafkaTemplate.send(topicName, order);

  // Block until the broker acknowledges the record; get() throws if the send failed.
  RecordMetadata metadata = result.get().getRecordMetadata();

  ProducerMetadata data = new ProducerMetadata();
  data.setOffset(metadata.offset());
  data.setPartition(metadata.partition());

  return data;
 }
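Calling get() with no arguments waits indefinitely, so a request could hang if the broker never acknowledges the record. One option is to bound the wait. The sketch below is plain Java with no Kafka dependency: BoundedWait and awaitWithin are hypothetical names, and a CompletableFuture stands in for the future returned by send. It relies only on java.util.concurrent.Future, which both ListenableFuture and CompletableFuture implement.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a future for a limited time only.
class BoundedWait {

    // Returns the future's value, or throws IllegalStateException on timeout, failure or interrupt.
    static <T> T awaitWithin(Future<T> future, Duration timeout) {
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("No acknowledgement within " + timeout, e);
        } catch (ExecutionException e) {
            // The asynchronous operation itself failed; surface its cause.
            throw new IllegalStateException("Send failed", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a send that has already been acknowledged.
        CompletableFuture<String> acknowledged = CompletableFuture.completedFuture("acknowledged");
        System.out.println(awaitWithin(acknowledged, Duration.ofMillis(100)));

        // Stand-in for a send that is never acknowledged.
        CompletableFuture<String> pending = new CompletableFuture<>();
        try {
            awaitWithin(pending, Duration.ofMillis(100));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the service method, the same idea would amount to replacing result.get() with result.get(timeout, unit) and deciding how the API should report a timeout.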

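Using the partition and offset, we can read back the event that was sent by calling the receive method of KafkaTemplate. The receive methods are available from Spring for Apache Kafka 2.8 and require a ConsumerFactory to be set on the template.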

ConsumerRecord<String, Order> orderRecord = kafkaTemplate.receive(topicName, Integer.parseInt(partition), Long.parseLong(offset));
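
To see why a partition and an offset together identify exactly one event, it can help to model a topic as a set of append-only lists. The sketch below is a toy in-memory stand-in, not Kafka and not the KafkaTemplate API; InMemoryTopic, append and read are hypothetical names. It parses the coordinates from strings the same way the receive endpoint does, so a non-numeric path value raises NumberFormatException.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a topic: each partition is an append-only list, and the offset is the list index.
class InMemoryTopic<V> {
    private final List<List<V>> partitions = new ArrayList<>();

    InMemoryTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Appends a value to the given partition and returns the offset it was stored at.
    long append(int partition, V value) {
        List<V> log = partitions.get(partition);
        log.add(value);
        return log.size() - 1;
    }

    // Looks up one value by string coordinates; returns null when nothing is stored there.
    V read(String partition, String offset) {
        int p = Integer.parseInt(partition);   // throws NumberFormatException on bad input
        long o = Long.parseLong(offset);
        if (p < 0 || p >= partitions.size()) {
            return null;
        }
        List<V> log = partitions.get(p);
        if (o < 0 || o >= log.size()) {
            return null;
        }
        return log.get((int) o);
    }

    public static void main(String[] args) {
        InMemoryTopic<String> topic = new InMemoryTopic<>(2);
        long offset = topic.append(1, "order-1");
        System.out.println("stored at partition 1, offset " + offset);
        System.out.println("read back: " + topic.read("1", String.valueOf(offset)));
    }
}
```

Offsets are per partition in this model, as in Kafka, so the same offset number in a different partition refers to a different event or to nothing at all.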
mahendravarman.m@gmail.com
