A little over a year ago I wrote about loosely coupling Phoenix Contexts using a message quque (in this case Kafka). I received some comments on that blog asking whether I considered using Oban.
Despite being a big fan of Oban, at the time I suggested that it wouldn’t be good for this use case, as its not based on a publish/subscribe model. Instead you queue specific background tasks to be executed, and those tasks are explicitly declared and executed as part of the same business logic code.
However recently I was thinking about how Oban could be used as a message queue specifically as a publish/subscribe system to decouple Phoenix Contexts.
To demonstrate how this could be done I created a basic Phoenix project called Brokker. The logic is similar to the initial post centered around company expenses. Whereby we decouple the payments Context from the expenses Context and AML (anti money laundering), the expenses and AML Contexts both care about payments, but the payment Context should not couple to these Contexts. The Oban architecture I created to demonstrate this is as follows
When payments are created in the payments Context it creates a Ecto Multi transaction, the logic is as follows
def create_payment() do
Ecto.Multi.new()
# Imagine some transaction for payment logic here
|> MessageBroker.queue_publication(:payment_created, %{payment_id: 1, merchant: "Ocado"})
|> Repo.transaction()
end
as we can see this calls MessageBroker.queue_publication
, the logic for this is the following
def queue_publication(multi, event, payload) do
consumers = Application.get_env(:brokker, :message_broker_consumers)[event]
Oban.insert_all(
multi,
:jobs,
Enum.map(consumers, fn consumer -> consumer.new(payload) end)
)
end
this finds all consumer workers that want to be notified about the particular event, and creates an Oban job insert for each one of those workers.
Each consumer is defined within the context, for example
defmodule Brokker.Aml.PaymentMadeConsumerWorker do
use Oban.Worker
require Logger
@impl Oban.Worker
def perform(%Oban.Job{args: %{"payment_id" => id}}) do
Logger.warn("Creating aml flag for payment id '#{id}'")
:ok
end
end
given that the expenses Context also care about this event, they have a similar consumer. You can create a worker for any number of interested consumers.
These consumers are then registered in the elixir config as follows
config :brokker, :message_broker_consumers,
payment_created: [
Brokker.Expenses.PaymentMadeConsumerWorker,
Brokker.Aml.PaymentMadeConsumerWorker
]
Therefore when the MessageBroker.queue_publication
needs to insert the workers, it does a lookup for workers in the config for the given event ID.
This means whenever the producer commits the transaction, the consumer workers will execute. This acts as a multi consumer to one producer architecture.
We can see from this post that it is possible to emulate a publish/subscribe pattern (similar to a message queue like Kafka) just using Oban. There are pros and cons to this approach.
We can see that we can achieve a reliable decoupled architecture with nothing more than Elixir, Postgres and Oban.
I want to give a few shoutouts to posts that inspired me to write this
Happy coding!