When working with event-driven applications, you tend to see this on the screen:

We are processing your request and will notify you when it was treated successfully.

Let’s break the sentence down, starting with "processing your request". This means that the server is processing your request, but the client cannot be sure it was entirely successful, because the request got transformed into an event and published on a queue, never to be seen again (fire and forget).
The client is left wondering what happened and needs a way to show its users the state of the request.
The second part of the sentence tells the user they will be notified when the request has succeeded.
This part can get complex because you want to give that feedback as quickly as possible.

The first thing that comes to mind is polling: can our client not poll the state of the data from the table until it’s ready?

When our client receives a high load while it is polling the database for the state of the data, it puts the database under unnecessary stress.
Since polling is a periodic check, it is not real-time, and we want to bring feedback to our users as soon as possible.
To let the client behave in real time, we need push events.
Push events can be enabled with WebSockets: this bidirectional communication keeps an open connection between the server and the client.
This tech post explains how we enabled push events with RabbitMQ, MQTT, and Spring Cloud Stream.

WebSockets for communication

We chose WebSockets because they provide a bidirectional, open connection between the client and the server.
Because handling raw data over TCP yourself is complex and a lot of work, WebSockets support subprotocols.
These subprotocols offer us easy ways to transmit data over the wire.
First, let’s talk about opening a WebSocket connection.
To establish one, the client sends a WebSocket handshake request, to which the server returns a WebSocket handshake response.

The handshake starts with an HTTP request/response.
Once the connection is established, communication switches to a bidirectional binary protocol that does not conform to the HTTP protocol. The switch happens through HTTP Upgrade negotiation: the Upgrade header lets the client ask the server to switch to the desired protocol and open up two-way communication between client and server.

At a minimum, a successful WebSocket handshake must contain the protocol version, and an auto-generated challenge value sent by the client, followed by a 101 HTTP response code (Switching Protocols) from the server with a hashed challenge-response to confirm the selected protocol version:

  • Client must send Sec-WebSocket-Version and Sec-WebSocket-Key.
  • Server must confirm the protocol by returning Sec-WebSocket-Accept.
  • Client may send a list of application subprotocols via Sec-WebSocket-Protocol.
  • Server must select one of the advertised subprotocols and return it via Sec-WebSocket-Protocol. If the server does not support any, then the connection is aborted.
  • Client may send a list of protocol extensions in Sec-WebSocket-Extensions.
  • Server may confirm one or more selected extensions via Sec-WebSocket-Extensions. If no extensions are provided, then the connection proceeds without them.
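
To make the challenge-response in the list above concrete, here is a minimal sketch for Node.js. The hashing rule and the sample key come from RFC 6455; the function names and the `selectSubprotocol` helper are illustrative assumptions, not part of any library.

```javascript
const crypto = require("crypto");

// Fixed GUID defined by RFC 6455 for deriving Sec-WebSocket-Accept.
const WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// Returns the value the server sends back in Sec-WebSocket-Accept:
// base64(SHA-1(Sec-WebSocket-Key + GUID)).
function computeAcceptValue(secWebSocketKey) {
    return crypto
        .createHash("sha1")
        .update(secWebSocketKey + WEBSOCKET_GUID)
        .digest("base64");
}

// Hypothetical helper: picks the first subprotocol offered by the client
// that the server also supports, or null when there is no overlap.
function selectSubprotocol(offered, supported) {
    const match = offered.find((name) => supported.includes(name));
    return match === undefined ? null : match;
}

// Sample key from the opening handshake example in RFC 6455.
console.log(computeAcceptValue("dGhlIHNhbXBsZSBub25jZQ=="));
console.log(selectSubprotocol(["v12.stomp", "mqtt"], ["mqtt"]));
```

In practice the browser and the broker do this for you; the sketch only shows what "hashed challenge-response" means.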

Choosing a subprotocol

When I was searching for a suitable subprotocol for handling the data, I first experimented with STOMP.
STOMP has a rich messaging mechanism for handling data and great support in Spring and RabbitMQ.
However, I ran into an issue with our API gateway. To do a security scan, the API gateway had to parse the messages as XML, which didn’t go well with the UTF-8 text-based messages of STOMP.
Some further research brought us to our next candidate: MQTT.
MQTT, designed as an extremely lightweight pub/sub messaging transport for IoT and mobile devices, could offer us a way to push messages over WebSockets.

When experimenting, I discovered that RabbitMQ supports this through its MQTT plugin and its Web MQTT plugin. In MQTT over WebSockets, the MQTT messages are transferred over the network encapsulated in one or more WebSocket frames.
To communicate with an MQTT broker over WebSockets, the broker must be able to handle native WebSockets.
To provide such support, we decided to use our own managed RabbitMQ. The Web MQTT plugin makes it possible to use MQTT over a WebSocket connection.
To enable this in your broker, you just enable the plugin that ships with RabbitMQ itself.

rabbitmq-plugins enable rabbitmq_web_mqtt

Spinning up a RabbitMQ

To try it out you can just run RabbitMQ in a Docker container.
Define the commands in a Dockerfile and off you go!

FROM rabbitmq:3.7-management
RUN rabbitmq-plugins enable --offline rabbitmq_web_mqtt
EXPOSE 4369 5671 5672 25672 15671 15672 15675 1883

Configuration

When accessing RabbitMQ via MQTT, you have to provide credentials to authenticate yourself.
Because we will be accessing it from a JS client, we do not want to expose our credentials to that client, where they could be exploited.
To avoid handing out credentials, the MQTT plugin lets clients connect anonymously; anonymous connections then use the default user configured below.

Add these to your rabbitmq.conf file, and you’re good to go:

mqtt.default_user = $RABBITMQ_DEFAULT_USER
mqtt.default_pass = $RABBITMQ_DEFAULT_PASS
mqtt.allow_anonymous = true

Subscribing with a JavaScript client

Eclipse offers us a JavaScript client library, Paho, for running MQTT over a WebSocket.

With some basic setup, we can quickly open a WebSocket to RabbitMQ to test the handshake.
As the JavaScript client, we will subscribe to a topic and listen for any notifications from the backend.
To configure our client, we need to know which properties are available.
A list of properties can be found in the documentation.
The most important ones are enabling SSL and setting the keep-alive period, both described below.

var wsbroker = '{rabbitmq_hostname/ws}'; // broker host (placeholder)
var wsport = 443; // port for the host above
// You can randomize the client ID to keep it unique, e.g.:
// "myclientid_" + parseInt(Math.random() * 100, 10)
var client = new Paho.MQTT.Client(wsbroker, wsport, "?access_token={token}", "{client}");

// Called when the connection to the broker drops.
client.onConnectionLost = function (responseObject) {
    console.log("CONNECTION LOST - " + responseObject.errorMessage);
};
// Called for every message received on a subscribed topic.
client.onMessageArrived = function (message) {
    console.log("RECEIVE ON " + message.destinationName + " PAYLOAD " + message.payloadString);
};

client.connect({
    useSSL: true,
    onSuccess: function () {
        console.log("CONNECTION SUCCESS");
        // Subscribe to the broadcast topic once connected.
        client.subscribe('events', {qos: 0});
    },
    onFailure: function (message) {
        console.log("CONNECTION FAILURE - " + message.errorMessage);
    }
});

Keeping the heartbeat alive

At any point after the handshake, either the client or the server can choose to send a ping to the other party.
When the ping is received, the recipient must send back a pong as soon as possible.
You can use this to make sure the other party is still connected.
A best practice is to set the heartbeat to between 20 and 30 seconds, see https://tools.ietf.org/html/rfc6202#page-13. In our case, the client sends a ping every 10 seconds, and the server has 10 seconds to send back a pong.

MQTT keep-alive period

The keep-alive period is the MQTT protocol’s counterpart to the WebSocket heartbeat.
The keep-alive is a time interval measured in seconds. It is the maximum time that is permitted to elapse between the point at which the client finishes transmitting one control packet and the point at which it starts sending the next.
It is the responsibility of the client to ensure the interval between control packets does not exceed the keep-alive value.
The client can send a ping (PINGREQ) at any time, irrespective of the keep-alive value, and use the pong (PINGRESP) to determine that the network and the server are working.

To configure the keep-alive period, the client sets the corresponding property when connecting.

client.connect({
        keepAliveInterval: 20
})
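
As a rough illustration of the timing rules above, the sketch below checks when a ping is due and how long the server waits before giving up. The function names are hypothetical; the 1.5x factor and the meaning of a keep-alive of 0 come from the MQTT 3.1.1 specification. The Paho client normally sends these pings for you, so this is only meant to make the arithmetic visible.

```javascript
// Returns true when the client should send a PINGREQ because no other
// control packet has been sent within the keep-alive interval.
function isPingDue(lastPacketSentMs, nowMs, keepAliveSeconds) {
    if (keepAliveSeconds === 0) {
        return false; // a keep-alive of 0 switches the mechanism off
    }
    return nowMs - lastPacketSentMs >= keepAliveSeconds * 1000;
}

// MQTT 3.1.1: the server disconnects a client it has not heard from
// within one and a half times the keep-alive period.
function serverTimeoutMs(keepAliveSeconds) {
    return keepAliveSeconds * 1500;
}

// With keepAliveInterval: 20 the client pings after 20 s of silence,
// and the server gives up after 30 s.
console.log(isPingDue(0, 20000, 20), serverTimeoutMs(20));
```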

TLS over WebSockets

To achieve a secure connection, we need to enable TLS.
Like HTTP, WebSockets support TLS: use the prefix wss:// instead of ws://, and port 443 instead of 80.

The client can enable TLS by adding a property.

client.connect({
        useSSL: true
})

Authorization token

The browser WebSocket API does not let you set custom headers such as Authorization, so a common way to secure your resources is to propagate your token via a query parameter.
If you are targeting your own backend, the backend can validate this token itself, but for this use case we need a reverse proxy/API gateway to validate the token for us.

var client = new Paho.MQTT.Client(wsbroker,wsport, "?access_token={token}","{client}");
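
A small sketch of how the path and the resulting URI could be assembled. Both helpers are hypothetical, and the host name and the `/ws` path are assumptions for illustration. The point is that the token should be URL-encoded before it is appended, since tokens can contain characters such as `+`, `/` or `=`.

```javascript
// Hypothetical helper: builds the path argument passed to Paho.MQTT.Client,
// URL-encoding the token so special characters survive the query string.
function buildPath(basePath, accessToken) {
    return basePath + "?access_token=" + encodeURIComponent(accessToken);
}

// Hypothetical helper: shows the URI the connection ends up using.
// wss:// is chosen when TLS is enabled, ws:// otherwise.
function buildWebSocketUri(host, port, path, useSSL) {
    const scheme = useSSL ? "wss" : "ws";
    return scheme + "://" + host + ":" + port + path;
}

// Assumed values for illustration only.
const path = buildPath("/ws", "a+b/c=");
console.log(buildWebSocketUri("broker.example.com", 443, path, true));
```

Keep in mind that query parameters tend to end up in access logs, so short-lived tokens are the safer choice here.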

Clean Session

When clean session is turned on, the server discards any previous session state on connect, so it does not remember which topics the client had subscribed to.
When turned off, the client just needs to reconnect to its session, which is stored on the server.

Default is true

Quality Of Service

In combination with the clean session property set to false, a QoS of 1 or higher makes your messages durable.
When the client is offline, the server holds these messages until the client reconnects.

Default is 0
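
The two settings work together, so here is a minimal sketch of how they could be combined. The `buildSessionOptions` helper is hypothetical; `cleanSession`, `useSSL`, `keepAliveInterval` and `qos` are the option names used by the Paho JavaScript client.

```javascript
// Hypothetical helper: returns connect and subscribe options for a client.
// durable = true asks the broker to keep the session and queue messages
// while the client is offline; durable = false matches the defaults.
function buildSessionOptions(durable) {
    return {
        connect: {
            cleanSession: !durable, // false keeps the session on the server
            useSSL: true,
            keepAliveInterval: 20
        },
        subscribe: {
            qos: durable ? 1 : 0 // QoS 0 messages are not held for offline clients
        }
    };
}

const options = buildSessionOptions(true);
console.log(JSON.stringify(options));
// Usage with Paho (not executed here):
// client.connect(Object.assign({ onSuccess: onConnected }, options.connect));
// client.subscribe('events', options.subscribe);
```

For the stored session to be found again, the client has to reconnect with the same client ID, so a randomized client ID does not fit this setup.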

MQTT Client

When the client opens the WebSocket and subscribes, RabbitMQ creates a new queue and binds it to the default topic exchange amq.topic, using the subscribed topic as the routing key.
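
MQTT topics and AMQP routing keys use different separators, and the RabbitMQ MQTT plugin translates between them. The sketch below mimics that translation for the common cases; the function names are hypothetical and it is not the plugin’s actual code.

```javascript
// MQTT uses "/" between topic levels and "+" as single-level wildcard;
// AMQP topic exchanges use "." and "*". The multi-level wildcard "#"
// is the same in both.
function mqttTopicToRoutingKey(topic) {
    return topic.replace(/\//g, ".").replace(/\+/g, "*");
}

function routingKeyToMqttTopic(routingKey) {
    return routingKey.replace(/\./g, "/").replace(/\*/g, "+");
}

// A client subscribed to "events/KEVIN" gets a queue bound to amq.topic
// with routing key "events.KEVIN".
console.log(mqttTopicToRoutingKey("events/KEVIN"));
console.log(routingKeyToMqttTopic("events.*"));
```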

Publishing Events with Spring Cloud Stream

Now that we have RabbitMQ up and running with the plugin for MQTT over WebSockets enabled, we can publish events. Spring Cloud Stream offers an abstraction for messaging, with RabbitMQ as a binder.
Because RabbitMQ is our MQTT broker, we do not need any special configuration to handle MQTT messages. You can just set up a Spring Boot application with https://start.spring.io.

We start by adding both the dependency for Spring Cloud Stream and the binder of choice.
With the RabbitMQ binder on the classpath, Spring Boot auto-configures the messaging abstraction for RabbitMQ.

Dependencies

<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

Message Channel

Following up, we need a channel to publish our messages on, so we create an interface to define our channels.
You can have two kinds of channels, one for everyone (broadcast) or one-to-one (private).

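import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface UserFeedbackChannel {

    String NOTIFICATION_EVERYONE = "globalNotificationChannel";
    String NOTIFICATION_USER = "specificNotificationChannel";

    // Broadcast channel: every subscriber receives the message.
    @Output(NOTIFICATION_EVERYONE)
    MessageChannel globalNotificationChannel();

    // Private channel: only the targeted user receives the message.
    @Output(NOTIFICATION_USER)
    MessageChannel specificNotificationChannel();
}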

To let Spring know it is a custom channel, we need to annotate our configuration class with @EnableBinding({UserFeedbackChannel.class}).

Configuration

With RabbitMQ, some custom configuration needs to be taken care of.
Since MQTT uses the exchange amq.topic by default, we need to target it as the destination for our messages.
The routingKeyExpression lets us either broadcast the message or send it privately.
For private messages, headers.routingKey is bound to the user we want to message.
Our POJO event contains audit fields, so we know to whom the message belongs.
This way, we can give feedback to the user who did the transaction.
If the routing key is simply events, as on the global channel, the message is broadcast.

spring:
  application:
    name: notifications
  cloud:
    stream:
      bindings:
        globalNotificationChannel:
          destination: amq.topic
        specificNotificationChannel:
          destination: amq.topic
      rabbit:
        bindings:
          globalNotificationChannel:
            producer:
              routingKeyExpression: '''events'''
              declareExchange: false
          specificNotificationChannel:
            producer:
              routingKeyExpression: headers.routingKey
              declareExchange: false

Publisher

Create the POJO you need, so we can start publishing!
Be aware that before publishing the POJO, it needs to be converted to a String so the MQTT client can read the payload.

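import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class NotificationSocketPublisher {
    private final UserFeedbackChannel channel;
    private final ObjectMapper objectMapper;

    public NotificationSocketPublisher(UserFeedbackChannel channel, ObjectMapper objectMapper) {
        this.channel = channel;
        this.objectMapper = objectMapper;
    }

    public void sendPrivateNotificationToUser(NotificationSocketEvent event) {
        String payload = convertToString(event);
        // Routing key "events.<USER>" targets only the user who created the event.
        var notification = MessageBuilder.withPayload(payload)
                .setHeader("routingKey", "events." + event.getCreatedBy().toUpperCase());

        channel.specificNotificationChannel().send(notification.build());
    }

    // Not shown in the original post; assumed here to serialize the event to JSON with Jackson.
    private String convertToString(NotificationSocketEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Could not serialize notification event", e);
        }
    }
}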
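
On the client side, the routing key used by the publisher determines which topic to subscribe to. A minimal sketch, assuming the client knows the user name and that the broker maps the routing key events.<USER> to the MQTT topic events/<USER>; the `topicsForUser` helper is hypothetical.

```javascript
// Hypothetical helper: returns the MQTT topics a logged-in user listens on.
// "events" receives broadcasts; "events/<USER>" receives private messages
// published with routing key "events.<USER>".
function topicsForUser(username) {
    return ["events", "events/" + username.toUpperCase()];
}

console.log(topicsForUser("kevin"));
// Usage with Paho inside onSuccess (not executed here):
// topicsForUser(currentUser).forEach(function (topic) {
//     client.subscribe(topic, { qos: 0 });
// });
```

The upper-casing mirrors the toUpperCase() call in the publisher, since topic names are case-sensitive.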

Result

When the JS client, RabbitMQ, and the Spring Cloud backend are running, you can try it out by publishing messages from the backend to RabbitMQ.
This will result in communication to the correct subscriber.
The JS subscriber will interpret these messages and parse readable content from them.
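
Since the publisher sends the event as a String, the subscriber has to parse it again. Below is a minimal sketch of such a handler, assuming the payload is JSON; the `parseNotification` helper and the field names in the sample payload are hypothetical. `destinationName` and `payloadString` are the properties the Paho message object exposes, as used in the client example earlier.

```javascript
// Hypothetical helper: turns an incoming Paho message into a plain object.
// Falls back to the raw string when the payload is not valid JSON.
function parseNotification(message) {
    try {
        return {
            topic: message.destinationName,
            body: JSON.parse(message.payloadString)
        };
    } catch (error) {
        return {
            topic: message.destinationName,
            body: null,
            raw: message.payloadString
        };
    }
}

// Stand-in for a Paho message, for illustration only.
const sample = {
    destinationName: "events/KEVIN",
    payloadString: '{"status":"SUCCESS","createdBy":"kevin"}'
};
console.log(parseNotification(sample).body.status);
// Usage with Paho (not executed here):
// client.onMessageArrived = function (message) {
//     var notification = parseNotification(message);
// };
```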

Kevin Van Houtte is a Software Engineer at Ordina Belgium. Passionate in the Spring ecosystem, Kevin is eager to discover new and efficient ways to solve problems. He enjoys a good challenge and is interested in cutting edge technologies. Kevin has a strong focus on building cloud native architectures with the right mindset on security and API design.