Spring WebFlux – File upload

In this post, let's see how to upload a CSV file through a reactive REST API and load its contents into Couchbase.

Create a project from start.spring.io with the following dependencies:

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-data-couchbase-reactive</artifactId>
</dependency>
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

opencsv is a library for reading and writing CSV in Java. Add it to the project with the dependency below:

<dependency>
    <groupId>com.opencsv</groupId>
    <artifactId>opencsv</artifactId>
    <version>5.7.1</version>
</dependency>

Define the REST API as shown below. To accept the file as input, the handler method uses the @RequestPart annotation, which associates a part of a “multipart/form-data” request with a method argument.

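import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Mono;

@RestController
public class EmployeeController {

	@Autowired
	EmployeeService service;

	// Binds the multipart part named "file" to a FilePart and delegates to the service.
	@PostMapping(value = "/employee", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
	public Mono<Response> fileUpload(@RequestPart(name = "file", required = true) FilePart file) {
		return service.csvFileUpload(file);
	}
}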
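To try the endpoint without a browser form, a client has to send a multipart/form-data request whose part is named file, matching the @RequestPart name. The following is a minimal sketch using only the JDK's java.net.http client. The class name EmployeeUploadClient, the boundary string, the file name employees.csv and the target URL are assumptions made for illustration, not part of the original project.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /employee endpoint; names and URL are assumptions.
class EmployeeUploadClient {

	// Builds a multipart/form-data body that contains a single file part.
	static byte[] multipartBody(String boundary, String partName, String fileName, String csv) {
		String body = "--" + boundary + "\r\n"
				+ "Content-Disposition: form-data; name=\"" + partName + "\"; filename=\"" + fileName + "\"\r\n"
				+ "Content-Type: text/csv\r\n"
				+ "\r\n"
				+ csv + "\r\n"
				+ "--" + boundary + "--\r\n";
		return body.getBytes(StandardCharsets.UTF_8);
	}

	// Creates the POST request; the part name "file" must match the controller.
	static HttpRequest uploadRequest(String url, String csv) {
		String boundary = "employee-upload-boundary";
		return HttpRequest.newBuilder(URI.create(url))
				.header("Content-Type", "multipart/form-data; boundary=" + boundary)
				.POST(HttpRequest.BodyPublishers.ofByteArray(
						multipartBody(boundary, "file", "employees.csv", csv)))
				.build();
	}

	// Sends the request and returns the status code followed by the response body.
	static String upload(String url, String csv) throws IOException, InterruptedException {
		HttpResponse<String> response = HttpClient.newHttpClient()
				.send(uploadRequest(url, csv), HttpResponse.BodyHandlers.ofString());
		return response.statusCode() + " " + response.body();
	}
}
```

Assuming the application runs locally on Spring Boot's default port, a call such as EmployeeUploadClient.upload("http://localhost:8080/employee", "alice,engineering\nbob,sales\n") would post two records.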

Employee entity:

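import org.springframework.data.annotation.Id;
import org.springframework.data.couchbase.core.mapping.Document;
import org.springframework.data.couchbase.core.mapping.Field;
import org.springframework.data.couchbase.core.mapping.id.GeneratedValue;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Document
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Employee {

	@Id
	@GeneratedValue
	private String id;

	@Field
	private String name;

	@Field
	private String department;
}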

Couchbase DB configurations:

spring.couchbase.connection-string=localhost
spring.couchbase.password=
spring.couchbase.username=
spring.data.couchbase.auto-index=true
spring.data.couchbase.bucket-name=employees

Whenever you upload a file, your browser sends it — and other fields in the form — to the server as a multipart/form-data message. The exact format of these messages is described in RFC 7578. If you submit a simple form with a single text field called foo and a file selector called file, the multipart/form-data message looks something like this:

POST / HTTP/1.1
Host: example.com
Content-Type: multipart/form-data;boundary="boundary"

--boundary
Content-Disposition: form-data; name="foo"

bar
--boundary
Content-Disposition: form-data; name="file"; filename="lorum.txt"
Content-Type: text/plain

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer iaculis metus id vestibulum nullam.

--boundary--

In a reactive environment, the file contents and the part boundaries can be split across more than one buffer. As a result, reading the CSV content buffer by buffer with the opencsv library can fail with exceptions, because a record may be incomplete in any single buffer.
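The effect of split buffers can be reproduced without WebFlux. In the sketch below, plain byte arrays stand in for the buffers, and the chunk size of 10 bytes is an arbitrary assumption chosen so that a record is cut in the middle. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration; byte arrays stand in for the buffers WebFlux delivers.
class SplitBufferDemo {

	// Cuts the payload into chunks of at most chunkSize bytes, as a network read might.
	static List<byte[]> chunk(byte[] payload, int chunkSize) {
		List<byte[]> chunks = new ArrayList<>();
		for (int from = 0; from < payload.length; from += chunkSize) {
			chunks.add(Arrays.copyOfRange(payload, from, Math.min(from + chunkSize, payload.length)));
		}
		return chunks;
	}

	// Naive approach: treat every chunk as if it held only complete lines.
	static List<String> linesPerChunk(List<byte[]> chunks) {
		List<String> lines = new ArrayList<>();
		for (byte[] c : chunks) {
			lines.addAll(new String(c, StandardCharsets.UTF_8).lines().toList());
		}
		return lines;
	}

	public static void main(String[] args) {
		byte[] csv = "alice,engineering\nbob,sales\n".getBytes(StandardCharsets.UTF_8);
		// Both records are cut in the middle, so four fragments come back instead of two records.
		linesPerChunk(chunk(csv, 10)).forEach(line -> System.out.println("fragment: " + line));
		// Decoding the whole payload at once yields the two expected records.
		new String(csv, StandardCharsets.UTF_8).lines().forEach(line -> System.out.println("record: " + line));
	}
}
```

A CSV parser that is handed one of these fragments sees a record with a missing column, which is the kind of failure described above.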

One way to solve this problem is to wait until all buffers have been received, join them, and then use the opencsv library to read the CSV. Be aware that joining multiple buffers into one effectively stores the entire uploaded content in memory.
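Because joining keeps everything in memory, it is worth putting an upper bound on the amount of data accepted. Spring's DataBufferUtils.join has an overload that takes a maximum byte count for this purpose. The plain-Java sketch below only illustrates the idea with byte arrays. The class name BoundedJoin and the limit of 1024 bytes are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Plain-Java illustration of joining chunks with an upper bound; names are hypothetical.
class BoundedJoin {

	// Concatenates the chunks, refusing to hold more than maxBytes in memory.
	static byte[] join(List<byte[]> chunks, int maxBytes) {
		ByteArrayOutputStream joined = new ByteArrayOutputStream();
		for (byte[] chunk : chunks) {
			if (joined.size() + chunk.length > maxBytes) {
				throw new IllegalStateException("Upload exceeds " + maxBytes + " bytes");
			}
			joined.write(chunk, 0, chunk.length);
		}
		return joined.toByteArray();
	}

	public static void main(String[] args) {
		List<byte[]> chunks = List.of(
				"alice,engi".getBytes(StandardCharsets.UTF_8),
				"neering\nbo".getBytes(StandardCharsets.UTF_8),
				"b,sales\n".getBytes(StandardCharsets.UTF_8));
		// Once joined, the content is intact again and can be handed to a CSV parser.
		System.out.print(new String(join(chunks, 1024), StandardCharsets.UTF_8));
	}
}
```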

import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.stereotype.Service;

import com.opencsv.CSVReader;
import com.opencsv.exceptions.CsvValidationException;

import reactor.core.publisher.Mono;

@Service
public class EmployeeServiceImpl implements EmployeeService {

	@Autowired
	EmployeeRepository repo;

	// Parses the complete CSV text into employees (column 0: name, column 1: department).
	private List<Employee> parse(String content) throws IOException, CsvValidationException {
		List<Employee> employees = new ArrayList<>();
		try (CSVReader reader = new CSVReader(new StringReader(content))) {
			String[] empRec;
			while ((empRec = reader.readNext()) != null) {
				if (empRec.length < 2) {
					continue; // skip blank or incomplete lines
				}
				// Each record gets its own key; a fixed id would make every save overwrite the previous one.
				employees.add(Employee.builder().id(UUID.randomUUID().toString())
						.name(empRec[0]).department(empRec[1]).build());
			}
		}
		return employees;
	}

	@Override
	public Mono<Response> csvFileUpload(FilePart file) {
		// Join all buffers first so that a record split across buffers is parsed as a whole.
		return DataBufferUtils.join(file.content()).map(data -> {
			byte[] bytes = new byte[data.readableByteCount()];
			data.read(bytes);
			DataBufferUtils.release(data);
			return new String(bytes, StandardCharsets.UTF_8);
		}).flatMap(content -> {
			List<Employee> employees;
			try {
				employees = parse(content);
			} catch (IOException | CsvValidationException ex) {
				// Propagate parsing problems to the caller as an error signal.
				return Mono.error(ex);
			}
			return repo.saveAll(employees)
					.then(Mono.just(Response.builder().code(200).message("Upload Successful").build()));
		});
	}
}
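The service above holds the whole file in memory before parsing it. If that becomes a concern for large uploads, an alternative is to carry the unfinished tail of each buffer over to the next one and hand only complete lines to the parser. The following is a rough sketch of that idea in plain Java. LineAssembler is a hypothetical helper, not part of Spring or opencsv, and it assumes that records never contain quoted line breaks.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns arbitrary byte chunks into complete lines without joining everything.
class LineAssembler {

	// Bytes of the line that has been started but not yet terminated.
	private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

	// Returns the lines completed by this chunk; an unfinished tail is kept for the next call.
	List<String> accept(byte[] chunk) {
		List<String> lines = new ArrayList<>();
		for (byte b : chunk) {
			// The newline byte never occurs inside a multi-byte UTF-8 sequence, so splitting here is safe.
			if (b == '\n') {
				lines.add(drain());
			} else {
				pending.write(b);
			}
		}
		return lines;
	}

	// Call once after the last chunk to get a final line that had no trailing newline.
	List<String> finish() {
		return pending.size() == 0 ? List.of() : List.of(drain());
	}

	private String drain() {
		String line = pending.toString(StandardCharsets.UTF_8);
		pending.reset();
		// Tolerate Windows line endings.
		return line.endsWith("\r") ? line.substring(0, line.length() - 1) : line;
	}
}
```

Each returned line could then be parsed and saved on its own, so memory use depends on the longest line rather than on the size of the file.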

EmployeeRepository:


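import org.springframework.data.couchbase.repository.ReactiveCouchbaseRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface EmployeeRepository extends ReactiveCouchbaseRepository<Employee, String> {
}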

References:

https://spring.io/blog/2021/09/14/efficient-parsing-of-reactive-buffer-streams
