From 0 to DynamicSupervisor — Handling Processes in Elixir
I started my software development career as a Java developer, and Java remained my language of choice until December 2016, when I joined Coletiv and started coding in Elixir 😀.
With Java it’s possible to use processes and parallelize them, but it isn’t common and it’s always a struggle. One of the use cases I used processes for was crunching a large amount of data in a short timeframe, but as a rule of thumb I avoided them as much as I could.
With Elixir I found another world that, citing Bruce Williams, doesn’t treat concurrency and process management like afterthoughts or advanced features.
A few months ago Elixir was updated to version 1.6, which introduced the DynamicSupervisor.
DynamicSupervisor in my way
In one of my current projects, users have tasks assigned to them, and the backend has to notify each user about the status of their tasks every 5 minutes.
In terms of database structure I have users and they have tasks assigned to them.
When it comes to the Erlang/Elixir process structure, we decided to have a GenServer (Task worker) for each active task, whose responsibility is to keep the user updated about the status of that task.
On top of these processes there is a DynamicSupervisor responsible for supervising the child processes and acting as the interface for communicating with them.
In terms of process lifetime it goes like this:
- When the application starts, all the Task workers are started, one by one, for each active task.
- When a user adds a new task, a new Task worker is started.
- When a user completes a task, the respective Task worker is killed.
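The lifecycle above can be sketched in isolation. This is a minimal illustration, not the project's code: `Sketch.Worker` and the task ids are made-up stand-ins, and the supervisor is started unnamed with `DynamicSupervisor.start_link/1`.

```elixir
defmodule Sketch.Worker do
  # Hypothetical stand-in for the article's Task worker
  use GenServer, restart: :transient

  def start_link(task_id), do: GenServer.start_link(__MODULE__, task_id)

  @impl true
  def init(task_id), do: {:ok, task_id}
end

# A DynamicSupervisor starts with no children
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# Application start: one worker per active task (ids are illustrative)
pids =
  for task_id <- [1, 2, 3] do
    {:ok, pid} = DynamicSupervisor.start_child(sup, {Sketch.Worker, task_id})
    pid
  end

# A user adds a new task: one more worker is started
{:ok, _new_pid} = DynamicSupervisor.start_child(sup, {Sketch.Worker, 4})

# A user completes a task: its worker is terminated and not restarted
:ok = DynamicSupervisor.terminate_child(sup, hd(pids))

IO.inspect(DynamicSupervisor.count_children(sup).active, label: "active workers")
```

Because children are added and removed at runtime, the supervisor does not need to know the list of tasks up front.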
DynamicSupervisor Code
The TaskSupervisor is a DynamicSupervisor. We just need to initialise it (start_link/1 & init/1) and define the functions to start (start_task_worker/1) and finish (terminate_task_worker/2) the workers it supervises.
defmodule MyApp.TaskSupervisor do
  use DynamicSupervisor

  alias MyApp.Management
  alias MyApp.WorkerStateStorage

  def start_link(arg) do
    DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  @impl true
  def init(_arg) do
    # :one_for_one is the only strategy a DynamicSupervisor supports
    DynamicSupervisor.init(strategy: :one_for_one)
  end

  # Starts one Task worker for the given task
  def start_task_worker(task_id) do
    spec = {MyApp.TaskWorker, %{task_id: task_id}}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  # Starts one Task worker per task returned by Management.list_all_tasks/0
  def start_all_task_workers() do
    Enum.each(Management.list_all_tasks(), fn %{id: task_id} ->
      start_task_worker(task_id)
    end)
  end

  # Removes the stored worker state and terminates the worker
  def terminate_task_worker(pid, task_id) do
    WorkerStateStorage.delete(task_id)
    DynamicSupervisor.terminate_child(__MODULE__, pid)
  end
end
TaskWorker Code
The TaskWorker is a GenServer that checks every 5 minutes whether the task is still active and, if it is, notifies the user that the task is still open. When a task is finished, the related Task worker dies.
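The periodic check relies on a common GenServer pattern: the process sends itself a delayed message with `Process.send_after/3` and re-arms the timer each time the message arrives. A minimal sketch of that pattern follows; the module name `Sketch.Ticker`, the `:tick` message and the 10 ms interval are illustrative choices, not part of the project.

```elixir
defmodule Sketch.Ticker do
  use GenServer

  def start_link(interval_ms), do: GenServer.start_link(__MODULE__, interval_ms)

  # Returns how many ticks have been handled so far
  def ticks(pid), do: GenServer.call(pid, :ticks)

  @impl true
  def init(interval_ms) do
    schedule(interval_ms)
    {:ok, %{interval: interval_ms, ticks: 0}}
  end

  @impl true
  def handle_info(:tick, state) do
    # The periodic work would go here; then the timer is re-armed
    schedule(state.interval)
    {:noreply, %{state | ticks: state.ticks + 1}}
  end

  @impl true
  def handle_call(:ticks, _from, state), do: {:reply, state.ticks, state}

  defp schedule(ms), do: Process.send_after(self(), :tick, ms)
end

# A short interval keeps the demo fast; the article's worker uses 5 minutes
{:ok, pid} = Sketch.Ticker.start_link(10)
Process.sleep(100)
IO.inspect(Sketch.Ticker.ticks(pid), label: "ticks so far")
```

Scheduling the next message only after handling the current one means ticks never pile up if the work takes longer than the interval.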
One important detail is the restart strategy for the GenServer: we use :transient, so that if the Task worker terminates abnormally (e.g. it crashes for some unexpected reason) it is immediately restarted by the DynamicSupervisor. If the Task worker terminates normally (e.g. when the task is finished), it is not restarted.
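The difference between the two exits can be observed with a small experiment. The sketch below assumes a throwaway worker, `Sketch.Transient`, registered under the made-up name `:sketch_worker` so that the restarted process is easy to find; the crash is expected to print an error log.

```elixir
defmodule Sketch.Transient do
  use GenServer, restart: :transient

  def start_link(name), do: GenServer.start_link(__MODULE__, nil, name: name)

  @impl true
  def init(nil), do: {:ok, nil}

  @impl true
  def handle_cast(:finish, state), do: {:stop, :normal, state}
  def handle_cast(:crash, _state), do: raise("boom")
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, first} = DynamicSupervisor.start_child(sup, {Sketch.Transient, :sketch_worker})

# Abnormal exit: the supervisor starts a fresh process under the same name
GenServer.cast(:sketch_worker, :crash)
Process.sleep(100)
second = Process.whereis(:sketch_worker)
IO.inspect(is_pid(second) and second != first, label: "restarted after crash")

# Normal exit: the worker is gone for good
GenServer.cast(:sketch_worker, :finish)
Process.sleep(100)
IO.inspect(Process.whereis(:sketch_worker), label: "after normal stop")
```

Exits with reason `:shutdown` or `{:shutdown, term}` are also treated as normal for `:transient` children.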
defmodule MyApp.TaskWorker do
  use GenServer, restart: :transient

  alias MyApp.Notify
  alias MyApp.Management
  alias MyApp.WorkerStateStorage

  # Interval between checks: 5 minutes, in milliseconds
  @notify_interval 300_000

  # client

  def start_link(state) do
    GenServer.start_link(__MODULE__, state)
  end

  # server

  @impl true
  def init(state) do
    state = Map.put(state, :pid, :erlang.pid_to_list(self()))
    schedule_notify_task(@notify_interval)

    case WorkerStateStorage.save(state) do
      true ->
        {:ok, state}

      false ->
        # The state was not saved: if another worker is recorded for this
        # task, stop it and take over
        case WorkerStateStorage.get(state.task_id) do
          nil ->
            {:ok, state}

          pid ->
            stop(:erlang.list_to_pid(pid), state.task_id)
            WorkerStateStorage.save(state)
            {:ok, state}
        end
    end
  end

  @impl true
  def handle_info(:schedule_notify_task, state) do
    case notify(state) do
      true ->
        # Task finished: stop normally so the :transient worker is not restarted
        WorkerStateStorage.delete(state.task_id)
        {:stop, :normal, state}

      false ->
        # Task still open: schedule the next check
        schedule_notify_task(@notify_interval)
        {:noreply, state}
    end
  end

  @impl true
  def handle_cast({:notify_task}, state) do
    case notify(state) do
      true ->
        WorkerStateStorage.delete(state.task_id)
        {:stop, :normal, state}

      false ->
        {:noreply, state}
    end
  end

  defp schedule_notify_task(time) do
    Process.send_after(self(), :schedule_notify_task, time)
  end

  # Notifies the user while the task is open.
  # Returns true when the task is finished and the worker should stop.
  defp notify(%{task_id: task_id}) do
    task = Management.get_task!(task_id)

    if task.is_open do
      Notify.notify(task)
      false
    else
      true
    end
  end

  # Stops another worker recorded for the same task
  defp stop(pid, task_id) do
    WorkerStateStorage.delete(task_id)

    if Process.alive?(pid) do
      # An exit signal with reason :normal is ignored by other processes;
      # :shutdown stops the worker without triggering a :transient restart
      Process.exit(pid, :shutdown)
    end
  end
end
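The WorkerStateStorage module is not shown in this article. Judging only by how the worker uses it (save returning true or false, get returning nil or a stored pid, delete by task id), one possible shape is a thin wrapper around a public named ETS table. The sketch below is an assumption along those lines: the module name `Sketch.WorkerStateStorage`, its `init/0` function and the table name `:sketch_task_backup` are all hypothetical.

```elixir
defmodule Sketch.WorkerStateStorage do
  # Hypothetical table name, used only for this sketch
  @table :sketch_task_backup

  # Creates the table; the calling process becomes its owner
  def init, do: :ets.new(@table, [:set, :public, :named_table])

  # Returns true if the entry was stored, false if the task id already exists
  def save(%{task_id: task_id, pid: pid}) do
    :ets.insert_new(@table, {task_id, pid})
  end

  # Returns the stored pid (as a charlist) or nil
  def get(task_id) do
    case :ets.lookup(@table, task_id) do
      [{^task_id, pid}] -> pid
      [] -> nil
    end
  end

  def delete(task_id), do: :ets.delete(@table, task_id)
end

Sketch.WorkerStateStorage.init()
state = %{task_id: 42, pid: :erlang.pid_to_list(self())}

first_save = Sketch.WorkerStateStorage.save(state)
second_save = Sketch.WorkerStateStorage.save(state)
IO.inspect({first_save, second_save}, label: "save results")
```

With `:ets.insert_new/2`, a second save for the same task id fails, which is what would let a worker detect that another worker is already recorded for its task.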
StarterWorker Code
In order to start all the workers when the application starts, we created a StarterWorker whose single responsibility is to start all the Task workers, one per active Task.
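defmodule MyApp.StarterWorker do
  @moduledoc """
  Start lost children
  """
  use GenServer

  # The default argument keeps start_link/0 and also allows start_link/1,
  # which is what a supervisor child spec calls
  def start_link(_arg \\ []) do
    GenServer.start_link(__MODULE__, [])
  end

  @impl true
  def init(state) do
    :ets.new(:task_backup, [:set, :public, :named_table])
    # start_all_task_workers/0 is defined in MyApp.TaskSupervisor
    MyApp.TaskSupervisor.start_all_task_workers()
    {:ok, state}
  end
end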
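Since the StarterWorker calls the TaskSupervisor from its init/1, the TaskSupervisor has to be running first. A Supervisor starts its children in list order, so listing the dynamic supervisor before the starter is enough. The application's supervision tree is not shown in this article, so the sketch below is an assumption: the `Sketch.*` modules are stand-ins, and plain `Agent` processes play the role of Task workers.

```elixir
defmodule Sketch.TaskSupervisor do
  use DynamicSupervisor

  def start_link(arg), do: DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__)

  @impl true
  def init(_arg), do: DynamicSupervisor.init(strategy: :one_for_one)
end

defmodule Sketch.Starter do
  use GenServer

  def start_link(task_ids), do: GenServer.start_link(__MODULE__, task_ids)

  @impl true
  def init(task_ids) do
    # Works only because Sketch.TaskSupervisor was started before this process
    for task_id <- task_ids do
      {:ok, _pid} =
        DynamicSupervisor.start_child(Sketch.TaskSupervisor, {Agent, fn -> task_id end})
    end

    {:ok, task_ids}
  end
end

# Children are started in order: the dynamic supervisor first, then the starter
children = [
  Sketch.TaskSupervisor,
  {Sketch.Starter, [1, 2, 3]}
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

IO.inspect(DynamicSupervisor.count_children(Sketch.TaskSupervisor).active, label: "workers")
```

If the order were reversed, the starter's init/1 would fail because the named supervisor would not exist yet.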
Lesson learned
Like me, when you start with Elixir you might be afraid of using processes because of your experience with other languages. Give them a try instead, and you will be surprised by how quick and easy they are to work with.
There are a few great links you should check when trying processes:
Thank you for reading! 😊