A very small demo application showing various aspects of Spring Data's reactive support. You can walk through the commits one by one to see the application evolve.
The application is meant to store, retrieve, and expose Person objects from an embedded MongoDB instance in a non-blocking way.
Nothing fancy in here: we just create a Stream of Person objects randomly named after characters from Game of Thrones.
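// random is assumed to be a java.util.Random instance defined elsewhere.
// nextInt(bound) excludes bound, so names.length keeps the last name reachable.
String[] names = { "Eddard", "Catelyn", "Jon", "Rob", "Sansa", "Aria", "Bran", "Rickon" };
Flux<Person> starks = Flux
    .fromStream(Stream.generate(() -> names[random.nextInt(names.length)]).map(Person::new));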
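The name generator can be tried outside Reactor. The following is a minimal plain-JDK sketch, not the demo's actual code: the `RandomNames` class, its `pick` helper, and the nested `Person` stand-in are hypothetical names introduced for illustration. It relies on `Random.nextInt(bound)` returning a value from 0 (inclusive) to `bound` (exclusive), so the full array length is the right bound if every name should be reachable.

```java
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RandomNames {

    static final String[] NAMES = { "Eddard", "Catelyn", "Jon", "Rob", "Sansa", "Aria", "Bran", "Rickon" };

    // Hypothetical stand-in for the demo's Person class.
    static final class Person {
        private final String name;

        Person(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Infinite, lazily evaluated stream of randomly named persons.
    // nextInt(NAMES.length) yields indices 0..NAMES.length-1, covering every entry.
    static Stream<Person> starks(Random random) {
        return Stream.generate(() -> NAMES[random.nextInt(NAMES.length)]).map(Person::new);
    }

    // Takes the first `count` names from the infinite stream.
    static List<String> pick(Random random, int count) {
        return starks(random).limit(count).map(Person::getName).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(pick(new Random(), 5));
    }
}
```

Because the stream is infinite, anything consuming it has to bound it (here with `limit`) or pace it, which is what the timer does in the next step.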
Set up a ReactiveCrudRepository for MongoDB and store a new Person from the Stream every second.
Flux.interval(Duration.ofSeconds(1))
    .zipWith(starks)
    .map(Tuple2::getT2)
    .flatMap(repository::save)
    .subscribe();

interface PersonRepository extends ReactiveCrudRepository<Person, String> {}
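To see what pairing a timer with a stream achieves, here is a rough plain-JDK analogue of the pipeline above. It is not Reactor code and, unlike the reactive version, it blocks the calling thread until it finishes; `TickedSource` and `emitPerTick` are hypothetical names, and the `sink` parameter stands in for `repository::save`.

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;

class TickedSource {

    // Hands one element of `source` to `sink` per tick until `count` elements
    // were delivered or the source is exhausted. Sketch only: a sink that
    // throws would cancel the timer and leave the caller waiting.
    static <T> void emitPerTick(Stream<T> source, Duration period, int count, Consumer<? super T> sink) {
        Iterator<T> it = source.iterator();
        CountDownLatch remaining = new CountDownLatch(count);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        Runnable tick = () -> {
            if (remaining.getCount() == 0) {
                return; // everything requested was already delivered
            }
            if (it.hasNext()) {
                sink.accept(it.next()); // one element per tick
                remaining.countDown();
            } else {
                // Source ran dry early: release the waiting caller.
                while (remaining.getCount() > 0) {
                    remaining.countDown();
                }
            }
        };

        // The first tick fires after one full period.
        long millis = period.toMillis();
        scheduler.scheduleAtFixedRate(tick, millis, millis, TimeUnit.MILLISECONDS);
        try {
            remaining.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        emitPerTick(Stream.generate(() -> "Jon"), Duration.ofMillis(200), 3, System.out::println);
    }
}
```

The point of the pairing is that the infinite source is only pulled at the timer's pace, so one element is produced per tick instead of the stream being drained as fast as possible.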
Create a simple derived finder method and expose the result via Spring WebFlux.
@RestController
@RequiredArgsConstructor
static class PersonController {

    final PersonRepository repository;

    @GetMapping("/") // curl localhost:8080/?name=Eddard
    Flux<Person> fluxPersons(String name) {
        return repository.findAllByName(name);
    }
}

interface PersonRepository extends ReactiveCrudRepository<Person, String> {
    Flux<Person> findAllByName(String name);
}
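Spring Data derives the query from the method name, so `findAllByName` matches on the `name` property of `Person`. As a rough illustration of that matching rule only, the sketch below applies the same filter to an in-memory list; `InMemoryPersonFinder` and the nested `Person` class are hypothetical stand-ins, and the real repository returns a `Flux<Person>` backed by MongoDB rather than a `List`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

class InMemoryPersonFinder {

    // Hypothetical stand-in for the demo's Person class.
    static final class Person {
        private final String name;

        Person(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Keeps every stored person whose name equals the argument,
    // mirroring the equality match that "ByName" expresses.
    static List<Person> findAllByName(List<Person> store, String name) {
        return store.stream()
                .filter(person -> Objects.equals(person.getName(), name))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Person> store = Arrays.asList(new Person("Eddard"), new Person("Jon"), new Person("Eddard"));
        System.out.println(findAllByName(store, "Eddard").size());
    }
}
```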
Use StepVerifier from reactor-test to create unit and integration tests for the application.
@Test
public void saveAndFindAll() {

    // Saving emits exactly one element (the saved Person) and then completes.
    StepVerifier.create(repository.save(new Person("Aria")))
        .expectNextCount(1)
        .verifyComplete();

    // The finder returns the saved Person; take(1) bounds the result.
    StepVerifier.create(repository.findAllByName("Aria").take(1))
        .consumeNextWith(value -> Assert.assertEquals("Aria", value.getName()))
        .verifyComplete();
}
Use RxJava Observable instead of Reactor Flux for reading and exposing data.
@GetMapping("/rx") // curl localhost:8080/rx?name=Eddard
Observable<Person> rxPersons(String name) {
    return repository.findByName(name);
}

interface PersonRepository extends ReactiveCrudRepository<Person, String> {
    //...
    Observable<Person> findByName(String name);
}
Use MongoDB capped collections and tailable cursors to create an infinite stream.
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) // curl localhost:8080/stream
Flux<Person> streamPersons() {
    return repository.findBy();
}

interface PersonRepository extends ReactiveCrudRepository<Person, String> {
    //...
    @Tailable
    Flux<Person> findBy();
}
Transition to a more functional style of routing declaration using RouterFunctions.
@Bean
RouterFunction<ServerResponse> routerFunction(PersonHandler requestHandler) {
    return RouterFunctions
        .route(RequestPredicates.GET("/"), requestHandler::fluxPersons)
        .andRoute(RequestPredicates.GET("/rx"), requestHandler::rxPersons)
        .andRoute(RequestPredicates.GET("/stream"), requestHandler::streamPersons);
}

@Component
@RequiredArgsConstructor
static class PersonHandler {

    final PersonRepository personRepository;

    Mono<ServerResponse> fluxPersons(ServerRequest request) {
        return ServerResponse.ok()
            .body(personRepository.findAllByName(request.queryParam("name").orElse("")), Person.class);
    }

    Mono<ServerResponse> rxPersons(ServerRequest request) {
        return ServerResponse.ok()
            .body(new PublisherAdapter(personRepository.findByName(request.queryParam("name").orElse(""))), Person.class);
    }

    Mono<ServerResponse> streamPersons(ServerRequest request) {
        return ServerResponse.ok()
            .contentType(MediaType.TEXT_EVENT_STREAM)
            .body(personRepository.findBy(), Person.class);
    }
}
Consume data from a remote service using WebClient.
@Bean
WebClient client() {
    return WebClient.create("http://localhost:8080/");
}

@Bean
CommandLineRunner run(WebClient client) {
    return (args) -> {
        client.get()
            .uri(builder -> builder.path("/").queryParam("name", "Eddard").build())
            .retrieve()
            .bodyToFlux(Person.class)
            .subscribe(System.out::println);
    };
}