This post is about observerio, an unfinished project that I haven't had the free time to complete.
I started it to enable real-time testing of different game strategies. So far it consists of an HTTP API, a TCP server, an Ember dashboard, and a core library written in Go (compiled with xgo for cross-platform support).
I'd like to share some examples that might be useful for others.
Why Elixir?
If someone asked me why I chose Elixir for the backend, I'd find it hard to explain precisely. I was likely inspired by my previous experience at my startup musicfeed. Elixir has an elegant syntax, a supportive community, and excellent documentation. However, concepts like gen_server can be hard to grasp if Elixir is your first programming language. Some will tell you that you don't need to learn Erlang to use Elixir, but that's only partially true: the number of Elixir libraries is growing rapidly, yet this project still calls Erlang libraries such as ranch and gen_tcp directly.
Project Architecture
I decided to use a TCP server and client to send variables and logs from mobile games via an internal SDK. When the TCP server receives variables or logs, it publishes messages through PubSub (gateway) to subscribed WebSocket handlers. If a WebSocket connection is active, it automatically publishes the changes to the Ember dashboard. This might sound more like a fun experiment than a production-ready solution, but I built it for enjoyment anyway!
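To make the flow above concrete, here is a minimal sketch of the publish/subscribe step. It is not the project's PubSub module: it uses Elixir's built-in Registry with duplicate keys as a stand-in, and the names Sketch.PubSub, Sketch.Gateway, and the topic "token123:vars" are made up for illustration.

```elixir
# A sketch only: Registry with duplicate keys stands in for the project's PubSub.
{:ok, _} = Registry.start_link(keys: :duplicate, name: Sketch.PubSub)

defmodule Sketch.Gateway do
  # Hypothetical helper: subscribe the calling process to a topic.
  def subscribe(topic) do
    {:ok, _} = Registry.register(Sketch.PubSub, topic, nil)
    :ok
  end

  # Hypothetical helper: send `message` to every process subscribed to `topic`.
  def publish(topic, message) do
    Registry.dispatch(Sketch.PubSub, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end
end

# The "WebSocket handler" side: subscribe, then wait for changes.
:ok = Sketch.Gateway.subscribe("token123:vars")

# The "TCP server" side: publish what arrived from the game.
:ok = Sketch.Gateway.publish("token123:vars", %{vars: [%{name: "speed", value: "10"}]})

receive do
  %{vars: vars} -> IO.inspect(vars, label: "dashboard update")
after
  1_000 -> IO.puts("no message received")
end
```

In the real project the subscriber would be a WebSocket handler process rather than the script itself, but the shape is the same: whoever subscribes to a user's topic receives every message published to it.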
Let's review the TCP server implementation (please feel free to suggest improvements, as this is open source):
Supervisors for TCP Server and Client
First, we define supervisors for the TCP server and client. The client supervisor is used to create communication between the WebSocket (via PubSub) and the TCP client:
defmodule Web.Tcp.ServerSupervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: :tcp_server_supervisor)
  end

  def init(_) do
    children = [
      worker(Web.Tcp.Server, [])
    ]

    supervise(children, strategy: :one_for_one)
  end
end

defmodule Web.Tcp.ClientSupervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: :tcp_client_supervisor)
  end

  def start_client(token) do
    Supervisor.start_child(:tcp_client_supervisor, [token])
  end

  def init(_) do
    children = [
      worker(Web.Tcp.Client, [])
    ]

    # We use the `simple_one_for_one` strategy.
    # With this strategy, we define just a "template" for a child,
    # no process is started during the Supervisor initialization, only
    # when we call `start_child/2`
    supervise(children, strategy: :simple_one_for_one)
  end
end
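A side note for readers on newer Elixir versions: the `simple_one_for_one` strategy has since been deprecated in favor of DynamicSupervisor. The sketch below shows roughly how the client supervisor could look in that style. The Sketch.* modules are hypothetical stand-ins, not code from the project, and the worker is reduced to the bare minimum.

```elixir
defmodule Sketch.Client do
  use GenServer

  # Stand-in for Web.Tcp.Client: one process per token.
  def start_link(token), do: GenServer.start_link(__MODULE__, token)

  @impl true
  def init(token), do: {:ok, %{token: token, messages: []}}
end

defmodule Sketch.ClientSupervisor do
  use DynamicSupervisor

  def start_link(_arg) do
    DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Children are started on demand, one per connected client.
  def start_client(token) do
    DynamicSupervisor.start_child(__MODULE__, {Sketch.Client, token})
  end

  @impl true
  def init(:ok), do: DynamicSupervisor.init(strategy: :one_for_one)
end

{:ok, _sup} = Sketch.ClientSupervisor.start_link([])
{:ok, _pid} = Sketch.ClientSupervisor.start_client("token123")
IO.inspect(DynamicSupervisor.count_children(Sketch.ClientSupervisor))
```

As with `simple_one_for_one`, no child is started when the supervisor boots; each call to `start_client/1` adds one worker.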
Server Worker
Next, we define the server worker and pass the module name Web.Tcp.Handler as the entry point for processing connected clients:
defmodule Web.Tcp.Server do
  require Logger

  def start_link do
    Logger.debug("[tcp] starting server on port :#{_port()}")
    opts = [port: _port()]
    {:ok, _} = :ranch.start_listener(:tcp, _acceptors_size(), :ranch_tcp, opts, Web.Tcp.Handler, [])
  end

  def _port do
    String.to_integer Application.get_env(:web, :tcp_port)
  end

  def _acceptors_size do
    String.to_integer Application.get_env(:web, :tcp_acceptors_size)
  end
end
Client Worker
Now we define a module that works as a worker for connected TCP clients. On initialization, it subscribes to a PubSub channel using the API key (per user). This allows it to receive messages from PubSub and send them back to the TCP client socket:
defmodule Web.Tcp.Client do
  require Logger
  require Poison

  alias Web.Pubsub

  def start_link(token) do
    GenServer.start_link(__MODULE__, token, name: String.to_atom(token))
  end

  def init(token) do
    Pubsub.subscribe("#{token}:vars:callback")
    {:ok, %{token: token, messages: []}}
  end

  def handle_info(%{vars: vars}, %{token: token, messages: messages} = state) do
    Logger.debug("[tcp.client] received message: #{inspect(vars)}")
    message = _pack(token, "vars", vars)
    Logger.debug("[tcp.client] packed message: #{inspect(message)}")
    messages = messages ++ [message]

    Logger.debug("[tcp.client] begin send message: #{inspect(messages)}")
    state = token |> _get_socket |> _send_back(messages, state)
    Logger.debug("[tcp.client] done send message: #{inspect(messages)}")

    Logger.debug("[tcp.client] messages: #{inspect(messages)}")

    {:noreply, state}
  end

  def terminate(reason, status) do
    Logger.debug("[tcp.client] reason: #{inspect(reason)}, status: #{inspect(status)}")
    :ok
  end

  defp _send_back({:ok, socket}, messages, state) do
    :ok = _send(socket, messages)
    %{state | messages: []}
  end

  defp _send_back(:enqueue, messages, state) do
    %{state | messages: messages}
  end

  defp _send(s, []), do: :ok
  defp _send({socket, transport} = s, [message | messages]) do
    transport.send(socket, message)
    _send(s, messages)
  end

  def _pack(token, "vars", vars) do
    vars = vars
      |> Poison.encode!
      |> Base.encode64

    "v:#{token}:#{vars}\n"
  end

  defp _get_socket(token) do
    Logger.debug("[tcp.socket] search for socket, transport by token: #{inspect(token)}")

    response = case Registry.lookup(Registry.Sockets, token) do
      [{_, socket}] -> {:ok, socket}
      [] -> :enqueue
    end

    Logger.debug("[tcp.client] _get_socket: #{inspect(response)}")

    response
  end
end
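The part of the worker that is easy to miss is the queueing: if no socket is registered for the token yet, messages are kept in the state and sent later. Here is a stripped-down sketch of just that logic as pure functions. Sketch.Outbox and its function names are hypothetical, the JSON is assumed to be already encoded (the worker above uses Poison for that), and the send function is passed in so the example runs without a real socket.

```elixir
defmodule Sketch.Outbox do
  # Frame format used by the worker above: "v:<token>:<base64 payload>\n".
  # `json` is assumed to be an already-encoded JSON string.
  def pack(token, json), do: "v:#{token}:#{Base.encode64(json)}\n"

  # No socket registered yet: keep everything queued.
  def flush(:enqueue, queue, _send_fun), do: queue

  # Socket available: send the messages in order, then return an empty queue.
  def flush({:ok, socket}, queue, send_fun) do
    Enum.each(queue, &send_fun.(socket, &1))
    []
  end
end

queue = [Sketch.Outbox.pack("abcdefghijkl", ~s({"speed":"10"}))]

# Lookup failed, so the message stays in the queue.
queue = Sketch.Outbox.flush(:enqueue, queue, fn _socket, _msg -> :ok end)
IO.inspect(queue, label: "still queued")

# Lookup succeeded, so the queue is drained (written to stdout here).
[] = Sketch.Outbox.flush({:ok, :fake_socket}, queue, fn _socket, msg -> IO.write(msg) end)
```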
TCP Handler
The TCP handler reads newline-terminated commands from each connection. Once a client sends a valid v: command, the handler registers the socket in the registry under the API key and starts a new client worker that subscribes to the PubSub channel for that key. This establishes communication between PubSub and the TCP client:
defmodule Web.Tcp.Handler do
  require Logger

  alias Web.Pubsub

  @moduledoc """
  `Handler` waits for lines separated by \\n (newline). If the handler doesn't
  see a newline, it accumulates data until it receives one.

  `Registry.Sockets` contains api_key -> socket records for easy communication
  from the dashboard page to TCP clients.
  """

  def start_link(ref, socket, transport, opts) do
    pid = spawn_link(__MODULE__, :init, [ref, socket, transport, opts])
    {:ok, pid}
  end

  def init(ref, socket, transport, _opts = []) do
    :ok = :ranch.accept_ack(ref)

    case transport.peername(socket) do
      {:ok, _peer} -> loop(socket, transport, "")
      {:error, reason} -> Logger.error("[tcp.handler] init receive error reason: #{inspect(reason)}")
    end
  end

  @timeout 5_000

  def loop(socket, transport, acc) do
    # Don't flood messages of transport, receive once and leave the remaining
    # data in socket until we run recv again.
    transport.setopts(socket, [active: :once])

    # Before proceeding with the receive block on messages, we should call
    # transport.messages() once to ping ranch
    {ok, closed, error} = transport.messages()

    # The tags and the socket are pinned with ^ so that each clause matches only
    # its own message. Unpinned, the first clause would match any 3-tuple.
    receive do
      {^ok, ^socket, data} ->
        Logger.debug("[tcp.handler] received data: #{inspect(data)}")

        # Every _process/3 clause ends by calling loop/3 again, so there is
        # no extra loop/3 call after the pipeline.
        (acc <> data)
        |> String.split("\n")
        |> Enum.map(&(String.trim(&1)))
        |> _process(socket, transport)

      {^closed, ^socket} ->
        Logger.debug("[tcp.handler] closed socket: #{inspect(socket)}")

      {^error, ^socket, reason} ->
        Logger.error("[tcp.handler] socket: #{inspect(socket)}, closed because of error reason: #{inspect(reason)}")

      {:error, reason} ->
        Logger.error("[tcp.handler] error: #{inspect(reason)}")

      # Exit signals arrive tagged with the atom :EXIT, not the charlist 'EXIT'.
      {:EXIT, _parent, reason} ->
        Logger.error("[tcp.handler] exit parent reason: #{inspect(reason)}")
        _kill()

      message ->
        Logger.debug("[tcp.handler] message on receive block: #{inspect(message)}")
    end
  end

  defp _kill(), do: Process.exit(self(), :kill)

  defp _process([], socket, transport), do: loop(socket, transport, "")
  defp _process([""], socket, transport), do: loop(socket, transport, "")
  defp _process([line, ""], socket, transport) do
    _protocol(line, socket, transport)
    loop(socket, transport, "")
  end
  # A single element without a trailing "" is an incomplete line: keep it as
  # the accumulator for the next chunk.
  defp _process([line], socket, transport), do: loop(socket, transport, line)
  defp _process([line | lines], socket, transport) do
    _protocol(line, socket, transport)
    _process(lines, socket, transport)
  end

  defp _protocol(line, socket, transport) do
    Logger.debug("[_protocol] line: #{line}")

    case line |> Web.Tcp.Protocol.process do
      {:verified, api_key} ->
        _register_socket(api_key, socket, transport)
        Web.Tcp.ClientSupervisor.start_client(api_key)
        Logger.debug("[tcp.server] transport should respond with OK")

        case transport.send(socket, "OK\n") do
          {:error, reason} ->
            Logger.error(inspect(reason))
          _ ->
            :ok
        end

      {:error, reason} ->
        Logger.error("[tcp] #{inspect(reason)}")

      :error ->
        Logger.error("error on processing: #{inspect(line)}")

      _ ->
        :ok
    end
  end

  def _register_socket(api_key, socket, transport) do
    Logger.debug("[tcp.handler] _register_socket token: #{api_key}")

    case Registry.register(Registry.Sockets, api_key, {socket, transport}) do
      {:error, {:already_registered, _pid}} ->
        Registry.update_value(Registry.Sockets, api_key, fn (_) -> {socket, transport} end)

      {:error, reason} ->
        Logger.error("[tcp] reason: #{inspect(reason)}")

      _ ->
        :ok
    end
  end
end
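The line accumulation is the core idea of the handler, so here it is in isolation. This is a simplified sketch, not the handler's actual code: Sketch.Framing is a hypothetical module that takes the leftover from the previous chunk plus the new data and returns the complete lines together with the new leftover.

```elixir
defmodule Sketch.Framing do
  # Returns {complete_lines, remainder}. The remainder is whatever follows
  # the last "\n" and must be carried over to the next chunk.
  def split(acc, data) do
    parts = String.split(acc <> data, "\n")
    {lines, [rest]} = Enum.split(parts, -1)

    lines =
      lines
      |> Enum.map(&String.trim/1)
      |> Enum.reject(&(&1 == ""))

    {lines, rest}
  end
end

# One full command and the beginning of a second one arrive together.
{lines, rest} = Sketch.Framing.split("", "v:abcdefghijkl\ni:abcdef")
IO.inspect({lines, rest})

# The next chunk completes the second command.
{lines, rest} = Sketch.Framing.split(rest, "ghijkl:e30=\n")
IO.inspect({lines, rest})
```

Because TCP is a byte stream, a single receive may contain half a command or several commands at once. Carrying the remainder between calls handles both cases.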
Protocol Handler
Web.Tcp.Protocol defines the parser and handler for commands sent by the TCP client.
Example commands (the API key is exactly 12 bytes long):
l:<api_key>:<base64 json logs>
i:<api_key>:<base64 json variables>
v:<api_key> - the client passes its API key so the server can verify that the client is registered
defmodule Web.Tcp.Protocol do
  require Logger
  require Poison

  alias Web.Gateway
  alias Web.Db.Users

  @moduledoc """
  Server messages:

  - `l:logs`
  logs - should come as json array and encoded base64

  - `i:vars`
  vars - should come as json dictionary and encoded by base64

  - `v:api_key`
  api_key - should verify key using our registry

  Client messages:

  - `i:s:name:value` - var set by name value inside of app
  """

  def process("l:" <> <<api_key :: bytes-size(12)>> <> ":" <> logs) do
    Logger.debug("[protocol] api_key: #{api_key}, logs: #{inspect(logs)}")

    logs
    |> Base.decode64!
    |> Poison.decode!
    |> Gateway.logs(api_key)
  end

  def process("i:" <> <<api_key :: bytes-size(12)>> <> ":" <> vars) do
    Logger.debug("[protocol] api_key: #{api_key}, vars: #{inspect(vars)}")

    vars
    |> Base.decode64!
    |> Poison.decode!
    |> Gateway.vars(api_key)
  end

  def process("v:" <> <<api_key :: bytes-size(12)>>) do
    if Users.verify_key(api_key) do
      {:verified, api_key}
    else
      {:error, "not registered user with api_key: #{api_key}"}
    end
  end

  def process(_), do: :error
end
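If binary pattern matching is new to you, the following dependency-free sketch may help. It parses the same three commands but stops after the base64 step instead of decoding JSON and calling the Gateway. Sketch.Protocol and its return values are hypothetical, and the patterns are written in the plain `<<...>>` form, which should be equivalent to the `<>` spelling used above.

```elixir
defmodule Sketch.Protocol do
  # Assumes 12-byte API keys, as in the patterns above.
  def parse(<<"l:", api_key::binary-size(12), ":", payload::binary>>),
    do: decode(:logs, api_key, payload)

  def parse(<<"i:", api_key::binary-size(12), ":", payload::binary>>),
    do: decode(:vars, api_key, payload)

  # No trailing bytes allowed: the key must be exactly 12 bytes.
  def parse(<<"v:", api_key::binary-size(12)>>), do: {:verify, api_key}

  def parse(_other), do: :error

  # Base.decode64/1 returns :error instead of raising on bad input.
  defp decode(kind, api_key, payload) do
    case Base.decode64(payload) do
      {:ok, json} -> {kind, api_key, json}
      :error -> {:error, :invalid_base64}
    end
  end
end

IO.inspect(Sketch.Protocol.parse("v:abcdefghijkl"))
IO.inspect(Sketch.Protocol.parse("i:abcdefghijkl:" <> Base.encode64(~s({"speed":"10"}))))
IO.inspect(Sketch.Protocol.parse("hello"))
```

Using `Base.decode64/1` rather than `Base.decode64!/1` is a design choice for this sketch: malformed input from a client becomes an error tuple instead of crashing the connection process.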
Test Cases
Finally, here's a test case (tcp_test.ex) that covers the basic expected behavior:
defmodule Web.TcpTest do
  use ExUnit.Case
  doctest Web.Tcp

  alias Web.Db.Users

  require RedisPoolex
  require Logger
  require Poison
  require Tirexs.Query

  alias RedisPoolex, as: Redis

  setup do
    Redis.query(["FLUSHDB"])

    {:ok, api_key} = Users.register(%{email: "user1@example.com", password: "12345678"})

    # :tcp_port is stored as a string (Web.Tcp.Server converts it as well),
    # while :gen_tcp.connect/3 expects an integer port.
    port = Application.get_env(:web, :tcp_port) |> String.to_integer
    host = "127.0.0.1" |> String.to_charlist

    {:ok, socket} = :gen_tcp.connect(host, port, [active: false])
    {:ok, socket: socket, api_key: api_key}
  end

  test "should register user session", %{socket: socket, api_key: api_key} do
    :ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
    {:ok, reply} = :gen_tcp.recv(socket, 0, 1000)
    # The handler replies with "OK\n". Without the :binary option the reply
    # arrives as a charlist.
    assert reply == 'OK\n'
  end

  test "should bulk insert logs on tcp request", %{socket: socket, api_key: api_key} do
    :ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
    :ok = :gen_tcp.send(socket, "l:" <> api_key <> ":" <> ([%{message: "testing1", timestamp: 123123123}, %{message: "testing2", timestamp: 123123123}] |> Poison.encode! |> Base.encode64) <> "\n")

    :timer.sleep(2000)

    assert {:ok, 200, %{hits: %{hits: hits}} = response} = Tirexs.Query.create_resource([index: "logs-#{api_key}", search: ""])
    assert Enum.count(hits) == 2
  end

  # TODO: add marker support to create separate sessions on multiple devices.
  # we could have separate dashboards for the different devices.
  test "should store vars on tcp request", %{socket: socket, api_key: api_key} do
    :ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
    :ok = :gen_tcp.send(socket, "i:" <> api_key <> ":" <> ([%{name: "testing1", type: "string", value: "example"}, %{name: "testing2", type: "integer", value: "-1"}] |> Poison.encode! |> Base.encode64) <> "\n")

    :timer.sleep(2000)

    vars = Redis.query ["HKEYS", "#{api_key}:vs"]
    assert Enum.count(vars) == 4
    assert vars == ["testing1:type", "testing1:value", "testing2:type", "testing2:value"]
  end
end
In future posts, I'll share the PubSub and Gateway modules, which should be straightforward to understand.
This project was created for fun, so if you plan to use similar approaches in production, please proceed with caution. I don't work with Elixir daily, so there might be areas for improvement in this implementation.
Thanks for reading!