Microservices Communication Made Simple: Exploring Mediator and Broker Patterns
Hello everyone! Today I'll be talking about two event-driven approaches in microservices: the mediator and broker patterns. I'll explain the differences between the two, along with some code written in Go illustrating the thought process. Let's start.
An event-driven architecture consists of highly decoupled event-processing services that asynchronously receive and process events.
Mediator
The mediator topology is commonly used when you need to orchestrate multiple processing steps for a single event through a central mediator. Let's look at an example.
Let's say we have three services: an order service, a payment service, and an invoice service.
When a customer places an order, it first goes to the order service for some processing; if that succeeds, it proceeds to the payment service and finally to the invoice service.
If any of these steps fails, we won't proceed to the next service.
Orchestrating this flow is the job of the mediator service, which is the single point of communication between these services and contains all the logic on whether to proceed or not.
The mediator receives the initial event and orchestrates that event by sending events to event consumers to execute each step of the process.
Events can be executed serially (as in our example) or in parallel, depending on your architecture.
Let's look at some Go code demonstrating the mediator pattern.
I'll be using Kafka as the pub-sub messaging layer between the services in this example.
We'll use a multi-module Go workspace laid out as follows:
The Kafka directory is a module with a generic reader and writer that can be used in any service.
The orders directory is the only service implemented (just enough to show the concept).
The mediator directory is the orchestrator service.
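For reference, here is a minimal sketch of what the go.work file tying these modules together could look like, assuming each directory is its own module named after its folder (consistent with the kafka/reader and mediator/order_service imports used later):

go 1.21

use (
    ./kafka
    ./mediator
    ./orders
)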
Let's look at the reader in the Kafka directory:
package reader

import (
    "context"
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

// ReaderInit creates a kafka.Reader for the given topic on the local broker.
func ReaderInit(topic string) *kafka.Reader {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{"127.0.0.1:9092"},
        Topic:    topic,
        MaxBytes: 10e6, // 10MB
    })
    return r
}

// Read connects to the given topic and executes the callback fn on every
// message received, looping until reading fails.
func Read(topic string, fn func(m kafka.Message)) {
    fmt.Println("Reading from topic", topic)
    r := ReaderInit(topic)
    for {
        m, err := r.ReadMessage(context.Background())
        if err != nil {
            break
        }
        fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
        fn(m)
    }
    if err := r.Close(); err != nil {
        log.Fatal("failed to close reader:", err)
    }
}
The Read function takes a topic name and a callback, connects to the topic, and executes the callback on every message received.
Now for the writer module in the Kafka directory:
package writer

import (
    "context"
    "errors"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

// writerInit creates a kafka.Writer pointed at the local broker. No topic is
// configured here because each message carries its own Topic field.
func writerInit() *kafka.Writer {
    w := &kafka.Writer{
        Addr:                   kafka.TCP("127.0.0.1:9092"),
        AllowAutoTopicCreation: true,
    }
    return w
}

// Write publishes a message to the topic named in the message itself,
// retrying transient errors a few times before giving up.
func Write(message kafka.Message) {
    w := writerInit()
    var err error
    const retries = 3
    for i := 0; i < retries; i++ {
        fmt.Println("Attempting to write to topic", message.Topic)
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        err = w.WriteMessages(ctx, message)
        cancel()
        if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
            // transient error while the auto-created topic initializes:
            // back off briefly and retry
            time.Sleep(time.Millisecond * 250)
            continue
        }
        if err != nil {
            log.Fatalf("unexpected error %v", err)
        }
        break
    }
    if err := w.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
    }
}
The Write function takes a Kafka message and writes it to the topic named in the message, retrying a few transient errors.
These two functions will be used in all our services.
Now for the order service:
package main

import (
    "fmt"
    "kafka/reader"
    "kafka/writer"

    "github.com/segmentio/kafka-go"
)

func main() {
    // consume order events and process each one with processMessage
    reader.Read("orders_TOPIC", processMessage)
}

// processMessage handles an order event and reports the result to the mediator.
func processMessage(m kafka.Message) {
    fmt.Println("Processing message")
    // some processing
    fmt.Println("Message processed")
    // write the result to the mediator topic
    m.Topic = "MEDIATOR_TOPIC"
    fmt.Println("Writing message to mediator-service")
    writer.Write(m)
    fmt.Println("Message written to mediator-service")
}
I use the Read function, passing it the callback processMessage, which does some processing on the order message received and writes the result of that processing (whether to proceed or not) to the mediator topic.
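The processing itself is elided here; as a hedged sketch, it could unmarshal the order, validate it, and record the outcome in the result field the mediator checks later. The Order type mirrors the mediator's struct shown below, and the validation rule is a made-up placeholder (this assumes encoding/json is added to the service's imports):

// Order mirrors the struct used in the mediator's order_service package.
type Order struct {
    OrderID     string  `json:"order_id"`
    Result      bool    `json:"result"`
    OrderAmount float32 `json:"order_amount"`
}

// processOrder is a hypothetical version of the elided processing step:
// it validates the order and records the outcome in Result so the
// mediator can decide whether to proceed.
func processOrder(m *kafka.Message) error {
    var order Order
    if err := json.Unmarshal(m.Value, &order); err != nil {
        return err
    }
    order.Result = order.OrderAmount > 0 // toy validation rule, an assumption
    val, err := json.Marshal(order)
    if err != nil {
        return err
    }
    m.Value = val
    return nil
}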
Now let's look at the mediator service. It has a package called order_service, which holds the logic specific to the order service.
package order_service

import (
    "encoding/json"
    "kafka/writer"

    "github.com/segmentio/kafka-go"
)

// Order is the payload exchanged between the services for an order event.
type Order struct {
    OrderID     string  `json:"order_id"`
    Result      bool    `json:"result"`
    OrderAmount float32 `json:"order_amount"`
}

// WriteToOrderTopic kicks off the event flow by publishing an order
// to the order service's topic.
func WriteToOrderTopic() {
    // create an order instance
    order := Order{
        OrderID:     "123",
        OrderAmount: 412.41,
    }
    val, _ := json.Marshal(order)
    // create the kafka message instance
    msg := kafka.Message{
        Key:   []byte("order"),
        Value: val,
        Topic: "orders_TOPIC",
    }
    // write to the order topic
    writer.Write(msg)
}
The package initializes the whole event flow, which can be started by the mediator or by any external service. Its WriteToOrderTopic function writes the order object to the order service's topic.
The Kafka message key here is order; we will have different keys for different message types.
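As a hypothetical illustration (only the order flow is implemented here), a payment_service package could publish with its own key; the PAYMENTS_TOPIC name and the package itself are assumptions:

package payment_service

import (
    "kafka/writer"

    "github.com/segmentio/kafka-go"
)

// WriteToPaymentTopic is a hypothetical analog of WriteToOrderTopic.
func WriteToPaymentTopic(val []byte) {
    msg := kafka.Message{
        Key:   []byte("payment"), // a different key marks a different message type
        Value: val,
        Topic: "PAYMENTS_TOPIC", // assumed topic name
    }
    writer.Write(msg)
}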
Now for the mediator service itself:
package main

import (
    "encoding/json"
    "fmt"
    "kafka/reader"
    "mediator/order_service"

    "github.com/segmentio/kafka-go"
)

func main() {
    order_service.WriteToOrderTopic() // trigger an order event
    reader.Read("MEDIATOR_TOPIC", processMessage)
}

// processMessage routes results coming from the different services.
func processMessage(m kafka.Message) {
    fmt.Println("MEDIATOR_SERVICE:PROCESSING", string(m.Key))
    // messages come from different services, so check the message key:
    // if the key is order, handle the order flow;
    // if the key is payment, write to the payment-service, etc.
    key := string(m.Key)
    if key == "order" {
        // unmarshal the message: if the result field is true,
        // proceed to the payment-service; otherwise fail
        if checkIfSuccess(m.Value) {
            // write to payment-service
            fmt.Println("write to payment-service")
        } else {
            // write to failure-topic
            fmt.Println("write to failure-topic")
        }
    }
}

// checkIfSuccess unmarshals the message value and reports whether
// the result field is true.
func checkIfSuccess(value []byte) bool {
    var order order_service.Order
    if err := json.Unmarshal(value, &order); err != nil {
        return false
    }
    return order.Result
}
The mediator checks the key of the incoming message, parses the payload to check its result, and decides what to do next based on that result. In the failure case, we simply write to a topic named failure-topic.
All in all, the mediator is the main orchestrator for all the events flowing through our system.
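To make the write-to-payment-service stub concrete, here is a hedged sketch of what that branch might call; the PAYMENTS_TOPIC name and payment key are assumptions, and it presumes the kafka/writer module is imported into the mediator:

// hypothetical expansion of the "write to payment-service" branch
func forwardToPaymentService(m kafka.Message) {
    m.Topic = "PAYMENTS_TOPIC" // assumed topic name for the payment service
    m.Key = []byte("payment")  // mark the message type for the next hop
    writer.Write(m)            // reuse the shared kafka/writer module
}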
For initializing the whole flow, the mediator can be a consumer on a topic that receives messages from an external service. Once the mediator consumes a message, it starts the flow by sending it to the order service.
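A hedged sketch of that wiring, assuming a hypothetical EXTERNAL_TOPIC fed by the external service and the shared kafka/reader and kafka/writer modules imported:

// sketch: the mediator starts the flow from an external topic instead of
// calling order_service.WriteToOrderTopic() itself
func startFromExternal() {
    reader.Read("EXTERNAL_TOPIC", func(m kafka.Message) {
        m.Topic = "orders_TOPIC" // forward the incoming order to the order service
        m.Key = []byte("order")
        writer.Write(m)
    })
}

Since Read loops forever, a real mediator would run this consumer in a goroutine alongside the MEDIATOR_TOPIC consumer.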
Pros
Decoupling: One of the primary benefits of the Mediator pattern is that it promotes loose coupling between microservices. Microservices communicate with each other through a central mediator, reducing direct dependencies between them. This makes it easier to add, remove, or modify microservices without affecting the entire system.
Simplified Communication: The Mediator pattern simplifies the communication flow between microservices. Instead of microservices needing to know the specifics of each other's interfaces, they only need to interact with the mediator. This can lead to cleaner and more manageable code.
Centralized Control: With a mediator in place, you can implement centralized control and coordination logic. This is especially useful for handling cross-cutting concerns like logging, security, and error handling consistently across microservices.
Flexibility: The Mediator pattern allows for flexible and dynamic interactions between microservices. You can easily add new communication channels or change the routing logic within the mediator to adapt to changing business requirements.
Testing: Testing individual microservices becomes more straightforward because you can create mock mediators for testing purposes. This isolation simplifies unit testing and reduces the need for complex integration tests.
Cons
Single Point of Failure: The mediator can become a single point of failure. If the mediator service goes down or experiences performance issues, it can disrupt communication between microservices and impact the entire system.
Complexity: While the Mediator pattern can simplify communication between microservices, it can also introduce complexity, especially in scenarios with a large number of microservices and complex routing logic within the mediator.
Performance Overhead: Depending on the implementation and the volume of messages being processed, the mediation process can introduce some performance overhead, as messages need to pass through an additional layer.
Potential Bottleneck: If not designed and scaled properly, the mediator can become a bottleneck in the system, especially in high-traffic scenarios. This requires careful planning and optimization.
Increased Latency: Because messages often pass through the mediator, there can be additional latency introduced in the communication process compared to direct communication between microservices.
Broker
The broker topology is nothing more than a chain of events flowing from service to service without the need for a central service.
The message flow is distributed across the event consumers in a chain-like fashion through a lightweight message broker. This topology is useful when you have a relatively simple event processing flow and you do not want (or need) central event orchestration.
Here, every service holds its own logic; the decision to proceed or not lives in each service rather than in a central one. This flow can reduce complexity if your use case is simple and you don't want the headache of maintaining a central service.
Every event consumer is responsible for processing an event and publishing a new event indicating the action it just performed.
I won't be implementing this fully in code, but all there is to it is moving the mediator's logic into each service: after processing an event, the service itself decides whether to publish the next message, as the sketch below shows.
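As a rough sketch under stated assumptions (the PAYMENTS_TOPIC and FAILURE_TOPIC names and the success check are invented for illustration), the order service's callback might publish directly to the next topic instead of reporting back to a mediator:

package main

import (
    "encoding/json"
    "fmt"
    "kafka/reader"
    "kafka/writer"

    "github.com/segmentio/kafka-go"
)

// Order mirrors the struct from the mediator example.
type Order struct {
    OrderID     string  `json:"order_id"`
    Result      bool    `json:"result"`
    OrderAmount float32 `json:"order_amount"`
}

func main() {
    reader.Read("orders_TOPIC", processMessage)
}

// processMessage in the broker topology: the service itself decides where
// the event goes next instead of reporting back to a mediator.
func processMessage(m kafka.Message) {
    var order Order
    if err := json.Unmarshal(m.Value, &order); err != nil {
        fmt.Println("could not parse order:", err)
        return
    }
    if order.OrderAmount > 0 { // toy success check, an assumption
        m.Topic = "PAYMENTS_TOPIC" // next service in the chain
    } else {
        m.Topic = "FAILURE_TOPIC" // failure topic, assumed name
    }
    writer.Write(m)
}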
Pros
Scalability: The Broker pattern can support high scalability. Message brokers are often designed to handle large volumes of messages and can distribute messages to multiple subscribers efficiently. This makes it easier to scale individual microservices independently.
Decoupling: Event consumers are single-purpose and completely decoupled from other event consumers. Changes are generally isolated to one or a few event consumers and can be made quickly without impacting other components.
Ease of deployment: The broker topology tends to be easier to deploy than the mediator topology, primarily because the event mediator component is somewhat tightly coupled to the event processors: a change in an event processor component might also require a change in the event mediator, requiring both to be deployed for any given change.
Cons
Testing: While individual unit testing is not overly difficult, it does require some sort of specialized testing client or testing tool to generate events. Testing is also complicated by the asynchronous nature of this pattern.
Development: Development can be somewhat complicated due to the asynchronous nature of the pattern as well as the need for more advanced error-handling conditions within the code for unresponsive and/or failed message brokers.
Summary
Choosing between the Mediator and Broker patterns for communication is a pivotal decision. The Mediator pattern centralizes communication through a dedicated mediator component, fostering loose coupling and simplifying interaction. In contrast, the Broker pattern chains events from service to service and relies on a lightweight message broker to facilitate asynchronous communication, enhancing scalability and reliability.
However, both patterns have potential downsides and the choice between these patterns should align with specific architectural needs, emphasizing factors like coupling, scalability, and system complexity.