Spring Webflux – File upload

In this post, let's see how to upload a CSV file through a reactive REST API and load its contents into Couchbase.

Create a project from start.spring.io with the following dependencies:

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-data-couchbase-reactive</artifactId>
</dependency>
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

opencsv is a library for reading and writing CSV in Java. Add it to the project with the dependency below:

<dependency>
    <groupId>com.opencsv</groupId>
    <artifactId>opencsv</artifactId>
    <version>5.7.1</version>
</dependency>

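Define the REST API as below. To accept the file as input, the method uses the @RequestPart annotation, which associates a part of a "multipart/form-data" request with a method argument.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Mono;

@RestController
public class EmployeeController {

 @Autowired
 EmployeeService service;

 // Binds the multipart part named "file" to a FilePart and hands it to the service.
 @PostMapping("/employee")
 public Mono<Response> fileUpload(@RequestPart(name = "file", required = true) FilePart file) {
  return service.csvFileUpload(file);
 }
}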

Employee entity:

@Document
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Employee {

 @Id
 @GeneratedValue
 private String id;
 
 @Field
 private String name;
 
 @Field
 private String department;
}

Couchbase DB configurations:

spring.couchbase.connection-string=localhost
spring.couchbase.password=
spring.couchbase.username=
spring.data.couchbase.auto-index=true
spring.data.couchbase.bucket-name=employees

Whenever you upload a file, your browser sends it — and other fields in the form — to the server as a multipart/form-data message. The exact format of these messages is described in RFC 7578. If you submit a simple form with a single text field called foo and a file selector called file, the multipart/form-data message looks something like this:

POST / HTTP/1.1
Host: example.com
Content-Type: multipart/form-data;boundary="boundary"

--boundary
Content-Disposition: form-data; name="foo"

bar
--boundary
Content-Disposition: form-data; name="file"; filename="lorum.txt"
Content-Type: text/plain

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer iaculis metus id vestibulum nullam.

--boundary--
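To make the message layout concrete, here is a minimal sketch in plain Java that assembles a body with a single CSV file part. The class name MultipartBodySketch, the boundary value, the file name and the sample rows are all made up for illustration; real clients normally let an HTTP library generate the boundary and the headers.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; it is not part of the project above.
final class MultipartBodySketch {

    private static final String CRLF = "\r\n";

    // Builds a multipart/form-data body that carries one file part.
    static String build(String boundary, String partName, String filename, String csv) {
        StringBuilder body = new StringBuilder();
        // Every part starts with "--" followed by the boundary.
        body.append("--").append(boundary).append(CRLF);
        body.append("Content-Disposition: form-data; name=\"").append(partName)
            .append("\"; filename=\"").append(filename).append("\"").append(CRLF);
        body.append("Content-Type: text/csv").append(CRLF);
        // A blank line separates the part headers from the part content.
        body.append(CRLF);
        body.append(csv).append(CRLF);
        // The closing delimiter is the boundary with "--" on both sides.
        body.append("--").append(boundary).append("--").append(CRLF);
        return body.toString();
    }

    public static void main(String[] args) {
        String body = build("boundary", "file", "employees.csv",
                "John,Engineering\nJane,Finance");
        System.out.println(body);
        System.out.println("bytes: " + body.getBytes(StandardCharsets.UTF_8).length);
    }
}
```

The part name passed to build has to match the name given to @RequestPart on the server side ("file" in the controller above), otherwise the argument cannot be bound.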

In a reactive environment, the file contents and the boundaries can be split across more than one buffer. As a result, reading the CSV content with opencsv one buffer at a time can throw exceptions or produce broken rows, because a buffer may end in the middle of a record.
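The effect is easy to reproduce without Spring. The sketch below is plain Java with hypothetical names (ChunkBoundarySketch and its methods) and a made-up chunk size of 10 bytes; it cuts a small CSV payload into fixed-size chunks and compares splitting each chunk into rows on its own with splitting after the chunks have been joined.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; the chunks stand in for the buffers of an upload.
final class ChunkBoundarySketch {

    // Cuts the payload into chunks of at most 'size' bytes.
    static List<byte[]> chunk(byte[] payload, int size) {
        List<byte[]> chunks = new ArrayList<>();
        for (int from = 0; from < payload.length; from += size) {
            int to = Math.min(from + size, payload.length);
            chunks.add(Arrays.copyOfRange(payload, from, to));
        }
        return chunks;
    }

    // Naive approach: treat every chunk as complete text and split it into rows.
    static List<String> rowsPerChunk(List<byte[]> chunks) {
        List<String> rows = new ArrayList<>();
        for (byte[] c : chunks) {
            String text = new String(c, StandardCharsets.UTF_8);
            rows.addAll(Arrays.asList(text.split("\n")));
        }
        return rows;
    }

    // Join approach: concatenate all chunks first, then split into rows.
    static List<String> rowsAfterJoin(List<byte[]> chunks) {
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        for (byte[] c : chunks) {
            joined.write(c, 0, c.length);
        }
        String text = new String(joined.toByteArray(), StandardCharsets.UTF_8);
        return Arrays.asList(text.split("\n"));
    }

    public static void main(String[] args) {
        byte[] csv = "John,Engineering\nJane,Finance\n".getBytes(StandardCharsets.UTF_8);
        List<byte[]> chunks = chunk(csv, 10);
        System.out.println(rowsPerChunk(chunks));
        System.out.println(rowsAfterJoin(chunks));
    }
}
```

With these inputs the per-chunk variant yields four fragments (John,Engin / eering / Jan / e,Finance), while the joined variant yields the two intended rows. A multi-byte UTF-8 character cut in half by a chunk boundary would be garbled in the same way.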

One way to solve this problem is to wait until all buffers have been received, join them, and then read the CSV with opencsv. Be aware that joining multiple buffers into one effectively keeps the entire uploaded content in memory.

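import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.stereotype.Service;

import com.opencsv.CSVReader;
import com.opencsv.exceptions.CsvValidationException;

import reactor.core.publisher.Mono;

@Service
public class EmployeeServiceImpl implements EmployeeService {

 @Autowired
 EmployeeRepository repo;

 // Copies the joined buffer into a String and always releases the buffer.
 private String toText(DataBuffer data) {
  try {
   byte[] bytes = new byte[data.readableByteCount()];
   data.read(bytes);
   return new String(bytes, StandardCharsets.UTF_8);
  } finally {
   DataBufferUtils.release(data);
  }
 }

 // Parses the whole CSV text with a single reader, so quoted fields that span lines stay intact.
 private List<Employee> parse(String content) throws IOException, CsvValidationException {
  List<Employee> employees = new ArrayList<>();
  try (CSVReader reader = new CSVReader(new StringReader(content))) {
   String[] empRec;
   while ((empRec = reader.readNext()) != null) {
    if (empRec.length < 2) {
     continue; // skip blank or incomplete rows
    }
    // Each document gets its own id; a fixed id would make every row overwrite the previous one.
    employees.add(Employee.builder().id(UUID.randomUUID().toString())
      .name(empRec[0]).department(empRec[1]).build());
   }
  }
  return employees;
 }

 @Override
 public Mono<Response> csvFileUpload(FilePart file) {
  // The list of employees is created per subscription, so no state is shared between uploads.
  return DataBufferUtils.join(file.content())
    .map(this::toText)
    // fromCallable turns a parsing exception into an error signal.
    .flatMap(content -> Mono.fromCallable(() -> parse(content)))
    .flatMap(employees -> repo.saveAll(employees)
      .then(Mono.fromSupplier(
        () -> Response.builder().code(200).message("Upload Successful").build())));
 }
}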
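Because the join keeps the whole upload in memory, it can help to cap how much data is accepted. The idea is sketched below in plain Java with hypothetical names (BoundedJoinSketch, maxBytes): chunks are appended until the limit would be exceeded, at which point the upload is rejected. Spring's DataBufferUtils also offers a join overload that takes a maximum byte count; check the Javadoc of your Spring version before relying on it.

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

// Hypothetical sketch of a size-capped join; the chunks stand in for the upload's buffers.
final class BoundedJoinSketch {

    // Concatenates the chunks, refusing to hold more than maxBytes in memory.
    static byte[] join(List<byte[]> chunks, int maxBytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            if (out.size() + chunk.length > maxBytes) {
                throw new IllegalStateException(
                        "Upload exceeds the limit of " + maxBytes + " bytes");
            }
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }
}
```

The right limit depends on the expected CSV sizes and the memory available to the service; for very large files a streaming parser that never holds the full content is the safer design.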

EmployeeRepository:


@Repository
public interface EmployeeRepository extends ReactiveCouchbaseRepository<Employee, String>{

 
}

References:

https://spring.io/blog/2021/09/14/efficient-parsing-of-reactive-buffer-streams

mahendravarman.m@gmail.com
