
Using Oban as a Message Queue

Introduction

A little over a year ago I wrote about loosely coupling Phoenix Contexts using a message queue (in this case Kafka). I received some comments on that blog asking whether I had considered using Oban.

Despite being a big fan of Oban, at the time I suggested that it wouldn't be a good fit for this use case, as it's not based on a publish/subscribe model. Instead, you queue specific background tasks to be executed, and those tasks are explicitly declared and executed as part of the same business-logic code.

However, recently I was thinking about how Oban could be used as a message queue, specifically as a publish/subscribe system to decouple Phoenix Contexts.

Example

To demonstrate how this could be done I created a basic Phoenix project called Brokker. The logic is similar to the initial post, centred around company expenses. We decouple the payments Context from the expenses and AML (anti-money laundering) Contexts: the expenses and AML Contexts both care about payments, but the payments Context should not couple to them. The Oban architecture I created to demonstrate this is as follows.

Producer Logic

When a payment is created in the payments Context, it creates an Ecto.Multi transaction. The logic is as follows:

def create_payment() do
  Ecto.Multi.new()
  # Imagine some transaction steps for the payment logic here
  |> MessageBroker.queue_publication(:payment_created, %{payment_id: 1, merchant: "Ocado"})
  |> Repo.transaction()
end

As we can see, this calls MessageBroker.queue_publication, whose logic is the following:

def queue_publication(multi, event, payload) do
  # Look up every consumer worker registered for this event
  consumers = Application.get_env(:brokker, :message_broker_consumers)[event]

  # Add a job insert for each consumer to the caller's transaction
  Oban.insert_all(
    multi,
    :jobs,
    Enum.map(consumers, fn consumer -> consumer.new(payload) end)
  )
end

This finds all consumer workers that want to be notified about the particular event, and creates an Oban job insert for each of those workers.

Consumer Logic

Each consumer is defined within its Context, for example:

defmodule Brokker.Aml.PaymentMadeConsumerWorker do
  use Oban.Worker
  require Logger

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"payment_id" => id}}) do
    Logger.warn("Creating aml flag for payment id '#{id}'")
    :ok
  end
end

Given that the expenses Context also cares about this event, it has a similar consumer. You can create a worker for any number of interested consumers.
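For completeness, the expenses-side consumer might look something like the following sketch, assuming it simply mirrors the AML worker above (the module name follows the config below, but the log message is illustrative):

```elixir
defmodule Brokker.Expenses.PaymentMadeConsumerWorker do
  use Oban.Worker
  require Logger

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"payment_id" => id}}) do
    # Hypothetical expenses-side handling of the payment_created event
    Logger.warn("Creating expense entry for payment id '#{id}'")
    :ok
  end
end
```

Note that Oban stores job arguments as JSON, which is why perform/1 matches on the string key "payment_id" even though the producer enqueued an atom-keyed map.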

These consumers are then registered in the Elixir config as follows:

config :brokker, :message_broker_consumers,
  payment_created: [
    Brokker.Expenses.PaymentMadeConsumerWorker,
    Brokker.Aml.PaymentMadeConsumerWorker
  ]

Therefore, when MessageBroker.queue_publication needs to insert the workers, it looks them up in the config by the given event ID.
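Concretely, the lookup resolves an event atom to a list of worker modules. The snippet below simulates the config entry with Application.put_env so it is self-contained; in the real project this is set in config.exs as shown above:

```elixir
# Simulate the config entry from config.exs so this snippet runs standalone
Application.put_env(:brokker, :message_broker_consumers,
  payment_created: [
    Brokker.Expenses.PaymentMadeConsumerWorker,
    Brokker.Aml.PaymentMadeConsumerWorker
  ]
)

# This is the lookup queue_publication performs for the :payment_created event
consumers = Application.get_env(:brokker, :message_broker_consumers)[:payment_created]
# consumers == [Brokker.Expenses.PaymentMadeConsumerWorker,
#               Brokker.Aml.PaymentMadeConsumerWorker]
```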

This means that whenever the producer commits the transaction, the consumer workers will execute. This gives us a one-producer, multi-consumer architecture.
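Because the job inserts are steps in the same Ecto.Multi, they share the payment's database transaction. A rough sketch of the guarantee this buys us, assuming create_payment/0 is exposed from a Payments context module and returns the usual Repo.transaction result shapes:

```elixir
case Payments.create_payment() do
  {:ok, _changes} ->
    # The payment row and all consumer jobs were committed together;
    # Oban will now pick the jobs up and run each worker's perform/1.
    :ok

  {:error, _failed_step, _reason, _changes} ->
    # The whole transaction rolled back, so no jobs were enqueued either -
    # consumers never see events for payments that were not created.
    :error
end
```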

Conclusions

We can see from this post that it is possible to emulate a publish/subscribe pattern (similar to a message queue like Kafka) using only Oban. There are pros and cons to this approach.

Pros

Cons

Final Words

We can see that we can achieve a reliable, decoupled architecture with nothing more than Elixir, Postgres, and Oban.

I want to give a few shoutouts to the posts that inspired me to write this:

Happy coding!