Repository: link
Concept
In this post we will create notification service. Which can be used to send notifications from server to client (e.g. from backend to frontend)
Use Cases
- Group Chat, messages to groups should be sent to all users on that group
- Event System, subscribe to some events and get notified
- User RabbitMq Message Broker
Specification
- Send notification to user
- Subscribe to specific user notifications as topic (each subscription will get a copy)
- User RabbitMq Message Broker and queue per user approach
Project Setup
Setup Spring Boot Project
Dependencies
Maven
<dependencies>
<dependency>
<groupId>io.projectreactor.rabbitmq</groupId>
<artifactId>reactor-rabbitmq</artifactId>
<version>1.5.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
compile "io.projectreactor.rabbitmq:reactor-rabbitmq:1.5.2"
implementation 'org.springframework.boot:spring-boot-starter-webflux'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
}
RabbitMq Configuration
We create following Beans for configuring RabbitMQ
Mono Connection:
// the mono for connection, it is cached to re-use the connection across sender and receiver instances
// this should work properly in most cases
@Bean
Mono<Connection> rabbitMqConnection(RabbitProperties rabbitProperties) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(rabbitProperties.getHost());
connectionFactory.setPort(rabbitProperties.getPort());
connectionFactory.useNio(); // <- with this flag our RabbitMq connection will be non-blocking
connectionFactory.setUsername(rabbitProperties.getUsername());
connectionFactory.setPassword(rabbitProperties.getPassword());
return Mono.fromCallable(() -> connectionFactory.newConnection("reactor-rabbit")).cache();
}
Sender bean for sending messages from application to message broker:
@Bean
Sender sender(Mono<Connection> connectionMono) {
return RabbitFlux.createSender(new SenderOptions()
.connectionMono(connectionMono));
}
Receiver bean for receiving messages from message broker any topic:
@Bean
Receiver receiver(Mono<Connection> connectionMono) {
return RabbitFlux.createReceiver(new ReceiverOptions()
.connectionMono(connectionMono));
}
Close broker connection before application is destroying:
@PreDestroy
public void close() throws Exception {
rabbitMqConnectionMono.block().close();
}
Architecture
Application General Flow:
RabbitMq Flow:
All Messages send to exchange(left side).
All User subscriptions(listening to new messages) are bound to separate queue.
Queues are An exclusive queue, means that after user disconnect from subscription queue will be deleted automatically.
Api for Sending Message
Controller: link
@PostMapping
public Mono<Void> sendMessage(@PathVariable long userId,
@RequestBody String content) {
return userMessageService.sendMessage(userId, content);
}
Service: link
topicName is the name of exchange:
private final String topicName = "user-simple-message";
We need to define routingKey to send a message to correct groups, each user has only one routingKey
String routingKey = topicName + "-" + userId;
OutboundMessage message = new OutboundMessage(topicName, routingKey, content.getBytes());
Declare Exchange:
final Mono<AMQP.Exchange.DeclareOk> declareExchange = sender.declareExchange(
ExchangeSpecification.exchange()
.name(topicName)
.durable(true)
.type("topic")
);
Send message:
return declareExchange.flatMap(item -> sender.send(Mono.fromSupplier(() -> message)));
Final Code:
@Override
public Mono<Void> sendMessage(long userId, String content) {
String routingKey = topicName + "-" + userId;
OutboundMessage message = new OutboundMessage(topicName, routingKey, content.getBytes());
final Mono<AMQP.Exchange.DeclareOk> declareExchange = sender.declareExchange(
ExchangeSpecification.exchange()
.name(topicName)
.durable(true)
.type("topic")
);
return declareExchange
.flatMap(item -> sender.send(Mono.fromSupplier(() -> message)));
}
Subscribe to user notifications
Controller: link
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> receive(@PathVariable long userId,
@RequestParam(required = false) Integer timeout,
@RequestParam(required = false) Integer maxMessageCount) {
return userMessageService.receive(userId, timeout == null ? null : Duration.ofMillis(timeout), maxMessageCount);
}
Service: link
topicName is the name of exchange:
private final String topicName = "user-simple-message";
RotingKey will be used to bind queue to exchange
String routingKey = topicName + "-" + userId;
Declare Queue:
final Mono<AMQP.Queue.DeclareOk> declareQueue = sender
.declareQueue(QueueSpecification.queue())
.log("declare-queue", Level.FINER);
Bind Queue: This code will create a queue and bind it to exchange via routingKey, queueName is returned
final Mono<String> bindQueue = declareQueue
.flatMap(declareOk ->
sender.bindQueue(
BindingSpecification.binding()
.queue(declareOk.getQueue())
.exchange(topicName)
.routingKey(routingKey)
).map(bindOk -> declareOk.getQueue())) // this code is for returning queueName instead of bind result
.log("bind-queue", Level.FINER);
Prepare receiver flux:
Flux<String> result = bindQueue
.flatMapMany(receiver::consumeAutoAck)
.map(item -> new String(item.getBody()));
If timeout or maxMessage count is given, prepare them:
if (timeout != null) {
result = result.timeout(timeout);
}
if (maxMessageCount != null) {
result = result.limitRequest(maxMessageCount);
}
Final Code:
@Override
public Flux<String> receive(long userId, Duration timeout, Integer maxMessageCount) {
final String routingKey = topicName + "-" + userId;
final Mono<AMQP.Queue.DeclareOk> declareQueue = sender
.declareQueue(QueueSpecification.queue())
.log("declare-queue", Level.FINER);
final Mono<String> bindQueue = declareQueue
.flatMap(declareOk ->
sender.bindQueue(
BindingSpecification.binding()
.queue(declareOk.getQueue())
.exchange(topicName)
.routingKey(routingKey)
).map(bindOk -> declareOk.getQueue())) // this code is for returning queueName instead of bind result
.log("bind-queue", Level.FINER);
Flux<String> result = bindQueue
.flatMapMany(receiver::consumeAutoAck)
.map(item -> new String(item.getBody()));
if (timeout != null) {
result = result.timeout(timeout);
}
if (maxMessageCount != null) {
result = result.limitRequest(maxMessageCount);
}
return result;
}