Elixir TCP server and client example

#elixir#tcp#server#client

This post is about an unfinished project called observerio that I haven't had enough free time to complete recently.

I've been working on this project to enable real-time testing of different game strategies. The result is a project with an HTTP API, TCP server, Ember dashboard, and a core library based on Go (using xgo compilation for cross-platform support).

I'd like to share some examples that might be useful for others.

Why Elixir?

If someone asked me why I chose Elixir for the backend, I'd find it hard to explain precisely. I was likely inspired by my previous experience at my startup musicfeed. Elixir has an elegant syntax, a supportive community, and excellent documentation. However, concepts like gen_server and other OTP tools can be hard to grasp if Elixir is your first programming language. Some people will tell you that you don't need to learn Erlang to use Elixir; that's only partially true. You will still run into Erlang libraries and OTP concepts regularly (this post uses :ranch and :gen_tcp, for example), although the number of native Elixir libraries is growing rapidly.

Project Architecture

I decided to use a TCP server and client to send variables and logs from mobile games via an internal SDK. When the TCP server receives variables or logs, it publishes messages through PubSub (gateway) to subscribed WebSocket handlers. If a WebSocket connection is active, it automatically publishes the changes to the Ember dashboard. This might sound more like a fun experiment than a production-ready solution, but I built it for enjoyment anyway!

Let's review the TCP server implementation (please feel free to suggest improvements, as this is open source):

tcp.ex

Supervisors for TCP Server and Client

First, we define supervisors for the TCP server and the TCP clients. The client supervisor starts one client worker per API key. Each worker connects the WebSocket side (via PubSub) with a TCP client:

defmodule Web.Tcp.ServerSupervisor do
  @moduledoc """
  Supervisor for the TCP listener, registered locally as `:tcp_server_supervisor`.
  """
  use Supervisor

  @doc "Starts the supervisor under the name `:tcp_server_supervisor`."
  def start_link do
    Supervisor.start_link(__MODULE__, [], name: :tcp_server_supervisor)
  end

  # One worker, started via Web.Tcp.Server.start_link/0, restarted on its own
  # (one_for_one) if it fails.
  def init(_args) do
    [worker(Web.Tcp.Server, [])]
    |> supervise(strategy: :one_for_one)
  end
end

defmodule Web.Tcp.ClientSupervisor do
  @moduledoc """
  Dynamic supervisor for `Web.Tcp.Client` workers, registered locally as
  `:tcp_client_supervisor`.
  """
  use Supervisor

  @doc "Starts the supervisor under the name `:tcp_client_supervisor`."
  def start_link do
    Supervisor.start_link(__MODULE__, [], name: :tcp_client_supervisor)
  end

  @doc """
  Starts a `Web.Tcp.Client` for `token`; the token is appended to the
  template's (empty) argument list, i.e. `Web.Tcp.Client.start_link(token)`.
  """
  def start_client(token), do: Supervisor.start_child(:tcp_client_supervisor, [token])

  def init(_args) do
    # With `simple_one_for_one` the children list is only a *template*:
    # nothing is started during supervisor initialization, each child is
    # spawned later by start_client/1 (Supervisor.start_child/2).
    template = worker(Web.Tcp.Client, [])
    supervise([template], strategy: :simple_one_for_one)
  end
end

Server Worker

Next, we define the server worker and pass the module name Web.Tcp.Handler as the entry point for processing connected clients:

defmodule Web.Tcp.Server do
  @moduledoc """
  Starts the ranch TCP listener (ref `:tcp`, transport `:ranch_tcp`) that hands
  every accepted connection to `Web.Tcp.Handler`.

  The port and the acceptor pool size are read from the `:web` application env
  keys `:tcp_port` and `:tcp_acceptors_size`. Each may be configured either as
  an integer or as a numeric string (e.g. a value taken from `System.get_env/1`).
  """
  require Logger

  @doc """
  Starts the listener and returns the `{:ok, pid}` tuple from `:ranch.start_listener`.

  Raises `MatchError` if ranch returns anything other than `{:ok, pid}`.
  """
  def start_link do
    # Read the port once so the logged value and the bound value cannot differ.
    port = _port()
    Logger.debug("[tcp] starting server on port :#{port}")

    {:ok, _} = :ranch.start_listener(
      :tcp, _acceptors_size(), :ranch_tcp, [port: port], Web.Tcp.Handler, [])
  end

  @doc "Listening port taken from `config :web, :tcp_port`, as an integer."
  def _port, do: _env_integer(:tcp_port)

  @doc "Number of ranch acceptors taken from `config :web, :tcp_acceptors_size`, as an integer."
  def _acceptors_size, do: _env_integer(:tcp_acceptors_size)

  # Reads a required `:web` env key and normalizes it to an integer. Both shapes
  # are accepted because the same `:tcp_port` value is also used directly as an
  # integer port (e.g. by :gen_tcp.connect/3), while the original code only
  # accepted strings. A missing key raises ArgumentError via fetch_env!/2.
  defp _env_integer(key) do
    case Application.fetch_env!(:web, key) do
      value when is_integer(value) -> value
      value when is_binary(value) -> String.to_integer(value)
    end
  end
end

Client Worker

Now we define a module that works as a worker for connected TCP clients. On initialization, it subscribes to a PubSub channel using the API key (per user). This allows it to receive messages from PubSub and send them back to the TCP client socket:

defmodule Web.Tcp.Client do
  @moduledoc """
  Per-API-key worker that forwards PubSub variable updates to the TCP client.

  On start it subscribes to `"<token>:vars:callback"`. Every `%{vars: vars}`
  message is packed as `"v:<token>:<base64 json>\\n"` and written to the
  `{socket, transport}` found in `Registry.Sockets` under the token. While no
  socket is registered (or a write fails), packed messages stay queued in
  `state.messages` and are retried with the next update.
  """
  use GenServer

  require Logger

  alias Web.Pubsub

  @doc "Starts the worker, registered under the atom `String.to_atom(token)`."
  def start_link(token) do
    # NOTE(review): String.to_atom/1 creates one never-collected atom per token.
    # In the visible code tokens only get here after Web.Db.Users.verify_key/1
    # succeeded, so the count is bounded by registered users; consider a
    # {:via, Registry, ...} name instead.
    GenServer.start_link(__MODULE__, token, name: String.to_atom(token))
  end

  def init(token) do
    Pubsub.subscribe("#{token}:vars:callback")
    {:ok, %{token: token, messages: []}}
  end

  # A variables update from PubSub: pack it, append it to the pending queue
  # and try to flush the whole queue to the client's socket.
  def handle_info(%{vars: vars}, %{token: token, messages: messages} = state) do
    Logger.debug("[tcp.client] received message: #{inspect(vars)}")
    message = _pack(token, "vars", vars)
    Logger.debug("[tcp.client] packed message: #{inspect(message)}")

    messages = messages ++ [message]

    Logger.debug("[tcp.client] begin send message: #{inspect(messages)}")
    state = token |> _get_socket() |> _send_back(messages, state)
    Logger.debug("[tcp.client] done send message: #{inspect(messages)}")

    Logger.debug("[tcp.client] messages: #{inspect(messages)}")

    {:noreply, state}
  end

  # Anything else in the mailbox is logged and ignored instead of crashing
  # the worker with a FunctionClauseError.
  def handle_info(message, state) do
    Logger.debug("[tcp.client] ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  def terminate(reason, state) do
    Logger.debug("[tcp.client] reason: #{inspect(reason)}, status: #{inspect(state)}")
    :ok
  end

  # Socket found: write the queue; on a write error keep the unsent tail so it
  # is retried on the next update instead of being silently dropped.
  defp _send_back({:ok, socket}, messages, state) do
    case _send(socket, messages) do
      :ok ->
        %{state | messages: []}

      {:error, reason, unsent} ->
        Logger.error("[tcp.client] send failed: #{inspect(reason)}, keeping #{length(unsent)} message(s)")
        %{state | messages: unsent}
    end
  end

  # No socket registered yet: just keep everything queued.
  defp _send_back(:enqueue, messages, state), do: %{state | messages: messages}

  # Writes messages in order; stops at the first failure and returns
  # {:error, reason, messages_not_yet_sent} (including the one that failed).
  defp _send(_socket, []), do: :ok

  defp _send({socket, transport} = s, [message | rest] = pending) do
    case transport.send(socket, message) do
      :ok -> _send(s, rest)
      {:error, reason} -> {:error, reason, pending}
    end
  end

  @doc "Encodes `vars` as JSON, Base64s it and frames it as `\"v:<token>:<payload>\\n\"`."
  def _pack(token, "vars", vars) do
    vars = vars
    |> Poison.encode!
    |> Base.encode64

    "v:#{token}:#{vars}\n"
  end

  # Returns {:ok, {socket, transport}} when a handler registered this token in
  # Registry.Sockets, or :enqueue when none is registered.
  defp _get_socket(token) do
    Logger.debug("[tcp.socket] search for socket, transport by token: #{inspect(token)}")
    response = case Registry.lookup(Registry.Sockets, token) do
      [{_, socket}] -> {:ok, socket}
      [] -> :enqueue
    end
    Logger.debug("[tcp.client] _get_socket: #{inspect(response)}")
    response
  end
end

TCP Handler

The TCP handler reads newline-terminated lines from each connection. When a client verifies its API key (the `v:` command), the handler registers the socket in the registry. It then starts a client worker that subscribes to the PubSub channel for that API key, which establishes communication between PubSub and the TCP client:

defmodule Web.Tcp.Handler do
  @moduledoc """
  ranch protocol handler: one process per accepted TCP connection.

  `Handler` waits for lines separated by newlines. If the handler doesn't see a
  newline, it keeps the unterminated data and prepends it to the next chunk.
  Every complete, non-blank line is trimmed and passed to
  `Web.Tcp.Protocol.process/1`.

  `Registry.Sockets` contains api_key -> `{socket, transport}` records for easy
  communication from the dashboard page to TCP clients.
  """
  require Logger

  def start_link(ref, socket, transport, opts) do
    pid = spawn_link(__MODULE__, :init, [ref, socket, transport, opts])
    {:ok, pid}
  end

  def init(ref, socket, transport, _opts = []) do
    :ok = :ranch.accept_ack(ref)

    case transport.peername(socket) do
      {:ok, _peer} -> loop(socket, transport, "")
      {:error, reason} -> Logger.error("[tcp.handler] init receive error reason: #{inspect(reason)}")
    end
  end

  @doc """
  Receive loop for one connection. `acc` is the unterminated tail of the data
  received so far. Returns `:ok` once the socket is closed or reports an error,
  which lets the process exit (and its Registry entries be removed).
  """
  def loop(socket, transport, acc) do
    # Don't flood the mailbox: ask for a single message and leave the remaining
    # data in the socket until we loop again.
    transport.setopts(socket, [active: :once])

    # Tags the transport uses for data / closed / error messages. They are
    # pinned below: without `^` the names would rebind and match any tuple.
    {ok, closed, error} = transport.messages()

    receive do
      {^ok, ^socket, data} ->
        Logger.debug("[tcp.handler] received data: #{inspect(data)}")

        rest =
          (acc <> data)
          |> String.split("\n")
          |> _process(socket, transport)

        # Tail call: the stack stays flat for the whole life of the connection.
        loop(socket, transport, rest)

      {^closed, ^socket} ->
        Logger.debug("[tcp.handler] closed socket: #{inspect(socket)}")
        :ok

      {^error, ^socket, reason} ->
        Logger.error("[tcp.handler] socket: #{inspect(socket)}, closed because of error reason: #{inspect(reason)}")
        :ok

      {:error, reason} ->
        Logger.error("[tcp.handler] error: #{inspect(reason)}")
        :ok

      {:EXIT, _parent, reason} ->
        Logger.error("[tcp.handler] exit parent reason: #{inspect(reason)}")
        _kill()

      message ->
        # Unrelated messages must not end the connection; keep the buffer.
        Logger.debug("[tcp.handler] message on receive block: #{inspect(message)}")
        loop(socket, transport, acc)
    end
  end

  defp _kill(), do: Process.exit(self(), :kill)

  # Runs every complete line through the protocol and returns the trailing
  # fragment, i.e. whatever follows the last "\n" ("" when the chunk ended with
  # a newline). String.split/2 always yields at least one element, and the last
  # element is that fragment.
  defp _process([], _socket, _transport), do: ""
  defp _process([rest], _socket, _transport), do: rest

  defp _process([line | lines], socket, transport) do
    case String.trim(line) do
      # Blank lines carry no command; skip them instead of logging an error.
      "" -> :ok
      trimmed -> _protocol(trimmed, socket, transport)
    end

    _process(lines, socket, transport)
  end

  defp _protocol(line, socket, transport) do
    Logger.debug("[_protocol] line: #{line}")

    case Web.Tcp.Protocol.process(line) do
      {:verified, api_key} ->
        _register_socket(api_key, socket, transport)

        # Result ignored: if a client worker is already running for this key,
        # presumably start_link's name registration makes this a no-op.
        Web.Tcp.ClientSupervisor.start_client(api_key)

        Logger.debug("[tcp.server] transport should respond with OK")

        case transport.send(socket, "OK\n") do
          {:error, reason} -> Logger.error(inspect(reason))
          _ -> :ok
        end

      {:error, reason} ->
        Logger.error("[tcp] #{inspect(reason)}")

      :error ->
        Logger.error("error on processing: #{inspect(line)}")

      _ ->
        :ok
    end
  end

  @doc """
  Registers (or refreshes) `api_key -> {socket, transport}` in `Registry.Sockets`
  for the calling process.
  """
  def _register_socket(api_key, socket, transport) do
    Logger.debug("[tcp.handler] _register_socket token: #{api_key}")
    me = self()

    # The :already_registered branch implies Registry.Sockets uses unique keys.
    case Registry.register(Registry.Sockets, api_key, {socket, transport}) do
      {:ok, _owner} = ok ->
        ok

      {:error, {:already_registered, ^me}} ->
        # Same connection re-verified: update_value/3 only works for keys owned
        # by the calling process, which is the case here.
        Registry.update_value(Registry.Sockets, api_key, fn _ -> {socket, transport} end)

      {:error, {:already_registered, other}} ->
        # Another live connection owns this key; update_value/3 would just
        # return :error, so report it instead of failing silently.
        Logger.error("[tcp] api_key #{api_key} is already bound to connection #{inspect(other)}")

      {:error, reason} ->
        Logger.error("[tcp] reason: #{inspect(reason)}")
    end
  end
end

Protocol Handler

Web.Tcp.Protocol defines the parser and handler for commands passed via the TCP client:

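Example commands (the API key is exactly 12 characters long):

l:<api_key>:<base64 json messages> - send log entries
i:<api_key>:<base64 json variables> - send variables
v:<api_key> - the client passes its API key so the server can verify that it belongs to a registered client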
defmodule Web.Tcp.Protocol do
  @moduledoc """
    Parser and dispatcher for single protocol lines (newline already removed).
    `<api_key>` is always exactly 12 bytes; lines that do not fit one of the
    shapes below yield `:error`.

    Server messages:

      - `l:<api_key>:<logs>`
        logs - should come as json array and encoded base64

      - `i:<api_key>:<vars>`
        vars - should come as json dictionary and encoded by base64

      - `v:<api_key>`
        api_key - should verify key using our registry

    Client messages:
      - `i:s:name:value` - var set by name value inside of app

    NOTE(review): `Web.Tcp.Client._pack/3` currently emits
    `v:<token>:<base64 json>` — confirm which format the SDK expects.
  """
  require Logger

  alias Web.Gateway
  alias Web.Db.Users

  @doc """
  Parses and handles one line.

  Returns whatever `Web.Gateway.logs/2` / `Web.Gateway.vars/2` return for `l:` /
  `i:` lines, `{:verified, api_key}` or `{:error, message}` for `v:` lines,
  `{:error, message}` when an `l:` / `i:` payload is not valid Base64-encoded
  JSON, and `:error` for anything else. Malformed payloads no longer raise, so
  one bad line does not take down the connection handler.
  """
  def process("l:" <> <<api_key :: bytes-size(12)>> <> ":" <> logs) do
    Logger.debug("[protocol] api_key: #{api_key}, logs: #{inspect(logs)}")

    with {:ok, entries} <- _decode(logs, "logs") do
      Gateway.logs(entries, api_key)
    end
  end

  def process("i:" <> <<api_key :: bytes-size(12)>> <> ":" <> vars) do
    Logger.debug("[protocol] api_key: #{api_key}, vars: #{inspect(vars)}")

    with {:ok, decoded} <- _decode(vars, "vars") do
      Gateway.vars(decoded, api_key)
    end
  end

  def process("v:" <> <<api_key :: bytes-size(12)>>) do
    if Users.verify_key(api_key) do
      {:verified, api_key}
    else
      {:error, "not registered user with api_key: #{api_key}"}
    end
  end

  def process(_), do: :error

  # Base64-decodes, then JSON-decodes, a payload that arrives straight from the
  # socket. Any failure in either step becomes {:error, message} instead of an
  # exception.
  defp _decode(payload, label) do
    with {:ok, json} <- Base.decode64(payload),
         {:ok, data} <- Poison.decode(json) do
      {:ok, data}
    else
      _ -> {:error, "invalid #{label} payload: expected base64-encoded json"}
    end
  end
end

Test Cases

Finally, here is a test module (tcp_test.ex) that checks the basic expected behavior:

defmodule Web.TcpTest do
  @moduledoc """
  End-to-end tests against the running TCP server: each test connects with
  :gen_tcp, verifies a freshly registered API key and exercises one command.
  """
  use ExUnit.Case
  doctest Web.Tcp

  alias Web.Db.Users

  require RedisPoolex
  require Logger
  require Poison
  require Tirexs.Query

  alias RedisPoolex, as: Redis

  setup do
    Redis.query(["FLUSHDB"])

    {:ok, api_key} = Users.register(%{email: "user1@example.com", password: "12345678"})

    # The server accepts :tcp_port as a string, but :gen_tcp.connect/3 needs
    # an integer, so normalize it here.
    port =
      case Application.get_env(:web, :tcp_port) do
        value when is_binary(value) -> String.to_integer(value)
        value -> value
      end

    # packet: :line makes recv/3 return exactly one "\n"-terminated reply.
    {:ok, socket} = :gen_tcp.connect('127.0.0.1', port, [active: false, packet: :line])
    {:ok, socket: socket, api_key: api_key}
  end

  test "should register user session", %{socket: socket, api_key: api_key} do
    :ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
    {:ok, reply} = :gen_tcp.recv(socket, 0, 1000)
    # The server replies "OK\n"; in list mode the newline is part of the reply.
    assert reply == 'OK\n'
  end

  test "should bulk insert logs on tcp request", %{socket: socket, api_key: api_key} do
    :ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
    :ok = :gen_tcp.send(socket, "l:" <> api_key <> ":" <> ([%{message: "testing1", timestamp: 123123123}, %{message: "testing2", timestamp: 123123123}] |> Poison.encode! |> Base.encode64) <> "\n")

    :timer.sleep(2000)

    assert {:ok, 200, %{hits: %{hits: hits}}} = Tirexs.Query.create_resource([index: "logs-#{api_key}", search: ""])
    assert Enum.count(hits) == 2
  end

  # TODO: add marker support to create separate sessions on multiple devices.
  # we could have separate dashboards for the different devices.
  test "should store vars on tcp request", %{socket: socket, api_key: api_key} do
    :ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
    :ok = :gen_tcp.send(socket, "i:" <> api_key <> ":" <> ([%{name: "testing1", type: "string", value: "example"}, %{name: "testing2", type: "integer", value: "-1"}] |> Poison.encode! |> Base.encode64) <> "\n")

    :timer.sleep(2000)

    vars = Redis.query ["HKEYS", "#{api_key}:vs"]
    assert Enum.count(vars) == 4
    # Redis does not guarantee HKEYS ordering, so compare sorted.
    assert Enum.sort(vars) == ["testing1:type", "testing1:value", "testing2:type", "testing2:value"]
  end
end

In future posts, I'll share the modules for PubSub and Gateway implementation, which should be straightforward to understand.

This project was created for fun, so if you plan to use similar approaches in production, please proceed with caution. I don't work with Elixir daily, so there might be areas for improvement in this implementation.

Thanks for reading!

← Back to all posts

© Copyright 2023 Bitscorp