Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add some basic telemetry event metrics and support ping payloads #53

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions lib/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ defmodule Kadabra.Connection do
remote_settings: nil,
requested_streams: 0,
local_settings: nil,
queue: nil
queue: nil,
start_time: nil

use GenServer
require Logger
Expand All @@ -29,7 +30,8 @@ defmodule Kadabra.Connection do
config: term,
flow_control: term,
local_settings: Connection.Settings.t(),
queue: pid
queue: pid,
start_time: integer()
}

@type sock :: {:sslsocket, any, pid | {any, any}}
Expand Down Expand Up @@ -64,18 +66,20 @@ defmodule Kadabra.Connection do
config: config,
queue: queue,
local_settings: settings,
flow_control: %FlowControl{}
flow_control: %FlowControl{},
start_time: System.monotonic_time()
}
end

def close(pid) do
GenServer.call(pid, :close)
end

def ping(pid) do
GenServer.cast(pid, {:send, :ping})
def ping(pid, data) do
GenServer.cast(pid, {:send, {:ping, data}})
end


# handle_cast

def handle_cast({:send, type}, state) do
Expand Down Expand Up @@ -106,9 +110,9 @@ defmodule Kadabra.Connection do

# sendf

@spec sendf(:goaway | :ping, t) :: {:noreply, t}
def sendf(:ping, %Connection{config: config} = state) do
Egress.send_ping(config.socket)
@spec sendf(:goaway | {:ping, <<_::64>> | none}, t) :: {:noreply, t}
def sendf({:ping, data}, %Connection{config: config} = state) do
Egress.send_ping(config.socket, data)
{:noreply, state}
end

Expand Down Expand Up @@ -169,7 +173,15 @@ defmodule Kadabra.Connection do
end
end

def terminate(_reason, %{config: config}) do
def terminate(reason, %{config: config, start_time: start_time}) do
duration = System.monotonic_time() - start_time
:telemetry.execute([:kadabra, :connection, :stop],
%{duration: duration},
%{
uri: config.uri,
reason: reason,
connection: self()
})
Kernel.send(config.client, {:closed, config.queue})
:ok
end
Expand Down
4 changes: 2 additions & 2 deletions lib/connection/egress.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ defmodule Kadabra.Connection.Egress do
Socket.send(socket, bin)
end

def send_ping(socket) do
bin = Ping.new() |> Encodable.to_bin()
def send_ping(socket, data) do
bin = Ping.new(data) |> Encodable.to_bin()
Socket.send(socket, bin)
end

Expand Down
4 changes: 2 additions & 2 deletions lib/connection/processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ defmodule Kadabra.Connection.Processor do
{:ok, state}
end

def process(%Ping{ack: true}, %{config: config} = state) do
Kernel.send(config.client, {:pong, self()})
def process(%Ping{ack: true, data: data}, %{config: config} = state) do
Kernel.send(config.client, {:pong, self(), data})
{:ok, state}
end

Expand Down
6 changes: 3 additions & 3 deletions lib/connection_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Kadabra.ConnectionPool do
GenServer.call(pid, {:request, [request]})
end

def ping(pid), do: GenServer.call(pid, :ping)
def ping(pid, data), do: GenServer.call(pid, {:ping, data})

def close(pid), do: GenServer.call(pid, :close)

Expand Down Expand Up @@ -74,8 +74,8 @@ defmodule Kadabra.ConnectionPool do
{:stop, :shutdown, :ok, state}
end

def handle_call(:ping, _from, state) do
Connection.ping(state.connection)
def handle_call({:ping, data}, _from, state) do
Connection.ping(state.connection, data)
{:reply, :ok, state}
end

Expand Down
35 changes: 19 additions & 16 deletions lib/frame/ping.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,39 @@ defmodule Kadabra.Frame.Ping do
stream_id: integer
}

@empty_payload <<0, 0, 0, 0, 0, 0, 0, 0>>

@doc ~S"""
Returns new unacked ping frame.
Returns new unacked ping frame. Optionally takes a payload of 8 bytes, which
can be used to help calculate RTT times of pings that your application sends.

Can also be used to initialize a new `Frame.Ping` given a `Frame`.

## Examples

iex> Kadabra.Frame.Ping.new
%Kadabra.Frame.Ping{data: <<0, 0, 0, 0, 0, 0, 0, 0>>,
ack: false, stream_id: 0}
iex> Kadabra.Frame.Ping.new(payload: <<1, 2, 3, 4, 5, 6, 7, 8>>)
%Kadabra.Frame.Ping{data: <<1, 2, 3, 4, 5, 6, 7, 8>>,
ack: false, stream_id: 0}
iex> frame = %Kadabra.Frame{payload: <<0, 0, 0, 0, 0, 0, 0, 0>>,
...> flags: 0x1, type: 0x6, stream_id: 0}
iex> Kadabra.Frame.Ping.new(frame)
%Kadabra.Frame.Ping{data: <<0, 0, 0, 0, 0, 0, 0, 0>>, ack: true,
stream_id: 0}
"""
@spec new() :: t
def new do
@spec new(<<_::64>> | none() | Frame.t()) :: t
def new(nil), do: new(@empty_payload)

def new(<<data::64>>) do
%__MODULE__{
ack: false,
data: <<0, 0, 0, 0, 0, 0, 0, 0>>,
data: <<data::64>>,
stream_id: 0
}
end

@doc ~S"""
Initializes a new `Frame.Ping` given a `Frame`.

## Examples

iex> frame = %Kadabra.Frame{payload: <<0, 0, 0, 0, 0, 0, 0, 0>>,
...> flags: 0x1, type: 0x6, stream_id: 0}
iex> Kadabra.Frame.Ping.new(frame)
%Kadabra.Frame.Ping{data: <<0, 0, 0, 0, 0, 0, 0, 0>>, ack: true,
stream_id: 0}
"""
@spec new(Frame.t()) :: t
def new(%Frame{type: 0x6, payload: <<data::64>>, flags: flags, stream_id: sid}) do
%__MODULE__{
ack: ack?(flags),
Expand Down
13 changes: 9 additions & 4 deletions lib/kadabra.ex
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,18 @@ defmodule Kadabra do
iex> {:ok, pid} = Kadabra.open('https://http2.golang.org')
iex> Kadabra.ping(pid)
iex> receive do
...> {:pong, _pid} -> "got pong!"
...> {:pong, _pid, _resp} -> "got pong!"
...> end
"got pong!"
iex> Kadabra.ping(pid, <<1::64>>) # Send 8-byte data
iex> receive do
...> {:pong, _pid, <<1::64>>} -> "got our data back!"
...> end
"got our data back!"
"""
@spec ping(pid) :: no_return
def ping(pid) do
Kadabra.ConnectionPool.ping(pid)
@spec ping(pid, <<_::64>> | none) :: no_return
def ping(pid, data \\ nil) when is_nil(data) or byte_size(data) == 8 do
Kadabra.ConnectionPool.ping(pid, data)
end

@doc ~S"""
Expand Down
10 changes: 7 additions & 3 deletions lib/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ defmodule Kadabra.Socket do
def parse_bin(socket, bin, state) do
case FrameParser.parse(bin) do
{:ok, frame, rest} ->
:telemetry.execute([:kadabra, :socket, :recv_frame], %{}, %{frame: frame, socket: socket})
Kernel.send(state.active_user, {:recv, frame})
parse_bin(socket, rest, state)

Expand All @@ -131,11 +132,12 @@ defmodule Kadabra.Socket do
# Internal socket helpers

defp socket_send({:sslsocket, _, _} = pid, bin) do
# IO.puts("Sending #{byte_size(bin)} bytes")
:telemetry.execute([:kadabra, :socket, :send], %{}, %{type: :ssl, bin: bin, socket: pid})
:ssl.send(pid, bin)
end

defp socket_send(pid, bin) do
:telemetry.execute([:kadabra, :socket, :send], %{}, %{type: :tcp, bin: bin, socket: pid})
:gen_tcp.send(pid, bin)
end

Expand Down Expand Up @@ -179,7 +181,8 @@ defmodule Kadabra.Socket do
do_recv_bin(bin, state)
end

def handle_info({:tcp_closed, _socket}, state) do
def handle_info({:tcp_closed, socket}, state) do
:telemetry.execute([:kadabra, :socket, :closed], %{}, %{type: :tcp, socket: socket})
Kernel.send(state.active_user, {:closed, self()})
{:noreply, %{state | socket: nil}}
end
Expand All @@ -188,7 +191,8 @@ defmodule Kadabra.Socket do
do_recv_bin(bin, state)
end

def handle_info({:ssl_closed, _socket}, state) do
def handle_info({:ssl_closed, socket}, state) do
:telemetry.execute([:kadabra, :socket, :closed], %{}, %{type: :ssl, socket: socket})
Kernel.send(state.active_user, {:closed, self()})
{:noreply, %{state | socket: nil}}
end
Expand Down
24 changes: 20 additions & 4 deletions lib/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ defmodule Kadabra.Stream do
flow: nil,
uri: nil,
headers: [],
on_response: nil
on_response: nil,
start_time: nil

require Logger

Expand All @@ -35,7 +36,8 @@ defmodule Kadabra.Stream do
uri: URI.t(),
flow: Kadabra.Stream.FlowControl.t(),
headers: [...],
body: binary
body: binary,
start_time: integer()
}

@closed :closed
Expand All @@ -60,11 +62,13 @@ defmodule Kadabra.Stream do
encoder: config.encoder,
decoder: config.decoder,
connection: self(),
flow: Stream.FlowControl.new(flow_opts)
flow: Stream.FlowControl.new(flow_opts),
start_time: System.monotonic_time()
}
end

def start_link(%Stream{} = stream) do
:telemetry.execute([:kadabra, :stream, :start], %{}, %{stream_id: stream.id, uri: stream.uri, client: stream.client, connection: stream.connection})
:gen_statem.start_link(__MODULE__, stream, [])
end

Expand Down Expand Up @@ -309,7 +313,19 @@ defmodule Kadabra.Stream do

def callback_mode, do: [:handle_event_function, :state_enter]

def terminate(_reason, _state, _stream), do: :void
def terminate(reason, _state, stream) do
duration = System.monotonic_time() - stream.start_time
:telemetry.execute([:kadabra, :stream, :stop],
%{duration: duration},
%{
stream_id: stream.id,
uri: stream.uri,
client: stream.client,
connection: stream.connection,
reason: reason
})
:void
end

def code_change(_vsn, state, data, _extra), do: {:ok, state, data}
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ defmodule Kadabra.Mixfile do
defp deps do
[
{:certifi, "~> 2.5"},
{:telemetry, "~> 1.2"},
{:credo, "~> 1.0", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.0", only: [:dev], runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm", "fec8660eb7733ee4117b85f55799fd3833eb769a6df71ccf8903e8dc5447cfce"},
"scribe": {:hex, :scribe, "0.4.1", "4bf5395fb882995f705172b817b9a4ccae6ac83f9918fdedb6a2b9dafb57ec5f", [:mix], [{:pane, "~> 0.1", [hex: :pane, repo: "hexpm", optional: false]}], "hexpm"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
}