Elixir TCP server and client example
This post is about an unfinished project called observerio that I haven't had enough free time to complete recently.
I started this project to enable real-time testing of different game strategies. The result is a project with an HTTP API, a TCP server, an Ember dashboard, and a core library written in Go (using xgo compilation for cross-platform support).
I'd like to share some examples that might be useful for others.
Why Elixir?
If someone asked me why I chose Elixir for the backend, I'd find it hard to explain precisely. I was likely inspired by my previous experience at my startup musicfeed. Elixir has an elegant syntax, a supportive community, and excellent documentation. However, concepts like gen_server and the other OTP building blocks can be hard to grasp if Elixir is your first programming language. Some people will tell you that you don't need to learn Erlang to use Elixir, but that's only partially true: the number of native Elixir libraries is growing rapidly, yet you still end up calling Erlang libraries (this post itself relies on ranch and gen_tcp).
Project Architecture
I decided to use a TCP server and client to send variables and logs from mobile games via an internal SDK. When the TCP server receives variables or logs, it publishes messages through PubSub (gateway) to subscribed WebSocket handlers. If a WebSocket connection is active, its handler pushes the changes to the Ember dashboard. This might sound more like a fun experiment than a production-ready solution, but I built it for enjoyment anyway!
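The publish/subscribe step described above can be approximated with Elixir's built-in Registry in duplicate-key mode. This is only a sketch under assumptions: the module name Sketch.PubSub, the registry name, and the topic string are hypothetical, and the project's real Web.Pubsub module may work quite differently.

```elixir
defmodule Sketch.PubSub do
  # Hypothetical stand-in for the project's PubSub layer, built on Registry.
  @registry :sketch_pubsub_registry

  def start_link do
    Registry.start_link(keys: :duplicate, name: @registry)
  end

  # The calling process (for example a WebSocket handler) subscribes to a topic.
  def subscribe(topic) do
    {:ok, _owner} = Registry.register(@registry, topic, nil)
    :ok
  end

  # The TCP side publishes; every subscriber gets the message in its mailbox.
  def publish(topic, message) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end
end

{:ok, _} = Sketch.PubSub.start_link()
:ok = Sketch.PubSub.subscribe("abc123abc123:vars")
Sketch.PubSub.publish("abc123abc123:vars", %{vars: [%{name: "speed", value: "10"}]})

receive do
  %{vars: vars} -> IO.inspect(vars, label: "dashboard would receive")
after
  1_000 -> IO.puts("no message")
end
```

Because subscribers are ordinary processes, a WebSocket handler that goes away is removed from the registry automatically, so the publisher does not need to track disconnects.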
Let's review the TCP server implementation (please feel free to suggest improvements, as this is open source):
Supervisors for TCP Server and Client
First, we define supervisors for the TCP server and client. The client supervisor starts the workers that relay messages between the WebSocket side (via PubSub) and the TCP client:
defmodule Web.Tcp.ServerSupervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: :tcp_server_supervisor)
  end

  def init(_) do
    children = [
      worker(Web.Tcp.Server, [])
    ]

    supervise(children, strategy: :one_for_one)
  end
end

defmodule Web.Tcp.ClientSupervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: :tcp_client_supervisor)
  end

  def start_client(token) do
    Supervisor.start_child(:tcp_client_supervisor, [token])
  end

  def init(_) do
    children = [
      worker(Web.Tcp.Client, [])
    ]

    # We use the `simple_one_for_one` strategy.
    # With this strategy, we define just a "template" for a child,
    # no process is started during the Supervisor initialization, only
    # when we call `start_child/2`
    supervise(children, strategy: :simple_one_for_one)
  end
end
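On Elixir 1.6 and later, the :simple_one_for_one strategy and the worker/supervise helpers are deprecated in favor of DynamicSupervisor. The following is a minimal sketch of the same "start one client per token" idea in that style. Sketch.Client and Sketch.ClientSupervisor are hypothetical names, and the worker is a bare placeholder rather than the real Web.Tcp.Client.

```elixir
defmodule Sketch.Client do
  # Hypothetical minimal worker standing in for Web.Tcp.Client.
  use GenServer

  def start_link(token), do: GenServer.start_link(__MODULE__, token)

  @impl true
  def init(token), do: {:ok, %{token: token, messages: []}}
end

defmodule Sketch.ClientSupervisor do
  use DynamicSupervisor

  def start_link(_opts \\ []) do
    DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Starts one worker per token on demand, like start_client/1 above.
  def start_client(token) do
    DynamicSupervisor.start_child(__MODULE__, {Sketch.Client, token})
  end

  @impl true
  def init(:ok), do: DynamicSupervisor.init(strategy: :one_for_one)
end

{:ok, _sup} = Sketch.ClientSupervisor.start_link()
{:ok, pid} = Sketch.ClientSupervisor.start_client("abc123abc123")
IO.inspect(Process.alive?(pid), label: "client alive")
```

No child is started when the supervisor boots; children only appear when start_client/1 is called, which matches the behavior described in the comment in the original supervisor.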
Server Worker
Next, we define the server worker and pass the module name Web.Tcp.Handler as the entry point for processing connected clients:
defmodule Web.Tcp.Server do
  require Logger

  def start_link do
    Logger.debug("[tcp] starting server on port :#{_port()}")
    opts = [port: _port()]

    {:ok, _} = :ranch.start_listener(
      :tcp, _acceptors_size(), :ranch_tcp, opts, Web.Tcp.Handler, [])
  end

  def _port do
    String.to_integer(Application.get_env(:web, :tcp_port))
  end

  def _acceptors_size do
    String.to_integer(Application.get_env(:web, :tcp_acceptors_size))
  end
end
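The _port/0 and _acceptors_size/0 helpers above assume the settings are stored as strings (for example when they come from environment variables) and will raise if the value is already an integer or is missing. A small sketch of a more forgiving conversion follows; Sketch.Config and to_port/1 are hypothetical names, not part of the project.

```elixir
defmodule Sketch.Config do
  # Hypothetical helper: normalizes a port setting that may be stored
  # either as an integer or as a string.
  def to_port(value) when is_integer(value) and value in 0..65_535, do: {:ok, value}

  def to_port(value) when is_binary(value) do
    case Integer.parse(value) do
      # Only accept strings that are entirely numeric.
      {port, ""} -> to_port(port)
      _ -> {:error, {:invalid_port, value}}
    end
  end

  # Anything else (nil, out-of-range integers, other terms) is rejected.
  def to_port(value), do: {:error, {:invalid_port, value}}
end

IO.inspect(Sketch.Config.to_port("4000"))
IO.inspect(Sketch.Config.to_port(4000))
IO.inspect(Sketch.Config.to_port("forty"))
```

Returning a tagged tuple instead of raising lets the caller decide whether a bad setting should stop the application at boot or fall back to a default.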
Client Worker
Now we define a module that works as a worker for connected TCP clients. On initialization, it subscribes to a PubSub channel using the API key (per user). This allows it to receive messages from PubSub and send them back to the TCP client socket:
defmodule Web.Tcp.Client do
  # `use GenServer` supplies the default callbacks that this module
  # does not define itself.
  use GenServer

  require Logger
  require Poison
  alias Web.Pubsub

  def start_link(token) do
    # Note: String.to_atom/1 creates atoms that are never garbage collected,
    # so this is only reasonable for a bounded set of tokens.
    GenServer.start_link(__MODULE__, token, name: String.to_atom(token))
  end

  def init(token) do
    Pubsub.subscribe("#{token}:vars:callback")
    {:ok, %{token: token, messages: []}}
  end

  def handle_info(%{vars: vars}, %{token: token, messages: messages} = state) do
    Logger.debug("[tcp.client] received message: #{inspect(vars)}")
    message = _pack(token, "vars", vars)
    Logger.debug("[tcp.client] packed message: #{inspect(message)}")

    messages = messages ++ [message]
    Logger.debug("[tcp.client] begin send message: #{inspect(messages)}")

    # Either flush the queue to the socket or keep it until a socket registers.
    state = token |> _get_socket |> _send_back(messages, state)
    Logger.debug("[tcp.client] done send message: #{inspect(messages)}")

    {:noreply, state}
  end

  def terminate(reason, state) do
    Logger.debug("[tcp.client] reason: #{inspect(reason)}, state: #{inspect(state)}")
    :ok
  end

  # A socket is registered for this token: send everything and clear the queue.
  defp _send_back({:ok, socket}, messages, state) do
    :ok = _send(socket, messages)
    %{state | messages: []}
  end

  # No socket yet: keep the messages queued in the state.
  defp _send_back(:enqueue, messages, state) do
    %{state | messages: messages}
  end

  defp _send(_s, []), do: :ok

  defp _send({socket, transport} = s, [message | messages]) do
    transport.send(socket, message)
    _send(s, messages)
  end

  def _pack(token, "vars", vars) do
    vars =
      vars
      |> Poison.encode!
      |> Base.encode64

    "v:#{token}:#{vars}\n"
  end

  defp _get_socket(token) do
    Logger.debug("[tcp.socket] search for socket, transport by token: #{inspect(token)}")

    response =
      case Registry.lookup(Registry.Sockets, token) do
        [{_, socket}] -> {:ok, socket}
        [] -> :enqueue
      end

    Logger.debug("[tcp.client] _get_socket: #{inspect(response)}")
    response
  end
end
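The frame built by _pack/3 has the layout "v:<token>:<base64 payload>" followed by a newline. Below is a small sketch of that framing together with a possible inverse, as a receiving SDK might implement it. Sketch.Frame is a hypothetical module, unpack/1 is not part of the original code, and the payload is assumed to be an already-encoded JSON string so the example does not depend on Poison.

```elixir
defmodule Sketch.Frame do
  # Mirrors the "v:<token>:<base64>\n" layout produced by _pack/3.
  # `json` is assumed to be an already-encoded JSON string.
  def pack(token, json) when is_binary(token) and is_binary(json) do
    "v:" <> token <> ":" <> Base.encode64(json) <> "\n"
  end

  # Hypothetical inverse: splits the frame and decodes the payload.
  # The Base64 alphabet contains no ":", so splitting into 3 parts is safe
  # as long as the token itself contains no ":".
  def unpack(frame) do
    parts = frame |> String.trim_trailing("\n") |> String.split(":", parts: 3)

    with ["v", token, payload] <- parts,
         {:ok, json} <- Base.decode64(payload) do
      {:ok, token, json}
    else
      _ -> :error
    end
  end
end

frame = Sketch.Frame.pack("abc123abc123", ~s({"speed":"10"}))
IO.inspect(frame, label: "frame")
IO.inspect(Sketch.Frame.unpack(frame), label: "unpacked")
```

Base64-encoding the payload is what makes the newline a safe frame delimiter: the JSON may contain newlines, but its encoded form never does.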
TCP Handler
The TCP handler registers the socket in the registry when it receives a connection and starts a new client worker to subscribe to the PubSub channel by API key, establishing communication between PubSub and the TCP client:
defmodule Web.Tcp.Handler do
  require Logger
  alias Web.Pubsub

  @moduledoc """
  `Handler` waits for lines separated by \\n (newline). If the handler doesn't
  see a newline, it accumulates data until it receives one.

  `Registry.Sockets` contains api_key -> socket records for easy communication
  from the dashboard page to TCP clients.
  """

  def start_link(ref, socket, transport, opts) do
    pid = spawn_link(__MODULE__, :init, [ref, socket, transport, opts])
    {:ok, pid}
  end

  def init(ref, socket, transport, _opts = []) do
    :ok = :ranch.accept_ack(ref)

    case transport.peername(socket) do
      {:ok, _peer} -> loop(socket, transport, "")
      {:error, reason} -> Logger.error("[tcp.handler] init receive error reason: #{inspect(reason)}")
    end
  end

  def loop(socket, transport, acc) do
    # Don't flood the process mailbox: receive one message and leave the
    # remaining data in the socket until the next iteration re-arms it.
    transport.setopts(socket, [active: :once])

    # transport.messages() returns the message tags this transport uses
    # (for :ranch_tcp: :tcp, :tcp_closed and :tcp_error).
    {ok, closed, error} = transport.messages()

    receive do
      # The tags and the socket are pinned so that only messages for this
      # connection match. Unpinned, the variables would simply be rebound
      # and the first clause would match any 3-tuple.
      {^ok, ^socket, data} ->
        Logger.debug("[tcp.handler] received data: #{inspect(data)}")
        buffer = acc <> data

        # _process/3 always ends by calling loop/3 again, so no extra
        # loop call is needed after it.
        buffer
        |> String.split("\n")
        |> Enum.map(&String.trim/1)
        |> _process(socket, transport)

      {^closed, ^socket} ->
        Logger.debug("[tcp.handler] closed socket: #{inspect(socket)}")

      {^error, ^socket, reason} ->
        Logger.error("[tcp.handler] socket: #{inspect(socket)}, closed because of error reason: #{inspect(reason)}")

      {:error, error} ->
        Logger.error("[tcp.handler] error: #{inspect(error)}")

      # Exit signals arrive as {:EXIT, pid, reason} with an atom tag, and only
      # if this process traps exits.
      {:EXIT, _parent, reason} ->
        Logger.error("[tcp.handler] exit parent reason: #{inspect(reason)}")
        _kill()

      message ->
        Logger.debug("[tcp.handler] message on receive block: #{inspect(message)}")
    end
  end

  defp _kill(), do: Process.exit(self(), :kill)

  defp _process([], socket, transport), do: loop(socket, transport, "")
  defp _process([""], socket, transport), do: loop(socket, transport, "")

  defp _process([line, ""], socket, transport) do
    _protocol(line, socket, transport)
    loop(socket, transport, "")
  end

  # A single trailing element without a newline is an incomplete line:
  # keep it as the accumulator for the next read.
  defp _process([line], socket, transport), do: loop(socket, transport, line)

  defp _process([line | lines], socket, transport) do
    _protocol(line, socket, transport)
    _process(lines, socket, transport)
  end

  defp _protocol(line, socket, transport) do
    Logger.debug("[_protocol] line: #{line}")

    case line |> Web.Tcp.Protocol.process do
      {:verified, api_key} ->
        _register_socket(api_key, socket, transport)
        Web.Tcp.ClientSupervisor.start_client(api_key)
        Logger.debug("[tcp.server] transport should respond with OK")

        case transport.send(socket, "OK\n") do
          {:error, reason} ->
            Logger.error(inspect(reason))
          _ ->
            :ok
        end

      {:error, reason} ->
        Logger.error("[tcp] #{inspect(reason)}")

      :error ->
        Logger.error("error on processing: #{inspect(line)}")

      _ ->
        :ok
    end
  end

  def _register_socket(api_key, socket, transport) do
    Logger.debug("[tcp.handler] _register_socket token: #{api_key}")

    case Registry.register(Registry.Sockets, api_key, {socket, transport}) do
      {:error, {:already_registered, _pid}} ->
        Registry.update_value(Registry.Sockets, api_key, fn (_) -> {socket, transport} end)

      {:error, reason} ->
        Logger.error("[tcp] reason: #{inspect(reason)}")

      _ ->
        :ok
    end
  end
end
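The line accumulation that loop/3 and _process/3 perform together can also be expressed as a single pure function, which is easier to test in isolation. This is a sketch only: Sketch.LineBuffer and feed/2 are hypothetical names, not part of the project.

```elixir
defmodule Sketch.LineBuffer do
  # Appends newly received bytes to the accumulator and returns
  # {complete_lines, remainder}. The remainder is whatever follows the
  # last newline and becomes the accumulator for the next read.
  def feed(acc, data) when is_binary(acc) and is_binary(data) do
    parts = String.split(acc <> data, "\n")

    # String.split/2 always returns at least one element, so the last
    # element is the (possibly empty) unfinished line.
    {lines, [rest]} = Enum.split(parts, -1)
    {Enum.map(lines, &String.trim/1), rest}
  end
end

{lines, rest} = Sketch.LineBuffer.feed("", "v:abc123abc123\nl:abc1")
IO.inspect(lines, label: "complete")
IO.inspect(rest, label: "pending")

{lines2, rest2} = Sketch.LineBuffer.feed(rest, "23abc123:W10=\n")
IO.inspect(lines2, label: "complete")
IO.inspect(rest2, label: "pending")
```

With this shape, the receive loop only has to call the protocol for each complete line and carry the remainder into the next iteration.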
Protocol Handler
Web.Tcp.Protocol defines the parser and handler for commands sent by the TCP client. Each command is a single line, and <api_key> is exactly 12 bytes long.
Example commands:
l:<api_key>:<base64 json messages>
i:<api_key>:<base64 json variables>
v:<api_key> - the client passes its API key so the server can verify that it is a registered client
defmodule Web.Tcp.Protocol do
  require Logger
  require Poison

  alias Web.Gateway
  alias Web.Db.Users

  @moduledoc """
  Server messages (`api_key` is exactly 12 bytes):
    - `l:<api_key>:<logs>`
      logs - should come as json array and encoded base64
    - `i:<api_key>:<vars>`
      vars - should come as json dictionary and encoded by base64
    - `v:<api_key>`
      api_key - should verify key using our registry

  Client messages:
    - `i:s:name:value` - var set by name value inside of app
  """

  def process("l:" <> <<api_key :: bytes-size(12)>> <> ":" <> logs) do
    Logger.debug("[protocol] api_key: #{api_key}, logs: #{inspect(logs)}")

    logs
    |> Base.decode64!
    |> Poison.decode!
    |> Gateway.logs(api_key)
  end

  def process("i:" <> <<api_key :: bytes-size(12)>> <> ":" <> vars) do
    Logger.debug("[protocol] api_key: #{api_key}, vars: #{inspect(vars)}")

    vars
    |> Base.decode64!
    |> Poison.decode!
    |> Gateway.vars(api_key)
  end

  def process("v:" <> <<api_key :: bytes-size(12)>>) do
    if Users.verify_key(api_key) do
      {:verified, api_key}
    else
      {:error, "not registered user with api_key: #{api_key}"}
    end
  end

  # Anything that doesn't match a known command is rejected.
  def process(_), do: :error
end
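The binary pattern matching used by process/1 can be tried out without the Gateway, database, or Poison dependencies. The sketch below only classifies a line and decodes the Base64 payload. Sketch.Parser and its return tuples are hypothetical, and unlike the original it uses the non-raising Base.decode64/1, so a malformed payload yields an error tuple instead of crashing the calling process.

```elixir
defmodule Sketch.Parser do
  # The API key is matched as exactly 12 bytes, as in Web.Tcp.Protocol.
  def parse(<<"v:", key::binary-size(12)>>), do: {:verify, key}

  def parse(<<"l:", key::binary-size(12), ":", payload::binary>>),
    do: decode(:logs, key, payload)

  def parse(<<"i:", key::binary-size(12), ":", payload::binary>>),
    do: decode(:vars, key, payload)

  # Wrong prefix, wrong key length, or missing separator.
  def parse(_other), do: :error

  # Returns the decoded JSON text; actual JSON parsing is left out here.
  defp decode(kind, key, payload) do
    case Base.decode64(payload) do
      {:ok, json} -> {kind, key, json}
      :error -> {:error, :invalid_base64}
    end
  end
end

IO.inspect(Sketch.Parser.parse("v:abc123abc123"))
IO.inspect(Sketch.Parser.parse("l:abc123abc123:" <> Base.encode64(~s([{"message":"hi"}]))))
IO.inspect(Sketch.Parser.parse("v:short"))
```

Because the key size is part of the pattern, a "v:" line with a key that is too short or too long falls through to the catch-all clause rather than being partially matched.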
Test Cases
Finally, here's a test case (tcp_test.ex) that covers the basic expected behavior:
defmodule Web.TcpTest do
  use ExUnit.Case
  doctest Web.Tcp

  alias Web.Db.Users

  require RedisPoolex
  require Logger
  require Poison
  require Tirexs.Query

  alias RedisPoolex, as: Redis

  setup do
    Redis.query(["FLUSHDB"])

    {:ok, api_key} = Users.register(%{email: "user1@example.com", password: "12345678"})

    # The port is stored as a string in the config (see Web.Tcp.Server._port/0),
    # while :gen_tcp.connect/3 expects an integer.
    port = String.to_integer(Application.get_env(:web, :tcp_port))
    host = "127.0.0.1" |> String.to_charlist

    {:ok, socket} = :gen_tcp.connect(host, port, [active: false])
    {:ok, socket: socket, api_key: api_key}
  end

  test "should register user session", %{socket: socket, api_key: api_key} do
    :ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
    {:ok, reply} = :gen_tcp.recv(socket, 0, 1000)

    # The socket is in list mode (no :binary option), so the reply is a
    # charlist, and the server terminates it with a newline.
    assert reply == 'OK\n'
  end

  test "should bulk insert logs on tcp request", %{socket: socket, api_key: api_key} do
    :ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
    :ok = :gen_tcp.send(socket, "l:" <> api_key <> ":" <> ([%{message: "testing1", timestamp: 123123123}, %{message: "testing2", timestamp: 123123123}] |> Poison.encode! |> Base.encode64) <> "\n")

    :timer.sleep(2000)

    assert {:ok, 200, %{hits: %{hits: hits}}} = Tirexs.Query.create_resource([index: "logs-#{api_key}", search: ""])
    assert Enum.count(hits) == 2
  end

  # TODO: add marker support to create separate sessions on multiple devices.
  # we could have separate dashboards for the different devices.
  test "should store vars on tcp request", %{socket: socket, api_key: api_key} do
    :ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
    :ok = :gen_tcp.send(socket, "i:" <> api_key <> ":" <> ([%{name: "testing1", type: "string", value: "example"}, %{name: "testing2", type: "integer", value: "-1"}] |> Poison.encode! |> Base.encode64) <> "\n")

    :timer.sleep(2000)

    vars = Redis.query ["HKEYS", "#{api_key}:vs"]
    assert Enum.count(vars) == 4

    # HKEYS does not guarantee field order, so compare the sorted list.
    assert Enum.sort(vars) == ["testing1:type", "testing1:value", "testing2:type", "testing2:value"]
  end
end
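To experiment with the client side of these tests without the full application, a throwaway loopback listener is enough. The sketch below is an assumption-laden stand-in: the tiny server is hypothetical and merely answers "OK\n" to the first line it reads, the way the real handler does after a successful "v:" command. It also opens the sockets with :binary and packet: :line, so the reply arrives as an Elixir binary, one line at a time, instead of a charlist.

```elixir
# Hypothetical stand-in for the real server: accepts one connection,
# reads one line and answers "OK\n". Port 0 lets the OS pick a free port.
{:ok, listener} =
  :gen_tcp.listen(0, [:binary, packet: :line, active: false, reuseaddr: true])

{:ok, port} = :inet.port(listener)

server =
  Task.async(fn ->
    {:ok, conn} = :gen_tcp.accept(listener, 1_000)
    {:ok, line} = :gen_tcp.recv(conn, 0, 1_000)
    :ok = :gen_tcp.send(conn, "OK\n")
    line
  end)

# Client side, mirroring the "should register user session" test.
{:ok, socket} =
  :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, packet: :line, active: false])

:ok = :gen_tcp.send(socket, "v:abc123abc123\n")
{:ok, reply} = :gen_tcp.recv(socket, 0, 1_000)
seen = Task.await(server)

IO.inspect(reply, label: "reply")
IO.inspect(seen, label: "server saw")
```

Note that the reply still carries its trailing newline; a test comparing against a bare "OK" would need to trim it first.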
In future posts, I'll share the modules for PubSub and Gateway implementation, which should be straightforward to understand.
This project was created for fun, so if you plan to use similar approaches in production, please proceed with caution. I don't work with Elixir daily, so there might be areas for improvement in this implementation.
Thanks for reading!