Spring WebFlux: Real-Time Example

1. Overview

This article presents a practical example, on both the server side and the client side, of Spring WebFlux, a module introduced in Spring 5 that supports reactive programming and is fully asynchronous and non-blocking.

To learn more about Spring WebFlux we can access this link.

2. Maven Dependencies

For this implementation, we need the spring-boot-starter-webflux dependency in our Spring Boot application. We just add it to our pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

3. Server side

Our server will produce one event per second indefinitely, each carrying only a sequential number, so that we can follow the sequence on the client side.

The server side can follow two programming models: annotation-based and functional endpoints. Both are reactive. In this article, we demonstrate an example of each, with the same behavior.

3.1. Using the Annotation-Based Model

This model uses the familiar Spring MVC annotations, such as @Controller and @RequestMapping.

The main difference is that it is reactive and non-blocking. Let’s look at the example:

@RestController
public class SequenceExampleController {
    @RequestMapping("/sequence")
    public Flux<Integer> sequence() {
        // Emits a tick (0, 1, 2, ...) once per second
        Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
        // Infinite sequence of integers: 1, 2, 3, ...
        Flux<Integer> sequence = Flux.fromStream(Stream.iterate(1, incSeq -> incSeq + 1));
        // Pair each tick with the next number and keep only the number
        return Flux.zip(interval, sequence).map(Tuple2::getT2);
    }
}

We create one Flux to set the interval and another Flux to generate a sequential stream of Integers. We then use the Flux.zip method to combine the two, pairing each tick with the next number, and return only the number from each pair.
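To make the zip step concrete, here is a minimal plain-Java sketch of positional pairing. It is not Reactor's implementation, and the class ZipSketch and its zipTakeSecond helper are hypothetical names used only for illustration. The helper walks two sources in lockstep, keeps the second element of each pair (the role Tuple2::getT2 plays in the controller above), and stops when the first source ends. In the real controller the interval Flux paces emission at one pair per second; here the ticks are simply a short list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class ZipSketch {

    // Pairs elements by position and keeps only the second element of each pair.
    // Stops as soon as either source runs out, so an infinite second source is
    // safe as long as the first one is finite.
    public static <A, B> List<B> zipTakeSecond(Iterator<A> first, Iterator<B> second) {
        List<B> result = new ArrayList<>();
        while (first.hasNext() && second.hasNext()) {
            first.next();               // the "tick": its value is discarded
            result.add(second.next());  // the sequence number that is kept
        }
        return result;
    }

    public static void main(String[] args) {
        // Three ticks (0, 1, 2), standing in for Flux.interval
        Iterator<Long> ticks = List.of(0L, 1L, 2L).iterator();
        // Infinite sequence 1, 2, 3, ... as in the controller
        Iterator<Integer> sequence = Stream.iterate(1, n -> n + 1).iterator();

        System.out.println(zipTakeSecond(ticks, sequence)); // prints [1, 2, 3]
    }
}
```

Because the pairing is positional, the slower source sets the pace, which is why zipping with a one-second interval yields one number per second.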

3.2. Using Functional Endpoints

This model uses functional programming, in which functions are used to route and handle requests. To get started, we need to create a HandlerFunction, whose purpose is to handle an HTTP request.

Both ServerRequest, used as the parameter, and ServerResponse, used as the return type, are immutable interfaces that expose the body as a Flux or Mono. A Flux represents a stream of zero to many elements, and a Mono represents a stream of zero or one element.

@Component
public class SequenceExampleHandler {
    public Mono<ServerResponse> sequence(ServerRequest request) {
        Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
        Flux<Integer> sequence = Flux.fromStream(Stream.iterate(1, incSeq -> incSeq + 1));
        return ServerResponse.ok().contentType(MediaType.APPLICATION_STREAM_JSON)
          .body(Flux.zip(interval, sequence).map(Tuple2::getT2), Integer.class);
    }
}

In order for the request to be directed to our HandlerFunction, we need to route it through a RouterFunction, which is similar to @RequestMapping within the @Controller in the annotation-based model.

@Configuration
public class SequenceExampleRouter {
    @Bean
    public RouterFunction<ServerResponse> route(SequenceExampleHandler sequenceExampleHandler) {
        return RouterFunctions.route(
          RequestPredicates.GET("/sequence").and(RequestPredicates.accept(MediaType.APPLICATION_STREAM_JSON)),
          sequenceExampleHandler::sequence);
    }
}

With just this implementation, our server side is ready and available at the “/sequence” URI.
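Before writing the reactive client, one quick way to check the endpoint is a small probe built on the JDK's own java.net.http.HttpClient (Java 11 or later). This is only a sketch under stated assumptions: the class SequenceProbe and its methods are hypothetical names, the server is assumed to be running locally (for example at http://localhost:8080), and each element of the application/stream+json body is assumed to arrive on its own line.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.stream.Stream;

public class SequenceProbe {

    // Parses one line of the streamed body into a sequence number.
    // Assumption: each element is a bare JSON number on its own line.
    public static int parseElement(String line) {
        return Integer.parseInt(line.trim());
    }

    // Reads the first `count` elements from <baseUrl>/sequence and prints them.
    public static void printFirst(String baseUrl, int count) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/sequence"))
                .header("Accept", "application/stream+json")
                .GET()
                .build();

        // ofLines() exposes the body lazily, so lines are handled as they arrive
        HttpResponse<Stream<String>> response =
                http.send(request, HttpResponse.BodyHandlers.ofLines());

        // Closing the stream releases the connection once we have read enough
        try (Stream<String> lines = response.body()) {
            lines.filter(line -> !line.isBlank())
                 .limit(count)
                 .forEach(line -> System.out.println("Received: " + parseElement(line)));
        }
    }
}
```

Calling SequenceProbe.printFirst("http://localhost:8080", 5) while the server is running should print one line per second. This probe blocks the calling thread, so it is only a checking aid and not a substitute for a reactive client.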

4. Client side

In our client, we will consume the events produced and log them. For that, we will use WebClient, a reactive, non-blocking HTTP client provided by WebFlux and an alternative to RestTemplate.

First, we need to instantiate a WebClient for our base URL. Then we define the HTTP method of the request and the media type we accept.

The exchange() method performs the request and gives us the response, whose body we decode into a Flux of Integer with bodyToFlux. In the subscribe method, we receive each value as it arrives and log it, following the progress of the sequence.

    private final WebClient client = WebClient.create("http://localhost:8080");

    public void sequence() {
        client.get().uri("/sequence")
          .accept(MediaType.APPLICATION_STREAM_JSON)
          .exchange()
          // Decode the streamed body into one Integer per event
          .flatMapMany(response -> response.bodyToFlux(Integer.class))
          // Callbacks for each value, for an error, and for stream completion
          .subscribe(
            seq -> log.info("Sequence Number: " + seq + " - Time: " + LocalTime.now()),
            err -> log.error("Stream Error: " + err),
            () -> log.error("Stream Stopped!"));
    }

We still need to create an instance of our client class and call the method to start consuming.

SequenceExampleWebClient sequenceExampleWebClient = new SequenceExampleWebClient();
sequenceExampleWebClient.sequence();

There is a multitude of features that we can use with WebClient; more details can be found here.

5. Checking the Result

After starting the server and executing the client, we can see in the console output the sequence number and the time each event arrived, like this:

Sequence Number: 1 - Time: 10:56:45.608
Sequence Number: 2 - Time: 10:56:46.492
Sequence Number: 3 - Time: 10:56:47.492
Sequence Number: 4 - Time: 10:56:48.492
Sequence Number: 5 - Time: 10:56:49.496
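To confirm that the events really arrive about one second apart, we can compute the gaps between the logged times. The following is a small sketch using only java.time. The class IntervalCheck and its gapsMillis helper are hypothetical names, and the timestamps are copied from the sample output above.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

public class IntervalCheck {

    // Returns the gap in milliseconds between each pair of consecutive timestamps.
    public static List<Long> gapsMillis(List<LocalTime> times) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < times.size(); i++) {
            gaps.add(Duration.between(times.get(i - 1), times.get(i)).toMillis());
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Timestamps copied from the sample console output
        List<LocalTime> times = List.of(
                LocalTime.parse("10:56:45.608"),
                LocalTime.parse("10:56:46.492"),
                LocalTime.parse("10:56:47.492"),
                LocalTime.parse("10:56:48.492"),
                LocalTime.parse("10:56:49.496"));

        System.out.println(gapsMillis(times)); // prints [884, 1000, 1000, 1004]
    }
}
```

The first gap is shorter than the rest, and the later gaps stay within a few milliseconds of the one-second interval.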

6. Conclusion

In this article, we have seen a brief practical example of using the Spring WebFlux module, both on the server side, through the annotation-based and functional models, and on the client side with WebClient. The source code is available on GitHub.
