In one of our current projects here at Coletiv, we needed to keep data continuously synced between multiple mobile devices. The first solution that came to mind was to poll the server every minute, combined with push notifications.
Clearly this wasn’t the best solution in terms of performance and reliability. Since our backend was built with Phoenix, the initial naive solution naturally evolved to use Phoenix Channels.
In this article we will describe our experience implementing a proof of concept to test Phoenix Channels using the game Slime Soccer. This is a simple game for two players where each one controls a slime and the objective is to score a goal.
The proof of concept was implemented in Swift as a Mac app, but it could be easily implemented in any other language or platform of your choice.
The idea is to have two instances of the game running on different computers, each connecting to a Phoenix server through Phoenix Channels. The server is responsible for managing the game score and handling the communication between the two players.
There are a few Swift open source projects to help with the channel communications:
We decided to use Birdsong simply because, at the time, it was the project that best fit our needs. It has a clean and simple code style, pings the server periodically, and allows some customisation, such as handling disconnect actions.
If you want to follow along you can find the server source code here and the client one here.
The following diagram depicts the general communication flow between the Mac clients and the server during a game.
To define a new channel in Phoenix we simply add this line to UserSocket.ex:
channel("game:*", SuperSlimeGameWeb.GameChannel)
Our connect function is quite simple because we chose to skip authentication: it would have required additional development, and that was not our goal with this test. If you need authentication, this is a possible place to add your checks.
defmodule SuperSlimeGameWeb.UserSocket do
  use Phoenix.Socket

  ## Channels
  ## Define game channel
  channel("game:*", SuperSlimeGameWeb.GameChannel)

  ## Transports - websocket
  transport(:websocket, Phoenix.Transports.WebSocket)

  def connect(_params, socket) do
    {:ok, socket}
  end

  def id(_socket), do: nil
end
On the client, in the connect method, we define both the onConnect and onDisconnect handlers. These closures are called after the socket is connected/disconnected. In our test we join the channel immediately after the socket is connected. When the socket is disconnected, a reconnect is performed after a short delay.
public func connect(with onConnect: @escaping () -> (), retryOnDisconnect: Bool = true) {
    guard let socket = socket else { return }

    self.retryOnDisconnect = retryOnDisconnect
    socket.onConnect = onConnect
    socket.onDisconnect = { [weak self] error in
        guard let weakSelf = self else { return }
        weakSelf.reconnect()
    }

    if !socket.isConnected {
        socket.connect()
    }
}

private func reconnect() {
    guard let socket = socket else { return }

    // Try to reconnect after a delay
    if retryOnDisconnect == true {
        DispatchQueue.main.asyncAfter(deadline: .now() + WebSocket.reconnectInterval) {
            socket.connect()
        }
    }
}

func connectSocket() {
    WebSocket.instance.setup(with: "http://192.168.1.146:4000/socket/websocket")
    WebSocket.instance.connect(with: { [weak self] in
        guard
            let weakSelf = self
            else { return }

        WebSocket.instance.joinChannel(with: "game:slimeSoccer",
                                       payload: ["email": "email@email.com"],
                                       channelEventListeners: weakSelf.channelListeners(),
                                       onJoinSuccess: { (payload) in },
                                       onJoinError: { (payload) in })
    })
}
The main part of the server application is the join function, where players join a game. The channel nomenclature and the payload structure are defined as follows:
Channel nomenclature
"game:" <> code
Payload structure
%{"email" => email}
defmodule SuperSlimeGameWeb.GameChannel do
  use SuperSlimeGameWeb, :channel

  alias SuperSlimeGame.GameState
  alias SuperSlimeGame.Error

  # join to topic game:*
  def join("game:" <> code, %{"email" => email}, socket) do
    case Map.has_key?(GameState.games(), code) do
      true ->
        socket = assign(socket, :player, 2)

        game =
          code
          |> GameState.get_game()
          |> Map.put(:player2, %{:email => email, :score => 0})
          |> GameState.update_game()

        socket = assign(socket, :game, game)
        {:ok, game, socket}

      false ->
        socket = assign(socket, :player, 1)

        game =
          GameState.create_game(code)
          |> Map.put(:player1, %{:email => email, :score => 0})
          |> GameState.update_game()

        socket = assign(socket, :game, game)
        {:ok, game, socket}
    end
  end

  # topic not found
  def join(_topic, _payload, _socket) do
    {:error, Error.get(:resource_not_found)}
  end

  def handle_in("playerAction", payload, socket) do
    broadcast!(socket, "playerAction", Map.put(payload, :from_player, socket.assigns.player))
    {:noreply, socket}
  end
end
The game code (the a_code part of game:[a_code]) is used to check if the game already exists in the GameState agent process that was created to manage games. If the game already exists, we add the new player passed in the payload; otherwise a new game is created and the player is added to it.
The GameState agent is used to store the games and match players. It is started alongside the server as a supervised child process.
To start the agent, add the worker to Application.ex like this:
worker(SuperSlimeGame.GameState, [])
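In more recent Elixir versions the `Supervisor.Spec` helpers such as `worker/2` are deprecated in favor of child specifications. As a minimal sketch, assuming a module that exposes `start_link/0` like the agent in this article, the same supervision could be expressed with an explicit child spec map. `GameStateSketch` is a hypothetical stand-in used only for this example.

```elixir
defmodule GameStateSketch do
  # Hypothetical stand-in for SuperSlimeGame.GameState, reduced to start_link/0.
  def start_link do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end
end

children = [
  # Explicit child spec: the id of the child and the {module, function, args}
  # tuple the supervisor calls to start it.
  %{id: GameStateSketch, start: {GameStateSketch, :start_link, []}}
]

{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)

# The agent is now running under the supervisor and holds an empty map.
IO.inspect(Agent.get(GameStateSketch, & &1))
```

In a Phoenix project the `children` list would live in the application module's `start/2` callback rather than at the top level of a script.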
defmodule SuperSlimeGame.GameState do
  @moduledoc """
  This module holds the current game state. It also contains the game logic.
  Allows to add new players to the board, move them and detect collisions.
  """

  @doc """
  Used by the supervisor to start the Agent that will keep the game state persistent.
  The initial value passed to the Agent is an empty map.
  """
  def start_link do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  @doc """
  Create a game and store it under its code
  """
  def create_game(code) do
    game = generate_new_game(code)
    Agent.update(__MODULE__, &Map.put_new(&1, game.id, game))
    game
  end

  @doc """
  Replace the stored game with the given one
  """
  def update_game(game) do
    Agent.update(__MODULE__, &Map.put(&1, game.id, game))
    game
  end

  @doc """
  Retrieve a game from the map
  """
  def get_game(code) do
    Agent.get(__MODULE__, &Map.get(&1, code))
  end

  @doc """
  Get all the games in the map
  """
  def games do
    Agent.get(__MODULE__, & &1)
  end

  # Game logic
  defp generate_new_game(code) do
    %{
      :id => code,
      :player1 => %{:email => "", :score => 0},
      :player2 => %{:email => "", :score => 0}
    }
  end
end
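One thing to keep in mind with this design: the join function checks whether the game exists and then updates it in separate Agent calls, so two players joining at almost the same moment could, in principle, both be treated as player 1. The following is only a sketch of one way to avoid that, by doing the check and the update in a single `Agent.get_and_update/2` call. `GameRegistrySketch` and its `join/2` function are hypothetical and not part of the original project.

```elixir
defmodule GameRegistrySketch do
  # Hypothetical module for illustration; the game shape mirrors the article's map.
  def start_link do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Looks up the game and registers the player in one atomic step.
  # Returns {player_number, game}.
  def join(code, email) do
    Agent.get_and_update(__MODULE__, fn games ->
      case Map.fetch(games, code) do
        {:ok, game} ->
          # Game already exists: the newcomer becomes player 2.
          game = Map.put(game, :player2, %{email: email, score: 0})
          {{2, game}, Map.put(games, code, game)}

        :error ->
          # No game yet: create it with the newcomer as player 1.
          game = %{
            id: code,
            player1: %{email: email, score: 0},
            player2: %{email: "", score: 0}
          }

          {{1, game}, Map.put(games, code, game)}
      end
    end)
  end
end

{:ok, _pid} = GameRegistrySketch.start_link()
{1, _game} = GameRegistrySketch.join("slimeSoccer", "a@example.com")
{2, game} = GameRegistrySketch.join("slimeSoccer", "b@example.com")
IO.inspect(game)
```

Because the anonymous function runs inside the agent process, no other caller can observe the state between the lookup and the update. Like the original code, this sketch does not handle a third player joining the same game.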
On the client, within the join method, we define the channel event listeners. These listeners are executed when a specific event is received from the channel (e.g. player moved). It is important to check the state of the socket first: if it is not connected, we connect and join the channel once the connection is established.
public func joinChannel(with channelId: String,
                        payload: Socket.Payload,
                        channelEventListeners: [ChannelEventListener],
                        onJoinSuccess: @escaping (Socket.Payload) -> (),
                        onJoinError: @escaping (Socket.Payload) -> ()) {
    guard let socket = socket else { return }

    let joinChannelBlock: () -> () = { [weak self] in
        guard
            let weakSelf = self
            else { return }

        let channel: Channel = socket.channel(channelId, payload: payload)

        // Map listeners to channel
        for listener in channelEventListeners {
            channel.on(listener.event, callback: listener.callback)
        }

        channel.join()?
            .receive("ok", callback: onJoinSuccess)
            .receive("error", callback: onJoinError)

        weakSelf.channels[channelId] = channel
    }

    // Verify if the socket is connected
    if !socket.isConnected {
        connect(with: joinChannelBlock)
    } else {
        joinChannelBlock()
    }
}
When the playerAction command is received, the server broadcasts it to all players that joined the channel.
def handle_in("playerAction", payload, socket) do
  broadcast!(socket, "playerAction", Map.put(payload, :from_player, socket.assigns.player))
  {:noreply, socket}
end
Each time the scene is updated, a message is sent to the channel with the local player coordinates and actions.
public func sendMessage(with channelId: String,
                        event: String,
                        payload: Socket.Payload,
                        onSuccess: @escaping (Socket.Payload) -> (),
                        onError: @escaping (Socket.Payload) -> ()) {
    guard let socket = socket else { return }

    if socket.isConnected {
        socket.channels[channelId]?.send(event, payload: payload)?
            .receive("ok", callback: onSuccess)
            .receive("error", callback: onError)
    } else {
        // Call error callback when the socket is not connected
        onError([:])
        socket.connect()
    }
}

func sendActions(with player: Player) {
    WebSocket.instance.sendMessage(with: "game:slimeSoccer",
                                   event: "playerAction",
                                   payload: [
                                       "player": player.id,
                                       "jump": player.isJumping,
                                       "left": player.isMovingLeft,
                                       "right": player.isMovingRight,
                                       "x": player.sprite?.position.x as Any,
                                       "y": player.sprite?.position.y as Any],
                                   onSuccess: { (message) in },
                                   onError: { (reason) in })
}
In this example we didn’t add any security to the channel connection, but this is a must-have in a real-world context.
In other projects developed by Coletiv, we need to authenticate users before allowing them to access their resources. This is usually accomplished by using Guardian + ueberauth + JWT + Comeonin. The question here is whether to send the JWT in the connect or in the join function.
We’ve decided to send it in the connect function because without a valid token no one has access to the endpoint. Otherwise, if the token is passed in the join function, anyone can establish a connection, effectively making the endpoint public.
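To make the connect-time check concrete, here is a framework-free sketch of the decision logic. It is not the Guardian integration mentioned above: `SocketAuthSketch`, the plain map used as a socket, and the `verify` function are all assumptions made for illustration. In a Phoenix socket module the equivalent clauses would live in the `connect` callback, where returning `:error` refuses the connection.

```elixir
defmodule SocketAuthSketch do
  # Hypothetical stand-in for a socket module's connect step.
  # `verify` is an assumed 1-arity function that checks a token and returns
  # {:ok, user_id} or {:error, reason}; a real app would call its token library.

  # A token was sent with the connection parameters: accept only if it verifies.
  def connect(%{"token" => token}, socket, verify) do
    case verify.(token) do
      {:ok, user_id} -> {:ok, Map.put(socket, :user_id, user_id)}
      {:error, _reason} -> :error
    end
  end

  # No token at all: refuse the connection before any channel can be joined.
  def connect(_params, _socket, _verify), do: :error
end

# Toy verifier for this example only: accepts a single hard-coded token.
verify = fn
  "valid-token" -> {:ok, 42}
  _other -> {:error, :invalid}
end

IO.inspect(SocketAuthSketch.connect(%{"token" => "valid-token"}, %{}, verify))
IO.inspect(SocketAuthSketch.connect(%{"token" => "forged"}, %{}, verify))
IO.inspect(SocketAuthSketch.connect(%{}, %{}, verify))
```

Only the first call is accepted; the other two are rejected before a channel join can even be attempted, which is the property that motivates checking the token in connect rather than in join.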
In this game both players are responsible for drawing the player positions and simulating the game physics. Obviously this is not the best way to synchronize movement; a master-slave strategy might be a better solution.
The server MUST accept incoming secure connections using WSS protocol. By default, most servers don’t accept this.
# Configures the endpoint
config :myapp, MyApp.Web.Endpoint,
  url: [host: "localhost"],
  check_origin: ["//dev.example.com", "//www.example.com"],
  secret_key_base: "SECRET_KEY",
  render_errors: [view: MyApp.ErrorView, accepts: ~w(json)],
  pubsub: [name: MyApp.PubSub, adapter: Phoenix.PubSub.PG2]
On the client side it’s very important to keep the socket connection alive, which adds code complexity. We’ve tried to achieve this by calling the reconnect() function when the connection is lost, but this is not enough if we also need to rejoin the channel and/or use authentication.
During this experience with Phoenix Channels we’ve noticed that the message exchange process is reliable and fast. This simple game, which was not optimized in terms of communication, performs satisfactorily.
We think that Phoenix Channels serve our purpose of maintaining the state synchronized between multiple clients.
We are implementing a similar solution in one of our current projects and we hope to write soon about our real world experience with Phoenix Channels.
You can find the server source code here and client one here.