From 0 to DynamicSupervisor —  Handling Processes in Elixir

Handling Elixir Processes: From 0 to DynamicSupervisor (Source: https://ferd.ca/the-zen-of-erlang.html) 

I started my software development career as a Java developer, and Java remained my language of choice until December 2016, when I joined Coletiv and started coding in Elixir 😀.

With Java it’s possible to use processes and parallelize them, but it isn’t common and it’s always a struggle. One case where I used them was processing a large amount of data in a short timeframe, but as a rule of thumb I avoided them as much as I could.

With Elixir I found another world, one that, to quote Bruce Williams, doesn’t treat concurrency and process management like afterthoughts or advanced features.

A few months ago Elixir was updated to version 1.6 which introduced the DynamicSupervisor.

DynamicSupervisor my way

In one of my current projects users have tasks assigned to them and the backend has to notify the user about the status of their tasks every 5 minutes.

In terms of database structure I have users and they have tasks assigned to them.

[Figure: Database structure]

For the Erlang/Elixir process structure, we decided to have one GenServer (Task worker) for each active task, whose responsibility is to keep the user updated about the status of that task.

On top of these processes there is a DynamicSupervisor responsible for supervising and working as an interface to communicate with the child processes.

In terms of process lifetime it goes like this:

  • When the application starts all the Task workers are started, one by one, for each active task.

  • When a user adds a new task, a new Task worker is started.

  • When a user completes a task the respective Task worker is killed.
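
The lifecycle above can be sketched in a few lines. This is a minimal, self-contained illustration rather than the project's code: `Sketch.Worker` and the task ids are made up, and the supervisor is started directly instead of from an application supervision tree.

```elixir
defmodule Sketch.Worker do
  # Hypothetical stand-in for the article's Task worker.
  use GenServer, restart: :transient

  def start_link(task_id), do: GenServer.start_link(__MODULE__, task_id)

  @impl true
  def init(task_id), do: {:ok, task_id}
end

# A DynamicSupervisor starts with no children; they are added at runtime.
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# Application start: one worker per active task (ids are made up).
pids =
  for task_id <- [1, 2, 3] do
    {:ok, pid} = DynamicSupervisor.start_child(sup, {Sketch.Worker, task_id})
    pid
  end

# A user adds a new task: start one more worker.
{:ok, _new_pid} = DynamicSupervisor.start_child(sup, {Sketch.Worker, 4})

# A user completes a task: terminate the corresponding worker.
:ok = DynamicSupervisor.terminate_child(sup, hd(pids))

# Number of workers still running under the supervisor.
active = DynamicSupervisor.count_children(sup).active
IO.inspect(active, label: "active workers")
```

Children are identified by pid here, which is why the real code needs some way to map a task id back to the pid of its worker.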

[Figure: Elixir process tree]

DynamicSupervisor Code

The TaskSupervisor is a DynamicSupervisor. We just need to initialise it (start_link/1 and init/1) and define the functions that start (start_task_worker/1) and stop (terminate_task_worker/2) its workers.

defmodule MyApp.TaskSupervisor do
  use DynamicSupervisor

  alias MyApp.Management
  alias MyApp.WorkerStateStorage

  def start_link(arg) do
    DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

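  @impl true
  def init(_arg) do
    # State the strategy explicitly; :one_for_one is the only strategy
    # DynamicSupervisor supports.
    DynamicSupervisor.init(strategy: :one_for_one)
  end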

  def start_task_worker(task_id) do
    spec = {MyApp.TaskWorker, %{task_id: task_id}}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  def start_all_task_workers() do
    Enum.each(Management.list_all_tasks(), fn %{id: task_id} ->
      start_task_worker(task_id)
    end)
  end

  def terminate_task_worker(pid, task_id) do
    WorkerStateStorage.delete(task_id)
    # Use the DynamicSupervisor API, since this module is a DynamicSupervisor.
    DynamicSupervisor.terminate_child(__MODULE__, pid)
  end
end

TaskWorker Code

The TaskWorker is a GenServer that verifies every 5 minutes if the task is active and notifies the user that the task is still open. When a task is finished the related Task worker dies.

One important detail is the restart strategy for the GenServer. We use the :transient strategy, so if the Task worker terminates abnormally (e.g. it crashes for some unexpected reason) the DynamicSupervisor immediately respawns it. If the Task worker terminates normally (e.g. when the task is finished) it is not respawned.
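
To see the :transient behaviour in isolation, here is a small sketch under assumed names (`Sketch.TransientWorker`, `:worker_a`, `:worker_b` are illustrative and not part of the project). One worker stops with an abnormal reason and the other with `:normal`. The short sleep only gives the supervisor time to react, and the crash is expected to produce an error log entry.

```elixir
defmodule Sketch.TransientWorker do
  # Hypothetical worker used only to observe restart behaviour.
  use GenServer, restart: :transient

  def start_link(name), do: GenServer.start_link(__MODULE__, name, name: name)

  @impl true
  def init(name), do: {:ok, name}

  @impl true
  # Normal stop: a :transient child is not restarted.
  def handle_cast(:finish, state), do: {:stop, :normal, state}
  # Abnormal stop: a :transient child is restarted.
  def handle_cast(:crash, state), do: {:stop, :boom, state}
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, first_a} = DynamicSupervisor.start_child(sup, {Sketch.TransientWorker, :worker_a})
{:ok, _b} = DynamicSupervisor.start_child(sup, {Sketch.TransientWorker, :worker_b})

GenServer.cast(:worker_a, :crash)
GenServer.cast(:worker_b, :finish)

# Give the supervisor a moment to handle both exits.
Process.sleep(200)

restarted_a = Process.whereis(:worker_a)
stopped_b = Process.whereis(:worker_b)

IO.inspect(restarted_a != first_a, label: "worker_a was replaced by a new process")
IO.inspect(stopped_b, label: "worker_b")
```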

defmodule MyApp.TaskWorker do

  use GenServer, restart: :transient

  alias MyApp.Notify
  alias MyApp.Management
  alias MyApp.WorkerStateStorage

  # client

  def start_link(state) do
    GenServer.start_link(__MODULE__, state)
  end

  # server

  def init(state) do
    state = Map.put(state, :pid, :erlang.pid_to_list(self()))
    schedule_notify_task(300000)
    case WorkerStateStorage.save(state) do
      true ->
        {:ok, state}

      false ->
        case WorkerStateStorage.get(state.task_id) do
          nil ->
            {:ok, state}

          pid ->
            stop(:erlang.list_to_pid(pid), state.task_id)
            WorkerStateStorage.save(state)
            {:ok, state}
        end
    end
  end

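  def handle_info(:schedule_notify_task, state) do
    case notify(state) do
      true ->
        # Nothing left to do: clean up and stop normally, so the
        # :transient worker is not restarted.
        WorkerStateStorage.delete(state.task_id)
        {:stop, :normal, state}

      false ->
        # Check again in 5 minutes
        schedule_notify_task(300_000)
        {:noreply, state}
    end
  end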

  def handle_cast({:notify_task}, state) do
    case notify(state) do
      true ->
        WorkerStateStorage.delete(state.task_id)
        {:stop, :normal, state}

      false ->
        {:noreply, state}
    end
  end

  defp schedule_notify_task(time) do
    Process.send_after(self(), :schedule_notify_task, time)
  end

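  # Returns true when the worker should stop, false when it should keep running.
  defp notify(%{task_id: task_id}) do
    task = Management.get_task!(task_id)

    if task.is_open do
      # Task is still open: notify the user and keep the worker alive.
      Notify.notify(task)
      false
    else
      # Task is finished: the worker can stop.
      true
    end
  end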

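  defp stop(pid, task_id) do
    WorkerStateStorage.delete(task_id)

    # An exit signal with reason :normal is ignored by any process other than
    # the caller, so use :shutdown, which a :transient child also treats as a
    # clean exit (no restart).
    if Process.alive?(pid) do
      Process.exit(pid, :shutdown)
    end
  end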
end
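
The periodic check in the TaskWorker relies on `Process.send_after/3`: the process sends itself a message, handles it in `handle_info/2`, and schedules the next one. A minimal sketch of that loop follows, with a 50 ms interval instead of 5 minutes. `Sketch.Ticker`, the `:tick` message and the "stop on the third tick" rule are assumptions made for the example.

```elixir
defmodule Sketch.Ticker do
  # Hypothetical worker: ticks at a fixed interval and stops on the third tick.
  use GenServer

  def start_link({interval_ms, report_to}) do
    GenServer.start_link(__MODULE__, {interval_ms, report_to})
  end

  @impl true
  def init({interval_ms, report_to}) do
    schedule(interval_ms)
    {:ok, %{interval: interval_ms, report_to: report_to, ticks: 0}}
  end

  @impl true
  def handle_info(:tick, %{ticks: 2} = state) do
    # Third tick: pretend the task is finished and stop normally.
    send(state.report_to, {:done, 3})
    {:stop, :normal, state}
  end

  def handle_info(:tick, state) do
    # Not finished yet: count the tick and schedule the next one.
    schedule(state.interval)
    {:noreply, %{state | ticks: state.ticks + 1}}
  end

  defp schedule(interval_ms), do: Process.send_after(self(), :tick, interval_ms)
end

{:ok, pid} = Sketch.Ticker.start_link({50, self()})
ref = Process.monitor(pid)

# Wait until the ticker has stopped and capture its exit reason.
reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    2_000 -> :timeout
  end

IO.inspect(reason, label: "ticker exit reason")
```

Returning `{:stop, :normal, state}` from the callback lets the GenServer shut itself down in the usual way, and a `:normal` exit is exactly what keeps a `:transient` child from being restarted.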

StarterWorker Code

In order to start all the workers when the application starts, we created a StarterWorker whose single responsibility is to start all Task workers, one per active task.

defmodule MyApp.StarterWorker do
  @moduledoc """
  Start lost children
  """
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [])
  end

  def init(state) do
    :ets.new(:task_backup, [:set, :public, :named_table])
    # start_all_task_workers/0 is defined in MyApp.TaskSupervisor
    MyApp.TaskSupervisor.start_all_task_workers()
    {:ok, state}
  end
end
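
The WorkerStateStorage module is not shown in this article. Judging only from how it is called (save/1 returning true or false, get/1 returning a stored pid or nil, delete/1) and from the `:task_backup` ETS table created above, it could look roughly like the following. Treat this as a guess: the module name `Sketch.WorkerStateStorage`, the `create_table/0` helper and the `{task_id, pid}` tuple layout are all assumptions.

```elixir
defmodule Sketch.WorkerStateStorage do
  # Hypothetical reconstruction; the real module is not shown in the article.
  @table :task_backup

  # Creates the named table (the article does this in StarterWorker.init/1).
  def create_table, do: :ets.new(@table, [:set, :public, :named_table])

  # Returns true if the entry was stored, false if the task id already exists.
  def save(%{task_id: task_id, pid: pid}) do
    :ets.insert_new(@table, {task_id, pid})
  end

  # Returns the stored pid (a charlist, as produced by :erlang.pid_to_list/1) or nil.
  def get(task_id) do
    case :ets.lookup(@table, task_id) do
      [{^task_id, pid}] -> pid
      [] -> nil
    end
  end

  def delete(task_id), do: :ets.delete(@table, task_id)
end

Sketch.WorkerStateStorage.create_table()
pid_as_list = :erlang.pid_to_list(self())

first_save = Sketch.WorkerStateStorage.save(%{task_id: 1, pid: pid_as_list})
second_save = Sketch.WorkerStateStorage.save(%{task_id: 1, pid: pid_as_list})
stored = Sketch.WorkerStateStorage.get(1)

Sketch.WorkerStateStorage.delete(1)
after_delete = Sketch.WorkerStateStorage.get(1)

IO.inspect({first_save, second_save, after_delete})
```

An ETS table belongs to the process that creates it and disappears when that process exits, so in this design the table lives only as long as the StarterWorker does.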

Lesson learned

Like me, when you start with Elixir you might be afraid of using processes because of your experience with previous languages. Give them a try instead and you will be surprised by how quick and easy they are to work with.

There are a few great links you should check when trying processes:

Elixir processes everywhere Generated at https://memegenerator.net

Thank you for reading! 😊
