Prerequisite:
You need a running Apache Kafka broker. For help with installing and running Apache Kafka, please check this post:
https://technopractic.com/apache-kafka-installation/
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
In this post, let's see how to create Kafka producers and consumers and build an API to send and receive events. Producers are client applications that publish (write) events to Kafka, and consumers are those that subscribe to (read and process) these events.
To use Spring for Apache Kafka, add the following dependency:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
As the next step, configure the Kafka properties in the application properties file:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.demo.model
kafka.topic=sampletopic
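The controller and service below exchange an Order payload and return a ProducerMetadata DTO. Here is a minimal sketch of those two classes; the field names are assumptions for this example, and Order should live in the com.example.demo.model package listed in spring.json.trusted.packages (each class would normally be a public top-level class in its own file):

```java
// Hypothetical event payload; field names are illustrative.
class Order {
    private String orderId;
    private double amount;

    public Order() { }

    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
}

// Carries the partition and offset returned after a successful send.
class ProducerMetadata {
    private int partition;
    private long offset;

    public int getPartition() { return partition; }
    public void setPartition(int partition) { this.partition = partition; }
    public long getOffset() { return offset; }
    public void setOffset(long offset) { this.offset = offset; }
}
```

The JsonSerializer and JsonDeserializer configured in the properties use these plain getters and setters to convert the payload to and from JSON.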
Define a MessagesController that exposes the API to send and receive events:
@RestController
@RequestMapping("/messages")
public class MessagesController {

    @Autowired
    MessageService svc;

    @PostMapping("/send")
    public ProducerMetadata sendMessage(@RequestBody Order order) {
        try {
            return svc.sendMessage(order);
        } catch (Exception e) {
            // Surface the failure to the caller instead of returning null
            throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to send message", e);
        }
    }

    @GetMapping("/receive/{partition}/{offset}")
    public Order receiveOrder(@PathVariable String partition, @PathVariable String offset) {
        return svc.retrieveMessage(partition, offset);
    }
}
We use Spring's KafkaTemplate to send and receive events. The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics. From the send result we can get the partition and offset to which the event was written.
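The sendMessage method shown next lives in a service class that wires in the template and the topic name. A minimal sketch of that wiring, assuming the class is named MessageService as referenced in the controller:

```java
@Service
public class MessageService {

    // Auto-configured by Spring Boot from the spring.kafka.* properties
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    // Bound from the custom kafka.topic property (sampletopic)
    @Value("${kafka.topic}")
    private String topicName;

    // sendMessage(...) and retrieveMessage(...) go here
}
```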
public ProducerMetadata sendMessage(Order order) throws Exception {
    ListenableFuture<SendResult<String, Order>> result = kafkaTemplate.send(topicName, order);

    // Block until the broker acknowledges the send, then read the
    // partition and offset from the record metadata
    SendResult<String, Order> sendResult = result.get();
    RecordMetadata metadata = sendResult.getRecordMetadata();

    ProducerMetadata data = new ProducerMetadata();
    data.setOffset(metadata.offset());
    data.setPartition(metadata.partition());
    return data;
}
Using the partition and offset details, we can read back the event that was sent by calling the receive method of KafkaTemplate:
ConsumerRecord<String, Order> orderRecord = kafkaTemplate.receive(topicName, Integer.parseInt(partition), Long.parseLong(offset));
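Note that KafkaTemplate.receive fetches a single record through a consumer factory set on the template. Depending on your Spring Boot version, the auto-configured template may not have one set, in which case you can define the template bean yourself. A sketch, assuming the auto-configured ProducerFactory and ConsumerFactory beans:

```java
@Bean
public KafkaTemplate<String, Order> kafkaTemplate(
        ProducerFactory<String, Order> producerFactory,
        ConsumerFactory<String, Order> consumerFactory) {
    KafkaTemplate<String, Order> template = new KafkaTemplate<>(producerFactory);
    // receive(...) uses this consumer factory to seek to the given
    // partition/offset and fetch exactly one record
    template.setConsumerFactory(consumerFactory);
    return template;
}
```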