In this blog lets see how to upload a CSV file through reactive REST API and to load the file contents to Couchbase DB.
Create a project from start.spring.io with the following dependency
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-couchbase-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
opencsv is a library for reading and writing CSV in java. Add this library to the project with the below dependency
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.7.1</version>
</dependency>
Define the REST API as below. Please note to accept the file as input used @RequestPart annotation that can be used to associate the part of a “multipart/form-data” request with a method argument.
@RestController
public class EmployeeController {
@Autowired
EmployeeService service;
@PostMapping("/employee")
public Mono<Response> fileUpload(@RequestPart(name="file",required=true) FilePart file){
return service.csvFileUpload(file);
}
}
Employee entity:
@Document
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Employee {
@Id
@GeneratedValue
private String id;
@Field
private String name;
@Field
private String department;
}
Couchbase DB configurations:
spring.couchbase.connection-string=localhost
spring.couchbase.password=
spring.couchbase.username=
spring.data.couchbase.auto-index=true
spring.data.couchbase.bucket-name=employees
Whenever you upload a file, your browser sends it — and other fields in the form — to the server as a multipart/form-data message. The exact format of these messages is described in RFC 7578. If you submit a simple form with a single text field called foo and a file selector called file, the multipart/form-data message looks something like this:
POST / HTTP/1.1
Host: example.com
Content-Type: multipart/form-data;boundary="boundary" (1)
--boundary (2)
Content-Disposition: form-data; name="foo" (3)
bar
--boundary (4)
Content-Disposition: form-data; name="file"; filename="lorum.txt" (5)
Content-Type: text/plain
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer iaculis metus id vestibulum nullam.
--boundary-- (6)
In reactive environment the file contents, boundaries can be split in more than one buffers and as a result if we try to read the csv file content using opencsv library will face exceptions.
One way to solve this problem would be to wait until all buffers have been received, join them and then use opencsv library to read the csv. Please be cautioned joining multiple buffers into one effectively stores the entire multipart message in memory.
@Service
public class EmployeeServiceImpl implements EmployeeService {
@Autowired
EmployeeRepository repo;
private List<String> getContents(String content) {
Supplier<Stream<String>> supp = content::lines;
return supp.get().toList();
}
@Override
public Mono<Response> csvFileUpload(FilePart file) {
// TODO Auto-generated method stub
List<Employee> employees = new ArrayList<Employee>();
return DataBufferUtils.join(file.content()).map(data -> {
byte[] bytes = null;
try {
bytes = data.asInputStream().readAllBytes();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
DataBufferUtils.release(data);
return new String(bytes, StandardCharsets.UTF_8);
}).map(this::getContents).flatMap(row -> {
for (int i = 0; i < row.size(); i++) {
try (CSVReader reader = new CSVReader(new StringReader(row.get(i)))) {
String[] empRec = reader.readNext();
Employee emp = Employee.builder().name(empRec[0]).department(empRec[1]).id("assgg").build();
employees.add(emp);
} catch (IOException ex) {
System.out.println("error occured::" + ex.getMessage());
return Mono.error(ex);
} catch (CsvValidationException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return Mono.error(e);
}
}
return repo.saveAll(employees).collectList().map(dblist -> {
Response resp = Response.builder().code(200).message("Upload Successful").build();
return resp;
});
});
}
}
EmployeeRepository:
@Repository
public interface EmployeeRepository extends ReactiveCouchbaseRepository<Employee, String>{
}
References:
https://spring.io/blog/2021/09/14/efficient-parsing-of-reactive-buffer-streams
In this blog using the Spring WebFlux module, we are going to leverage the functional…
Spring Cloud Function is a project within the Spring ecosystem that allows developers to build…
RabbitMQ is an open-source message broker software that implements the Advanced Message Queuing Protocol (AMQP).…
Spring Integration is a powerful extension of the Spring Framework designed to support the implementation…
The Spring Cloud Config Client is a component of the Spring Cloud framework that enables…
In Python, handling CSV (Comma Separated Values) files is easy using the built-in csv module.…