8 Task and gen_tcp
In this chapter, we are going to learn how to use Erlang’s `:gen_tcp` module (http://erlang.org/doc/man/gen_tcp.html) to serve requests. In future chapters we will expand our server so it can actually serve the commands. This will also provide a great opportunity to explore Elixir’s `Task` module.
8.1 Echo server
We will start our TCP server by first implementing an echo server. It will simply send a response with the text it received in the request. We will slowly improve our server until it is supervised and ready to handle multiple connections.
A TCP server, in broad strokes, performs the following steps:
- Listens to a port until the port is available and it gets hold of the socket
- Waits for a client connection on that port and accepts it
- Reads the client request and writes a response back
Let’s implement those steps. Move to the `apps/kv_server` application, open up `lib/kv_server.ex`, and add the following functions:
```elixir
def accept(port) do
  # The options below mean:
  #
  # 1. `:binary` - receives data as binaries (instead of lists)
  # 2. `packet: :line` - receives data line by line
  # 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available
  # 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes
  #
  {:ok, socket} =
    :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])

  IO.puts("Accepting connections on port #{port}")
  loop_acceptor(socket)
end

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

defp serve(socket) do
  socket
  |> read_line()
  |> write_line(socket)

  serve(socket)
end

defp read_line(socket) do
  {:ok, data} = :gen_tcp.recv(socket, 0)
  data
end

defp write_line(line, socket) do
  :gen_tcp.send(socket, line)
end
```
We are going to start our server by calling `KVServer.accept(4040)`, where 4040 is the port. The first step in `accept/1` is to listen to the port until the socket becomes available and then call `loop_acceptor/1`. `loop_acceptor/1` is just a loop accepting client connections. For each accepted connection, we call `serve/1`.
`serve/1` is another loop that reads a line from the socket and writes that line back to the socket. Note that the `serve/1` function uses the pipe operator `|>` (`Kernel.|>/2`) to express this flow of operations. The pipe operator evaluates the left side and passes its result as the first argument to the function on the right side. The example above:
```elixir
socket |> read_line() |> write_line(socket)
```
is equivalent to:
```elixir
write_line(read_line(socket), socket)
```
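To see the same equivalence without any sockets involved, here is a small sketch that swaps in `String` functions for our socket helpers. The input string is just an illustrative value:

```elixir
# The pipe passes the value on its left as the first argument
# of the call on its right.
piped = "elixir" |> String.upcase() |> String.reverse()

# The same computation written as nested calls.
nested = String.reverse(String.upcase("elixir"))

IO.inspect(piped == nested) # prints true
IO.inspect(piped)           # prints "RIXILE"
```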
When using the `|>` operator, it is important to add parentheses to the function calls due to how operator precedence works. In particular, this code:
```elixir
1..10 |> Enum.filter &(&1 <= 5) |> Enum.map &(&1 * 2)
```
actually translates to:
```elixir
1..10 |> Enum.filter(&(&1 <= 5) |> Enum.map(&(&1 * 2)))
```
which is not what we want, since the function given to `Enum.filter/2` is the one passed as the first argument to `Enum.map/2`. The solution is to use explicit parentheses:
```elixir
1..10 |> Enum.filter(&(&1 <= 5)) |> Enum.map(&(&1 * 2))
```
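A quick way to confirm the parenthesized version does what we expect is to compare it with the same computation written as explicit nested calls. A minimal sketch:

```elixir
piped = 1..10 |> Enum.filter(&(&1 <= 5)) |> Enum.map(&(&1 * 2))

# Equivalent nested form: filter first, then map over the filtered list.
nested = Enum.map(Enum.filter(1..10, &(&1 <= 5)), &(&1 * 2))

IO.inspect(piped)           # prints [2, 4, 6, 8, 10]
IO.inspect(piped == nested) # prints true
```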
The `read_line/1` implementation receives data from the socket using `:gen_tcp.recv/2`, and `write_line/2` writes to the socket using `:gen_tcp.send/2`.
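Because the socket is opened with `packet: :line`, each `:gen_tcp.recv/2` call in `read_line/1` should return exactly one line, even when the client sends several lines in a single write. The standalone sketch below illustrates this. It listens on port 0, which lets the OS pick a free port, instead of 4040, and the throwaway client is purely illustrative:

```elixir
# Listen on an OS-assigned port (0) with the same core options as accept/1.
{:ok, listen} = :gen_tcp.listen(0, [:binary, packet: :line, active: false])
{:ok, port} = :inet.port(listen)

# Illustrative client: sends two lines in a single :gen_tcp.send/2 call.
spawn(fn ->
  {:ok, conn} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, active: false])
  :ok = :gen_tcp.send(conn, "hello\nworld\n")
  # Keep the connection open briefly while the server reads.
  Process.sleep(500)
  :gen_tcp.close(conn)
end)

{:ok, client} = :gen_tcp.accept(listen)

# With packet: :line, each recv/2 call returns a single line.
{:ok, first} = :gen_tcp.recv(client, 0)
{:ok, second} = :gen_tcp.recv(client, 0)

IO.inspect([first, second]) # prints ["hello\n", "world\n"]
:gen_tcp.close(client)
:gen_tcp.close(listen)
```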
This is pretty much all we need to implement our echo server. Let’s give it a try!
Start an IEx session inside the `kv_server` application with `iex -S mix`. Inside IEx, run:
```iex
iex> KVServer.accept(4040)
```
The server is now running, and you will even notice the console is blocked. Let’s use a `telnet` client (http://en.wikipedia.org/wiki/Telnet) to access our server. There are clients available on most operating systems, and their command lines are generally similar:
```
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?
```
Type “hello”, press enter, and you will get “hello” back. Excellent!
My particular telnet client can be exited by typing `ctrl + ]`, typing `quit`, and pressing `<Enter>`, but your client may require different steps.
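If your system has no `telnet` client, you can also talk to the server from a second IEx session or an `.exs` script using `:gen_tcp` directly. The sketch below is hypothetical. The `EchoClient` module and its `roundtrip/3` function are not part of the project, and they assume the echo server from this chapter is listening on the given port:

```elixir
defmodule EchoClient do
  # Hypothetical helper, not part of the kv_server project.
  # Connects to host:port, sends `line` plus a newline, and returns
  # {:ok, echoed_line} or an {:error, reason} tuple.
  def roundtrip(host, port, line) do
    opts = [:binary, packet: :line, active: false]

    with {:ok, socket} <- :gen_tcp.connect(host, port, opts) do
      try do
        with :ok <- :gen_tcp.send(socket, line <> "\n") do
          # Wait at most five seconds for the echo.
          :gen_tcp.recv(socket, 0, 5_000)
        end
      after
        # Close the connection whether or not the echo arrived.
        :gen_tcp.close(socket)
      end
    end
  end
end
```

With `KVServer.accept(4040)` running in another session, `EchoClient.roundtrip({127, 0, 0, 1}, 4040, "hello")` should return `{:ok, "hello\n"}`.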
Once you exit the telnet client, you will likely see an error in the IEx session:
```
** (MatchError) no match of right hand side value: {:error, :closed}
    (kv_server) lib/kv_server.ex:41: KVServer.read_line/1
    (kv_server) lib/kv_server.ex:33: KVServer.serve/1
    (kv_server) lib/kv_server.ex:27: KVServer.loop_acceptor/1
```
That’s because we were expecting data from `:gen_tcp.recv/2` but the client closed the connection. We need to handle such cases better in future revisions of our server.
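One possible way to avoid the `MatchError` is to match on the result of `:gen_tcp.recv/2` instead of asserting `{:ok, data}`. This is sketched here ahead of the later revisions and is not necessarily how they will do it. The `GracefulEcho` module is hypothetical, and the demo at the bottom uses an OS-assigned port rather than 4040:

```elixir
defmodule GracefulEcho do
  # Hypothetical variant of serve/1: echoes lines until the client
  # disconnects, then returns :closed instead of raising a MatchError.
  def serve(socket) do
    case :gen_tcp.recv(socket, 0) do
      {:ok, line} ->
        :gen_tcp.send(socket, line)
        serve(socket)

      {:error, :closed} ->
        :closed

      {:error, reason} ->
        {:error, reason}
    end
  end
end

# Demo on an OS-assigned port (0), not the chapter's 4040.
opts = [:binary, packet: :line, active: false]
{:ok, listen} = :gen_tcp.listen(0, opts)
{:ok, port} = :inet.port(listen)
parent = self()

# Illustrative client: sends one line, reads the echo, then disconnects.
spawn(fn ->
  {:ok, conn} = :gen_tcp.connect({127, 0, 0, 1}, port, opts)
  :ok = :gen_tcp.send(conn, "hi\n")
  send(parent, {:echoed, :gen_tcp.recv(conn, 0, 5_000)})
  :gen_tcp.close(conn)
end)

{:ok, client} = :gen_tcp.accept(listen)
result = GracefulEcho.serve(client)

echoed =
  receive do
    {:echoed, reply} -> reply
  after
    5_000 -> :timeout
  end

IO.inspect({result, echoed})
```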
For now there is a more important bug we need to fix: what happens if our TCP acceptor crashes? Since there is no supervision, the server dies and we won’t be able to serve more requests, because it won’t be restarted. That’s why we must move our server inside a supervision tree.
8.2 Tasks
We have learned about agents, generic servers, and event managers. They are all meant to work with multiple messages or manage state. But what do we use when we only need to execute some task and that is it?
The `Task` module provides exactly this functionality. For example, it has a `start_link/3` function that receives a module, function, and arguments, allowing us to run a given function as part of a supervision tree.
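As a quick illustration of the module, function, and arguments form, the sketch below starts a task that runs `Kernel.send/2` to message the calling process. In the chapter a supervisor makes this call for us. Calling it by hand here only shows what the three arguments mean:

```elixir
caller = self()

# Task.start_link/3 runs apply(Kernel, :send, [caller, :task_ran])
# in a new process linked to the caller.
{:ok, _pid} = Task.start_link(Kernel, :send, [caller, :task_ran])

outcome =
  receive do
    :task_ran -> :ok
  after
    1_000 -> :timeout
  end

IO.inspect(outcome) # prints :ok
```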
Let’s give it a try. Open up `lib/kv_server.ex`, and let’s change the supervisor in the `start/2` function to the following:
```elixir
def start(_type, _args) do
  import Supervisor.Spec

  children = [
    worker(Task, [KVServer, :accept, [4040]])
  ]

  opts = [strategy: :one_for_one, name: KVServer.Supervisor]
  Supervisor.start_link(children, opts)
end
```
With this change, we are saying that we want to run `KVServer.accept(4040)` as a worker. We are hardcoding the port for now, but we will discuss ways in which this could be changed later.
Now that the server is part of the supervision tree, it should start automatically when we run the application. Type `mix run --no-halt` in the terminal, and once again use the `telnet` client to make sure that everything still works:
```
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me
```
Yes, it works! If you kill the client, causing the whole server to crash, you will see that another one starts right away. However, does it scale?
Try to connect two telnet clients at the same time. When you do so, you will notice that the second client doesn’t echo:
```
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?
```
It doesn’t seem to work at all. That’s because we are serving requests in the same process that is accepting connections. While one client is connected, we can’t accept another client.
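The problem can be reproduced without telnet. The sketch below uses a hypothetical `SequentialEcho` module that mirrors our single-process `loop_acceptor/1`, listens on an OS-assigned port, and connects two clients. Only the first client should get its line echoed. The second client's read times out because nobody is serving it:

```elixir
defmodule SequentialEcho do
  # Hypothetical module mirroring the chapter's design: accept one client,
  # serve it until it disconnects, and only then accept the next one.
  def loop_acceptor(listen) do
    {:ok, client} = :gen_tcp.accept(listen)
    serve(client)
    loop_acceptor(listen)
  end

  defp serve(client) do
    case :gen_tcp.recv(client, 0) do
      {:ok, line} ->
        :gen_tcp.send(client, line)
        serve(client)

      {:error, _reason} ->
        :ok
    end
  end
end

opts = [:binary, packet: :line, active: false]
{:ok, listen} = :gen_tcp.listen(0, opts)
{:ok, port} = :inet.port(listen)
spawn(fn -> SequentialEcho.loop_acceptor(listen) end)

{:ok, first} = :gen_tcp.connect({127, 0, 0, 1}, port, opts)
{:ok, second} = :gen_tcp.connect({127, 0, 0, 1}, port, opts)

:ok = :gen_tcp.send(first, "one\n")
first_reply = :gen_tcp.recv(first, 0, 1_000)

# The acceptor is busy serving `first`, so nothing reads from `second`.
:ok = :gen_tcp.send(second, "two\n")
second_reply = :gen_tcp.recv(second, 0, 1_000)

IO.inspect({first_reply, second_reply})
```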
8.3 Task supervisor
In order to make our server handle simultaneous connections, we need to have one process working as an acceptor that spawns other processes to serve requests. One solution would be to change:
```elixir
defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end
```
to use `Task.start_link/1`, which is similar to `Task.start_link/3`, but it receives an anonymous function instead of a module, function, and arguments:
```elixir
defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  Task.start_link(fn -> serve(client) end)
  loop_acceptor(socket)
end
```
But we’ve already made this mistake once. Do you remember?
This is similar to the mistake we made when we called `KV.Bucket.start_link/0` from the registry. That meant a failure in any bucket would bring the whole registry down.
The code above would have the same flaw: if we link the `serve(client)` task to the acceptor, a crash when serving a request would bring the acceptor, and consequently all other connections, down.
We fixed the issue for the registry by using a simple one-for-one supervisor. We are going to use the same tactic here, except that this pattern is so common with tasks that they already come with a solution: a simple one-for-one supervisor with temporary workers, which we can use directly in our supervision tree!
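The difference between linking and supervising can be observed directly. In the sketch below, two stand-in "acceptor" processes each start a task that crashes. One uses `Task.start_link/1` and the other uses `Task.Supervisor.start_child/2`. The supervisor name `Demo.TaskSupervisor` is made up for the example. The crashing tasks will print error reports, which is expected:

```elixir
# A task supervisor similar to the one we are about to add to the tree.
{:ok, _sup} = Task.Supervisor.start_link(name: Demo.TaskSupervisor)

# Spawns a stand-in "acceptor" that starts a crashing task through
# `start_fun`, then reports whether the acceptor itself went down.
check = fn start_fun ->
  {acceptor, ref} =
    spawn_monitor(fn ->
      start_fun.(fn -> raise "boom" end)
      Process.sleep(:infinity)
    end)

  receive do
    {:DOWN, ^ref, :process, ^acceptor, _reason} -> :acceptor_died
  after
    500 ->
      # Still alive: clean up the stand-in without leaving a :DOWN message.
      Process.demonitor(ref, [:flush])
      Process.exit(acceptor, :kill)
      :acceptor_survived
  end
end

# Linked task: the crash propagates to the acceptor through the link.
linked = check.(&Task.start_link/1)

# Supervised task: linked only to Demo.TaskSupervisor, so the acceptor is unaffected.
supervised = check.(&Task.Supervisor.start_child(Demo.TaskSupervisor, &1))

IO.inspect(linked: linked, supervised: supervised)
```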
Let’s change `start/2` once again, to add a supervisor to our tree:
```elixir
def start(_type, _args) do
  import Supervisor.Spec

  children = [
    supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
    worker(Task, [KVServer, :accept, [4040]])
  ]

  opts = [strategy: :one_for_one, name: KVServer.Supervisor]
  Supervisor.start_link(children, opts)
end
```
We simply start a `Task.Supervisor` process with the name `KVServer.TaskSupervisor`. Remember, since the acceptor task depends on this supervisor, the supervisor must be started first. Supervisors start their children in the order they are listed, which is why `Task.Supervisor` comes first in `children`.
Now we just need to change `loop_acceptor/1` to use `Task.Supervisor` to serve each request:
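```elixir
defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
  # Make the serving task the owner ("controlling process") of the client
  # socket. By default the socket is tied to the process that accepted it,
  # so an acceptor crash would otherwise close every open connection.
  :ok = :gen_tcp.controlling_process(client, pid)
  loop_acceptor(socket)
end
```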
Start a new server with `mix run --no-halt`, and we can now open up many concurrent telnet clients. You will also notice that quitting a client does not bring the acceptor down. Excellent!
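To check the concurrency claim without opening several telnet windows, the sketch below connects two clients at the same time to an acceptor that hands each connection to a `Task.Supervisor`. Both clients should get their lines echoed. `ConcurrentEcho` is a hypothetical module, the port is assigned by the OS, and the supervisor is started unnamed for the demo:

```elixir
defmodule ConcurrentEcho do
  # Hypothetical module with the same shape as the chapter's final
  # loop_acceptor/1: each accepted client is served by its own supervised task.
  def loop_acceptor(listen, sup) do
    {:ok, client} = :gen_tcp.accept(listen)
    {:ok, pid} = Task.Supervisor.start_child(sup, fn -> serve(client) end)
    # Hand socket ownership to the task that serves it.
    :ok = :gen_tcp.controlling_process(client, pid)
    loop_acceptor(listen, sup)
  end

  defp serve(client) do
    case :gen_tcp.recv(client, 0) do
      {:ok, line} ->
        :gen_tcp.send(client, line)
        serve(client)

      {:error, _reason} ->
        :ok
    end
  end
end

opts = [:binary, packet: :line, active: false]
{:ok, sup} = Task.Supervisor.start_link()
{:ok, listen} = :gen_tcp.listen(0, opts)
{:ok, port} = :inet.port(listen)
spawn(fn -> ConcurrentEcho.loop_acceptor(listen, sup) end)

{:ok, first} = :gen_tcp.connect({127, 0, 0, 1}, port, opts)
{:ok, second} = :gen_tcp.connect({127, 0, 0, 1}, port, opts)

:ok = :gen_tcp.send(first, "one\n")
:ok = :gen_tcp.send(second, "two\n")

replies = {:gen_tcp.recv(first, 0, 1_000), :gen_tcp.recv(second, 0, 1_000)}
IO.inspect(replies)
```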
Here is the full echo server implementation, in a single module:
```elixir
defmodule KVServer do
  use Application

  @doc false
  def start(_type, _args) do
    import Supervisor.Spec

    children = [
      supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
      worker(Task, [KVServer, :accept, [4040]])
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

  @doc """
  Starts accepting connections on the given `port`.
  """
  def accept(port) do
    {:ok, socket} =
      :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])

    IO.puts("Accepting connections on port #{port}")
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
    # Make the serving task own the client socket, so an acceptor crash
    # does not close connections that were already handed off.
    :ok = :gen_tcp.controlling_process(client, pid)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end
```
Since we have changed the supervisor specification, we need to ask: is our supervision strategy still correct?
In this case, the answer is yes: if the acceptor crashes, there is no need to crash the existing connections. On the other hand, if the task supervisor crashes, there is no need to crash the acceptor too. This is in contrast to the registry, where we initially had to crash the supervisor every time the registry crashed, until we used ETS to persist state. Tasks, however, have no state, so nothing will go stale if one of these processes dies.
In the next chapter we will start parsing the client requests and sending responses, finishing our server.