Spring Boot + Apache Kafka

Prerequisite:

Apache Kafka must be installed and running. For help installing and starting Apache Kafka, see this post:

https://technopractic.com/apache-kafka-installation/

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

In this post, let's see how to create Kafka producers and consumers and build an API to send and receive events. Producers are client applications that publish (write) events to Kafka, and consumers are those that subscribe to (read and process) these events.

To use Spring for Apache Kafka, add the following dependency to the Maven pom.xml:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

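As the next step, configure the Kafka settings in the application properties file. The spring.kafka.* entries are Spring Boot settings; kafka.topic is a custom application property that holds the topic name.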

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.demo.model

kafka.topic=sampletopic
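
The post does not show the Order and ProducerMetadata classes used in the code that follows. A minimal sketch of what they might look like is given here. The Order field names (orderId, item, quantity) are assumptions chosen for illustration; only the partition and offset setters on ProducerMetadata are implied by the service code. A no-argument constructor plus getters and setters keeps default Jackson JSON (de)serialization simple.

```java
// Hypothetical model classes; the Order field names are assumptions, not taken from the post.
// In the project each class would be public, in its own file, under com.example.demo.model
// (the package listed in spring.json.trusted.packages).

/** Payload that is written to and read from the topic as JSON. */
class Order {
    private String orderId;
    private String item;
    private int quantity;

    // No-argument constructor used by default Jackson deserialization
    public Order() { }

    public Order(String orderId, String item, int quantity) {
        this.orderId = orderId;
        this.item = item;
        this.quantity = quantity;
    }

    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }

    public String getItem() { return item; }
    public void setItem(String item) { this.item = item; }

    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }
}

/** Response of the send endpoint: where the record was stored. */
class ProducerMetadata {
    private int partition;
    private long offset;

    public int getPartition() { return partition; }
    public void setPartition(int partition) { this.partition = partition; }

    public long getOffset() { return offset; }
    public void setOffset(long offset) { this.offset = offset; }
}
```

With classes shaped like this, a request body such as {"orderId":"A-1","item":"keyboard","quantity":2} would map onto Order.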

Define a MessagesController that exposes an API to send and receive events:

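import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;

// Order, ProducerMetadata and MessageService are this project's own classes
@RestController
@RequestMapping("/messages")
public class MessagesController {

	private final MessageService svc;

	// Constructor injection: Spring supplies the MessageService bean
	public MessagesController(MessageService svc) {
		this.svc = svc;
	}

	// Publishes the order and returns the partition and offset it was written to
	@PostMapping("/send")
	public ProducerMetadata sendMessage(@RequestBody Order order) {
		try {
			return svc.sendMessage(order);
		} catch (Exception e) {
			// Report the failure to the caller instead of returning null with a 200 status
			throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to send message", e);
		}
	}

	// Reads back the single record stored at the given partition and offset
	@GetMapping("/receive/{partition}/{offset}")
	public Order receiveOrder(@PathVariable String partition, @PathVariable String offset) {
		return svc.retrieveMessage(partition, offset);
	}

}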

The service uses Spring's KafkaTemplate to send and receive events. The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics. From the send result we can get the partition and offset to which the event was written.

	public ProducerMetadata sendMessage(Order order) throws Exception {
		// send() is asynchronous; get() blocks until the send completes
		// and throws if the send failed
		SendResult<String, Order> sendResult = kafkaTemplate.send(topicName, order).get();
		RecordMetadata metadata = sendResult.getRecordMetadata();

		// Copy the partition and offset of the stored record into the response object
		ProducerMetadata data = new ProducerMetadata();
		data.setOffset(metadata.offset());
		data.setPartition(metadata.partition());

		return data;
	}
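
The get() call in sendMessage blocks the request thread until the send completes. Because the future returned by send implements java.util.concurrent.Future, get(timeout, unit) is available as well, so one option is to wait only for a bounded time. The sketch below illustrates that pattern with a plain CompletableFuture standing in for the send result, so it runs without a broker. The class name, the awaitOffset helper and the 50 ms timeout are hypothetical and not part of the post's code.

```java
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitSketch {

    /**
     * Waits at most timeoutMillis for the pending result.
     * Returns the value, or an empty OptionalLong if it did not arrive in time.
     */
    static OptionalLong awaitOffset(Future<Long> pending, long timeoutMillis) {
        try {
            return OptionalLong.of(pending.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            // Stop waiting; the caller can turn this into an error response
            return OptionalLong.empty();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            // The asynchronous operation itself failed
            throw new IllegalStateException("Send failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        Future<Long> acknowledged = CompletableFuture.completedFuture(7L);
        Future<Long> neverCompletes = new CompletableFuture<>();

        System.out.println(awaitOffset(acknowledged, 50).isPresent());   // true
        System.out.println(awaitOffset(neverCompletes, 50).isPresent()); // false
    }
}
```

In the service, the same idea would mean calling get with a timeout on the future returned by kafkaTemplate.send and deciding how the API should respond when the wait expires.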

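Using the partition and offset returned by the send call, we can read the same event back by calling the receive method of the KafkaTemplate. Note that receive needs a consumer factory to be set on the template (setConsumerFactory), because it creates a consumer to fetch the record.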

ConsumerRecord<String, Order> orderRecord = kafkaTemplate.receive(topicName, Integer.parseInt(partition), Long.parseLong(offset));
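
To make the partition and offset addressing concrete, here is a toy in-memory model of a topic. It is not how Kafka is implemented and it uses no Kafka classes; ToyTopic and its methods are hypothetical names. It only shows the idea the API relies on: each partition is an append-only log, a write returns the position (offset) it landed at, and a read at the same partition and offset returns the same event.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a topic: each partition is an append-only list, the offset is the list index. */
class ToyTopic<V> {
    private final List<List<V>> partitions = new ArrayList<>();

    ToyTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    /** Appends a value to the given partition and returns the offset it was stored at. */
    long append(int partition, V value) {
        List<V> log = partitions.get(partition);
        log.add(value);
        return log.size() - 1;
    }

    /** Returns the value at (partition, offset), or null if nothing is stored there. */
    V read(int partition, long offset) {
        List<V> log = partitions.get(partition);
        if (offset < 0 || offset >= log.size()) {
            return null;
        }
        return log.get((int) offset);
    }

    public static void main(String[] args) {
        ToyTopic<String> topic = new ToyTopic<>(2);
        long first = topic.append(0, "order-1");
        long second = topic.append(0, "order-2");
        long other = topic.append(1, "order-3");

        System.out.println(first + " " + second + " " + other); // 0 1 0
        System.out.println(topic.read(0, 1)); // order-2
        System.out.println(topic.read(1, 5)); // null
    }
}
```

Offsets are counted per partition, which is why the receive endpoint needs both values to identify one event.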
