Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTracing #249

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ config :annon_api, :configuration_cache,
adapter: {:system, :module, "CONFIGURATION_CACHE_ADAPTER", Annon.Configuration.CacheAdapters.ETS},
cache_space: :configuration

# TODO: Replace with statix
config :ex_statsd,
host: "localhost",
port: 8125,
namespace: "annon"

# Configure Elixir logger
config :logger,
level: :debug
Expand Down Expand Up @@ -63,5 +57,13 @@ config :skycluster,
config :annon_api,
sql_sandbox: {:system, :boolean, "SQL_SANDBOX", false}

config :annon_api, :metrics_collector,
enabled?: {:system, :boolean, "METRICS_COLLECTOR_ENABLED", true},
send_tags: {:system, :boolean, "METRICS_COLLECTOR_SEND_TAGS", true},
host: {:system, :string, "METRICS_COLLECTOR_HOST", "localhost"},
port: {:system, :number, "METRICS_COLLECTOR_PORT", 32768},
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Large numbers should be written with underscores: 32_768

# Namespace has a dedicated variable so it can be set independently of METRICS_COLLECTOR_HOST.
namespace: {:system, :string, "METRICS_COLLECTOR_NAMESPACE", "annon"},
sample_rate: {:system, :float, "METRICS_COLLECTOR_SAMPLE_RATE", 0.25}

import_config "plugins.exs"
import_config "#{Mix.env}.exs"
5 changes: 0 additions & 5 deletions config/prod.exs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
use Mix.Config

config :ex_statsd,
host: "${STATSD_HOST}",
port: 8125,
namespace: "gateway"

config :skycluster,
strategy: {:system, :module, "SKYCLUSTER_STRATEGY", Cluster.Strategy.Kubernetes},
kubernetes_selector: {:system, "SKYCLUSTER_KUBERNETES_SELECTOR", "app=annon,component=api"},
Expand Down
4 changes: 2 additions & 2 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ config :annon_api, Annon.Requests.Repo,
database: System.get_env("MIX_LOGGER_TEST_DATABASE") || "annon_api_logger_test",
pool: Ecto.Adapters.SQL.Sandbox

config :ex_statsd,
config :annon_api, :metrics_collector,
sink: [],
namespace: "test",
test_mode: true
sample_rate: 1

config :annon_api, :acceptance,
management: [
Expand Down
5 changes: 5 additions & 0 deletions lib/annon_api/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule Annon do
children = [
supervisor(Annon.Configuration.Repo, []),
supervisor(Annon.Requests.Repo, []),
worker(DogStat, [metrics_collector_opts()]),
worker(Annon.Configuration.Matcher, [matcher_opts()]),
worker(Annon.AutoClustering, []),
management_endpoint_spec(),
Expand All @@ -34,6 +35,10 @@ defmodule Annon do
Application.get_env(:annon_api, :configuration_cache)
end

# Options handed to the DogStat worker: the `:metrics_collector` section of the
# `:annon_api` environment, read through Confex.
defp metrics_collector_opts,
  do: Confex.get_map(:annon_api, :metrics_collector)

# Loads configuration in `:on_init` callbacks and replaces `{:system, ..}` tuples via Confex
@doc false
def load_from_system_env(config) do
Expand Down
3 changes: 2 additions & 1 deletion lib/annon_api/management_api/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Annon.ManagementAPI.Router do
use Plug.ErrorHandler
alias Annon.Helpers.Response
alias Annon.ManagementAPI.Render
alias Annon.Monitoring.ClusterStatus

if Confex.get(:annon_api, :sql_sandbox) do
plug Phoenix.Ecto.SQL.Sandbox
Expand Down Expand Up @@ -36,7 +37,7 @@ defmodule Annon.ManagementAPI.Router do
end

get "/cluster_status" do
status = Annon.Monitoring.get_status()
status = ClusterStatus.get_cluster_status()
Render.render_one({:ok, status}, conn)
end

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
defmodule Annon.Monitoring do
defmodule Annon.Monitoring.ClusterStatus do
@moduledoc """
Monitoring service for Annons clusters.
This module provides functions to collects status from all nodes in Annon cluster.
"""

def get_status do
def get_cluster_status do
cluster_nodes = :erlang.nodes()
cluster_strategy = get_cluster_strategy()
nodes_status =
Expand Down
7 changes: 7 additions & 0 deletions lib/annon_api/monitoring/latencies.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule Annon.Monitoring.Latencies do
  @moduledoc false

  # Latency measurements for a single request; every field defaults to nil.
  #   client_request - total time as seen by the client
  #   upstream       - time attributed to the upstream service
  #   gateway        - client_request minus upstream
  defstruct [:client_request, :upstream, :gateway]
end
24 changes: 24 additions & 0 deletions lib/annon_api/monitoring/metrics_collector.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Annon.Monitoring.MetricsCollector do
  @moduledoc """
  Helpers that report request and response metrics to a StatsD or DogstatsD server.

  Code is based on [Statix](https://github.com/lexmag/statix) library.
  """
  import DogStat
  alias Annon.Monitoring.Latencies

  @doc """
  Counts an incoming request and, when `content_length` is not `nil`, records it
  in the `request_size` histogram.

  `opts` is passed unchanged to the metric functions.
  """
  def track_request(_request_id, content_length, opts) do
    counted = increment("request_count", 1, opts)

    if is_nil(content_length) do
      counted
    else
      histogram("request_size", content_length, opts)
    end
  end

  @doc """
  Records the client, upstream and gateway latencies of a finished request.

  `latencies` must be a `%Annon.Monitoring.Latencies{}` struct; `opts` is passed
  unchanged to the metric functions.
  """
  def track_response(_request_id, latencies, opts) do
    %Latencies{
      client_request: client_latency,
      upstream: upstream_latency,
      gateway: gateway_latency
    } = latencies

    histogram("latencies_client", client_latency, opts)
    histogram("latencies_upstream", upstream_latency, opts)
    histogram("latencies_gateway", gateway_latency, opts)
  end
end
63 changes: 63 additions & 0 deletions lib/annon_api/monitoring/trace.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule Annon.Monitoring.Trace do
  @moduledoc """
  Span describing a single traced gateway request.

  `start_span/2` builds a span from a `Plug.Conn`; `end_span/2` stamps its duration
  and appends extra binary annotations.
  """
  alias Annon.Monitoring.Trace
  alias Annon.Monitoring.Trace.BinaryAnnotation
  alias Annon.Monitoring.Trace.Endpoint
  alias Plug.Conn

  defstruct traceId: nil,          # Randomly generated, unique for a trace, set on all spans within it. 16-32 chars
            name: nil,             # Span name in lowercase (e.g. rpc method)
            parentId: nil,         # Parent span id. 8-byte identifier encoded as 16 lowercase hex characters.
                                   # Can be omitted or set to nil if span is the root span of a trace.
            id: nil,               # Id of current span, unique in context of traceId.
                                   # 8-byte identifier encoded as 16 lowercase hex characters.
            timestamp: nil,        # Epoch **microseconds** of the start of this span,
                                   # possibly absent if this an incomplete span.
            duration: nil,         # Duration in **microseconds** of the critical path, if known.
                                   # Durations of less than one are rounded up.
            debug: false,
            annotations: [],
            binaryAnnotations: []

  @doc """
  Starts a span for `conn`.

  The trace id is the first `x-request-id` response header of `conn`, or a freshly
  generated UUID when that header is absent.

  ## Options

    * `:annotations` - list of `{key, value}` pairs stored as binary annotations
      (defaults to `[]`).
  """
  def start_span(%Conn{} = conn, opts \\ []) do
    # NOTE(review): UUIDs are longer than the id formats described on the struct
    # fields - confirm the collector accepts them.
    %Trace{
      traceId: get_request_id(conn, Ecto.UUID.generate()),
      name: "gateway request",
      id: Ecto.UUID.generate(),
      # Wall-clock time: the field holds epoch microseconds, whereas monotonic
      # time counts from an arbitrary origin and is not an epoch timestamp.
      timestamp: System.system_time(:microseconds),
      binaryAnnotations: build_binary_annotations(opts)
    }
  end

  @doc """
  Finishes `trace` by setting its duration in microseconds, measured from
  `trace.timestamp` and rounded up to at least one.

  ## Options

    * `:annotations` - list of `{key, value}` pairs prepended (last pair first)
      to the span's binary annotations.
  """
  def end_span(%Trace{} = trace, opts \\ []) do
    elapsed = System.system_time(:microseconds) - trace.timestamp

    # Keep the newest annotation first without appending inside a loop.
    new_annotations =
      opts
      |> build_binary_annotations()
      |> Enum.reverse()

    %{trace |
      # The wall clock may be adjusted between start and end, so clamp to one.
      duration: max(elapsed, 1),
      binaryAnnotations: new_annotations ++ trace.binaryAnnotations
    }
  end

  # Turns the `:annotations` option into binary annotations without an endpoint.
  defp build_binary_annotations(opts) do
    opts
    |> Keyword.get(:annotations, [])
    |> Enum.map(fn {key, value} -> %BinaryAnnotation{key: key, value: value, endpoint: nil} end)
  end

  # Returns the first `x-request-id` response header value, or `default`.
  defp get_request_id(conn, default) do
    case Conn.get_resp_header(conn, "x-request-id") do
      [] -> default
      [id | _] -> id
    end
  end
end
5 changes: 5 additions & 0 deletions lib/annon_api/monitoring/trace/annotation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule Annon.Monitoring.Trace.Annotation do
  @moduledoc """
  Annotation struct with `timestamp`, `value` and `endpoint` fields.

  `endpoint` defaults to an empty `Annon.Monitoring.Trace.Endpoint` struct.
  """

  defstruct timestamp: nil,
            value: nil,
            endpoint: %Annon.Monitoring.Trace.Endpoint{}
end
5 changes: 5 additions & 0 deletions lib/annon_api/monitoring/trace/binary_annotation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule Annon.Monitoring.Trace.BinaryAnnotation do
  @moduledoc """
  Key/value annotation struct with `key`, `value` and `endpoint` fields.

  `endpoint` defaults to an empty `Annon.Monitoring.Trace.Endpoint` struct.
  """

  defstruct key: nil,
            value: nil,
            endpoint: %Annon.Monitoring.Trace.Endpoint{}
end
6 changes: 6 additions & 0 deletions lib/annon_api/monitoring/trace/endpoint.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defmodule Annon.Monitoring.Trace.Endpoint do
  @moduledoc """
  Network endpoint referenced by trace annotations. Every field defaults to `nil`.
  """

  defstruct serviceName: nil, # Classifier of this endpoint in lowercase, such as "acme-front-end"
            ipv4: nil,        # The text representation of a IPv4 address associated with this endpoint. Ex. 192.168.99.100
            ipv6: nil,        # The text representation of a IPv6 address associated with this endpoint. Ex. 2001:db8::c001
            port: nil
end
17 changes: 17 additions & 0 deletions lib/annon_api/monitoring/trace_collector.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Annon.Monitoring.TraceCollector do
  @moduledoc """
  Sends request spans to the trace collector over HTTP.
  """
  require Logger
  alias Annon.Monitoring.Trace

  @spans_url "http://localhost:9411/api/v1/spans"
  @request_headers [
    {"content-type", "application/json"},
    {"accept", "application/json"}
  ]

  @doc """
  Builds a span for `conn` and posts it, JSON-encoded, to the collector.

  Returns the HTTP response on success. When the request itself fails the error is
  logged and `{:error, error}` is returned instead of raising, so an unreachable
  collector cannot break the request being traced.
  """
  def send_span(conn) do
    span =
      conn
      |> Trace.start_span()
      |> Trace.end_span()

    payload = Poison.encode!([span])

    case HTTPoison.post(@spans_url, payload, @request_headers) do
      {:ok, response} ->
        response

      {:error, error} ->
        Logger.error(fn -> "Failed to send span to #{@spans_url}: #{inspect(error)}" end)
        {:error, error}
    end
  end
end
7 changes: 2 additions & 5 deletions lib/annon_api/plugins/logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,8 @@ defmodule Annon.Plugins.Logger do
end

defp get_latencies_data(conn) do
%{
gateway: Map.get(conn.assigns, :latencies_gateway),
upstream: Map.get(conn.assigns, :latencies_upstream),
client_request: Map.get(conn.assigns, :latencies_client)
}
conn.assigns
|> Map.get(:latencies)
|> prepare_params
end

Expand Down
111 changes: 62 additions & 49 deletions lib/annon_api/plugins/monitoring.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule Annon.Plugins.Monitoring do
"""
use Annon.Plugin, plugin_name: :monitoring
alias Plug.Conn
alias Annon.Monitoring.MetricsCollector
alias Annon.Monitoring.Latencies

def validate_settings(changeset),
do: changeset
Expand All @@ -16,75 +18,86 @@ defmodule Annon.Plugins.Monitoring do
do: %{}

def execute(%Conn{} = conn, %{api: api, start_time: request_start_time}, _settings) do
api_tags = tags(conn, api)
content_length = get_content_length(conn, nil)

conn
|> get_request_size()
|> ExStatsD.histogram("request_size", tags: api_tags)
sample_rate =
:annon_api
|> Application.get_env(:metrics_collector)
|> Keyword.get(:sample_rate, 1)
|> Confex.process_env()

collector_opts = [
tags: tags(conn, api),
sample_rate: sample_rate
]

ExStatsD.increment("request_count", tags: api_tags)
request_id = get_request_id(conn, nil)

MetricsCollector.track_request(request_id, content_length, collector_opts)

conn
|> Conn.register_before_send(&write_metrics(&1, api))
|> Conn.register_before_send(&assign_latencies(&1, request_start_time))
|> Conn.register_before_send(&track_latencies(&1, request_id, request_start_time, collector_opts))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a function call when a pipeline is only one function long

end

defp assign_latencies(conn, request_start_time) do
defp track_latencies(conn, request_id, request_start_time, collector_opts) do
request_end_time = System.monotonic_time()
latencies_client = System.convert_time_unit(request_end_time - request_start_time, :native, :micro_seconds)
request_duration = latencies_client - Map.get(conn.assigns, :latencies_upstream, 0)

conn
|> Conn.assign(:latencies_gateway, request_duration)
|> Conn.assign(:latencies_client, latencies_client)
end
latencies_upstream = Map.get(conn.assigns, :latencies_upstream, 0)
latencies_gateway = latencies_client - latencies_upstream

defp write_metrics(%Conn{} = conn, api) do
api_tags = tags(conn, api) ++ ["http_status:#{to_string conn.status}"]
ExStatsD.timer(conn.assigns.latencies_client, "latency", tags: api_tags)
ExStatsD.increment("response_count", tags: api_tags)
conn
end
latencies = %Latencies{
client_request: latencies_client,
upstream: latencies_upstream,
gateway: latencies_gateway
}

defp tags(%Conn{host: host, method: method, port: port} = conn, api),
do: ["http_host:#{to_string host}",
"http_method:#{to_string method}",
"http_port:#{to_string port}"] ++ api_tags(api) ++ get_request_id(conn)
status = conn |> get_conn_status(0) |> Integer.to_string()

defp api_tags(%{name: api_name, id: api_id}),
do: ["api_name:#{to_string api_name}", "api_id:#{to_string api_id}"]
defp api_tags(_),
do: ["api_name:unknown", "api_id:unknown"]
MetricsCollector.track_response(request_id, latencies, [
tags: ["http.status:#{status}"] ++ collector_opts[:tags],
sample_rate: collector_opts[:sample_rate]
])

defp get_request_id(conn) do
id = conn
|> Conn.get_resp_header("x-request-id")
|> Enum.at(0)
Annon.Monitoring.TraceCollector.send_span(conn)

["request_id:#{to_string id}"]
conn
|> Conn.assign(:latencies_gateway, latencies_gateway)
|> Conn.assign(:latencies_client, latencies_client)
|> Conn.assign(:latencies, latencies)
end

defp get_request_size(conn) do
get_headers_size(conn) + get_body_size(conn) + get_query_string_size(conn)
# Metric tags for a request that matched no API: host, method and port come from
# the connection, the API name and id are reported as "unknown".
defp tags(%Conn{host: host, method: method, port: port} = conn, nil) do
  port_tag = "http.port:" <> Integer.to_string(port)
  request_tag = "request.id:#{get_request_id(conn, "unknown")}"

  [
    "http.host:#{host}",
    "http.method:#{method}",
    port_tag,
    "api.name:unknown",
    "api.id:unknown",
    request_tag
  ]
end
defp tags(%Conn{host: host, method: method, port: port} = conn, api) do
port = Integer.to_string(port)
request_id = get_request_id(conn, "unknown")
%{id: api_id, name: api_name} = api

defp get_headers_size(%Conn{req_headers: req_headers}) do
req_headers
|> Enum.map(&Tuple.to_list(&1))
|> List.flatten
|> Enum.join
|> byte_size
["http.host:#{host}", "http.method:#{method}", "http.port:#{port}",
"api.name:#{api_name}", "api.id:#{api_id}", "request.id:#{request_id}"]
end

defp get_body_size(conn) do
conn
|> Conn.read_body
|> elem(1)
|> byte_size
# Returns the first `x-request-id` response header value of `conn`,
# or `default` when the header is not set.
defp get_request_id(conn, default) do
  header_values = Conn.get_resp_header(conn, "x-request-id")

  if header_values == [] do
    default
  else
    hd(header_values)
  end
end

defp get_query_string_size(%Conn{query_string: query_string}) do
query_string
|> byte_size
# Returns the first `content-length` request header value (a string), or `default`
# when the header is absent.
#
# The value feeds the `request_size` metric, so it is read from the request
# headers rather than the response headers.
defp get_content_length(conn, default) do
  case Conn.get_req_header(conn, "content-length") do
    [] -> default
    [content_length | _] -> content_length
  end
end

# Returns the `:status` of the given map/struct, or `default` when it is `nil`.
def get_conn_status(%{status: status}, default) do
  if is_nil(status), do: default, else: status
end
end
Loading