Quarkus - Asynchronous messages between beans

    • point-to-point - send a message and exactly one consumer receives it. If several consumers listen to the address, they are served in round-robin order;

    • publish/subscribe - publish a message and all the consumers listening to the address receive it;

    • request/reply - send a message and expect a response. The receiver can respond to the message asynchronously.

    All these delivery mechanisms are non-blocking, and they provide one of the fundamental building blocks of reactive applications.
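    To make the three delivery modes concrete, here is a minimal plain-Java sketch. It is not the Vert.x EventBus: MiniBus, MiniBusDemo and their methods are hypothetical names invented for this illustration, and the bus is single-threaded apart from the asynchronous reply. It only shows the dispatch rules described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical illustration only: this is NOT the Vert.x EventBus API.
final class MiniBusDemo {

    static final class MiniBus {
        private final Map<String, List<Function<String, String>>> consumers = new HashMap<>();
        private final Map<String, Integer> cursors = new HashMap<>();

        // Register a consumer on an address; its return value is the reply.
        void consumer(String address, Function<String, String> handler) {
            consumers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
        }

        // Pick the next consumer of an address in round-robin order.
        private Function<String, String> next(String address) {
            List<Function<String, String>> list = consumers.getOrDefault(address, List.of());
            if (list.isEmpty()) {
                throw new IllegalStateException("No consumer for " + address);
            }
            int index = cursors.merge(address, 1, Integer::sum) - 1;
            return list.get(index % list.size());
        }

        // Point-to-point: exactly one consumer receives the message.
        void send(String address, String body) {
            next(address).apply(body);
        }

        // Publish/subscribe: every consumer of the address receives the message.
        void publish(String address, String body) {
            for (Function<String, String> handler : consumers.getOrDefault(address, List.of())) {
                handler.apply(body);
            }
        }

        // Request/reply: one consumer receives the message, its result comes back asynchronously.
        CompletionStage<String> request(String address, String body) {
            Function<String, String> handler = next(address);
            return CompletableFuture.supplyAsync(() -> handler.apply(body));
        }
    }

    static List<String> run() {
        MiniBus bus = new MiniBus();
        List<String> log = new ArrayList<>();
        bus.consumer("greeting", name -> { log.add("A:" + name); return "Hello " + name; });
        bus.consumer("greeting", name -> { log.add("B:" + name); return "Hi " + name; });

        bus.send("greeting", "one");    // goes to consumer A
        bus.send("greeting", "two");    // goes to consumer B (round robin)
        bus.publish("greeting", "all"); // goes to A and B
        // Blocking with join() only to keep the demo short.
        String reply = bus.request("greeting", "three").toCompletableFuture().join();
        log.add("reply:" + reply);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    Running main prints the order in which the two consumers were invoked, followed by the reply of the request/reply call.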

    This mechanism uses the Vert.x EventBus, so you need to enable the vertx extension to use this feature. If you are creating a new project, set the extensions parameter as follows:

    If you already have a Quarkus project, you can add the vertx extension with the add-extension command:

    ./mvnw quarkus:add-extension -Dextensions="vertx"

    Otherwise, you can manually add this to the dependencies section of your pom.xml file:

    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-vertx</artifactId>
    </dependency>

    To consume events, use the io.quarkus.vertx.ConsumeEvent annotation:

    package org.acme.vertx;

    import io.quarkus.vertx.ConsumeEvent;

    import javax.enterprise.context.ApplicationScoped;

    @ApplicationScoped
    public class GreetingService {

        @ConsumeEvent                           // (1)
        public String consume(String name) {    // (2)
            return name.toUpperCase();
        }
    }

    (1) If not set, the address is the fully qualified name of the bean, for instance, in this snippet it’s org.acme.vertx.GreetingService.
    (2) The method parameter is the message body. If the method returns something it’s the message response.

    The @ConsumeEvent annotation can be configured to set the address:

    @ConsumeEvent("greeting")               // (1)
    public String consume(String name) {
        return name.toUpperCase();
    }

    (1) Receive the messages sent to the greeting address.

    The return value of a method annotated with @ConsumeEvent is used as the response to the incoming message. For instance, in the snippet above, the returned String is the response.

    You can also return a CompletionStage<T> to reply asynchronously:

    @ConsumeEvent("greeting")
    public CompletionStage<String> consume2(String name) {
        // executor: a java.util.concurrent.Executor provided by the application
        return CompletableFuture.supplyAsync(name::toUpperCase, executor);
    }
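    The snippet above relies on an executor that it does not define. As a minimal sketch, the following plain-Java program shows the same supplyAsync pattern with an explicitly created executor. The class name AsyncReplyDemo and the single-thread executor are assumptions made for this illustration, and the Quarkus annotation is left out so the example runs on its own.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-alone demo of an asynchronous reply; no Quarkus involved.
final class AsyncReplyDemo {

    // Same shape as the consumer method: the reply is computed on the given executor.
    static CompletionStage<String> consume(String name, ExecutorService executor) {
        return CompletableFuture.supplyAsync(name::toUpperCase, executor);
    }

    static String run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return consume("quarkus", executor)
                    .thenApply(reply -> "reply: " + reply) // runs once the reply is available
                    .toCompletableFuture()
                    .join(); // blocking only to keep the demo short
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    In a real consumer you would return the CompletionStage without joining on it, so the calling thread is never blocked.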
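
    You do not have to reply to a received message. For a fire-and-forget interaction, the consumer method simply returns void:

    @ConsumeEvent("greeting")
    public void consume(String event) {
        // Do something with the event
    }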

    As mentioned above, this mechanism is based on the Vert.x event bus, so you can also use Message directly:

    @ConsumeEvent("greeting")
    public void consume(Message<String> msg) {
        System.out.println(msg.address());
        System.out.println(msg.body());
    }

    We have seen how to receive messages; let’s now switch to the other side: the sender. Sending and publishing messages use the Vert.x event bus:

    package org.acme;

    import io.vertx.axle.core.eventbus.EventBus;
    import io.vertx.axle.core.eventbus.Message;

    import javax.inject.Inject;
    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;
    import java.util.concurrent.CompletionStage;

    @Path("/async")
    public class EventResource {

        @Inject
        EventBus bus; // the injected event bus

        @GET
        @Path("/{name}")
        public CompletionStage<String> hello(@PathParam("name") String name) {
            return bus.<String>send("greeting", name) // send the name to the greeting address
                    .thenApply(Message::body);
        }
    }

    The EventBus object provides methods to:

    • send a message to a specific address - one single consumer receives the message.

    • publish a message to a specific address - all consumers receive the message.

    • send a message and expect a reply.

    // Case 1: point-to-point
    bus.send("address", "hello");
    // Case 2: publish/subscribe
    bus.publish("address", "hello");
    // Case 3: request/reply
    bus.send("address", "hello, how are you?").thenAccept(message -> {
        // response
    });

    Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separate bean. It uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the JAX-RS endpoint, we send a message. This message is consumed by another bean, and the response is sent using the reply mechanism.

    First create a new project using:

    You can already start the application in dev mode.

    Then, create a new JAX-RS resource with the following content:

    src/main/java/org/acme/vertx/EventResource.java

    package org.acme.vertx;

    import io.vertx.axle.core.eventbus.EventBus;
    import io.vertx.axle.core.eventbus.Message;

    import javax.inject.Inject;
    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;
    import java.util.concurrent.CompletionStage;

    @Path("/hello")
    public class EventResource {

        @Inject
        EventBus bus;

        @GET
        @Path("/async/{name}")
        public CompletionStage<String> hello(@PathParam("name") String name) {
            return bus.<String>send("greeting", name)   // (1)
                    .thenApply(Message::body);          // (2)
        }
    }

    (1) Send the name to the greeting address.
    (2) When we get the reply, extract the body and send it as the response to the user.

    src/main/java/org/acme/vertx/GreetingService.java

    package org.acme.vertx;

    import io.quarkus.vertx.ConsumeEvent;

    import javax.enterprise.context.ApplicationScoped;

    @ApplicationScoped
    public class GreetingService {

        @ConsumeEvent("greeting")
        public String greeting(String name) {
            return "Hello " + name;
        }
    }

    This bean receives the name, and returns the greeting message.

    Now, open your browser to http://localhost:8080/hello/async/Quarkus (the resource is mounted on /hello and the method on /async/{name}), and you should see:

    Hello Quarkus

    To better understand, let’s detail how the HTTP request/response has been handled:

    • The request is received by the hello method.

    • A message containing the name is sent to the event bus.

    • Another bean receives this message and computes the response.

    • This response is sent back using the reply mechanism.

    • Once the reply is received by the sender, the content is written to the HTTP response.
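    The ordering of these steps can be sketched in plain Java, without Quarkus or Vert.x. In this hypothetical example (RequestFlowDemo and the trace messages are invented for illustration), a manually completed CompletableFuture stands in for the pending reply of bus.send("greeting", name). It shows that the endpoint method returns before the reply exists, and that the response is written only once the consumer replies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical trace of the request/reply flow; the bus is replaced by a plain future.
final class RequestFlowDemo {

    static List<String> run() {
        List<String> trace = new ArrayList<>();

        // Stand-in for the reply that the event bus will deliver later.
        CompletableFuture<String> pendingReply = new CompletableFuture<>();

        // Steps 1 and 2: the endpoint receives the request, sends the message,
        // and returns a CompletionStage right away.
        trace.add("request received");
        CompletionStage<String> httpResponse = pendingReply.thenApply(body -> {
            // Step 5: runs only once the reply has arrived.
            trace.add("response written: " + body);
            return body;
        });
        trace.add("endpoint returned, response done=" + httpResponse.toCompletableFuture().isDone());

        // Steps 3 and 4: the consumer computes the greeting and replies.
        trace.add("consumer replies");
        pendingReply.complete("Hello " + "Quarkus");

        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

    Because no executor is involved here, the callback runs on the thread that completes the future, which keeps the trace deterministic. With the real event bus, the reply arrives on a thread managed by Vert.x.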

    This application can be packaged using:

    You can also compile it as a native executable with: