opentelemetry-erlang-contrib/instrumentation/opentelemetry_ecto/lib/opentelemetry_ecto.ex

115 lines
3.2 KiB
Elixir
Raw Normal View History

defmodule OpentelemetryEcto do
@moduledoc """
Telemetry handler for creating OpenTelemetry Spans from Ecto query events.
"""
require OpenTelemetry.Tracer
@doc """
Attaches the OpentelemetryEcto handler to your repo events. This should be called
from your application behaviour on startup.
Example:
OpentelemetryEcto.setup([:blog, :repo])
You may also supply the following options in the second argument:
* `:time_unit` - a time unit used to convert the values of query phase
timings, defaults to `:microsecond`. See `System.convert_time_unit/3`
* `:span_prefix` - the first part of the span name, as a `String.t`,
defaults to the concatenation of the event name with periods, e.g.
`"blog.repo.query"`. This will always be followed with a colon and the
source (the table name for SQL adapters).
"""
def setup(event_prefix, config \\ []) do
event = event_prefix ++ [:query]
:telemetry.attach({__MODULE__, event}, event, &__MODULE__.handle_event/4, config)
end
@doc false
def handle_event(
event,
measurements,
%{query: query, source: source, result: query_result, repo: repo, type: type},
config
) do
# Doing all this even if the span isn't sampled so the sampler
# could technically use the attributes to decide if it should sample or not
total_time = measurements.total_time
end_time = :opentelemetry.timestamp()
start_time = end_time - total_time
database = repo.config()[:database]
url =
case repo.config()[:url] do
nil ->
# TODO: add port
URI.to_string(%URI{scheme: "ecto", host: repo.config()[:hostname]})
url ->
url
end
span_name =
case Keyword.fetch(config, :span_prefix) do
{:ok, prefix} -> prefix
:error -> Enum.join(event, ".")
end <> if source != nil, do: ":#{source}", else: ""
time_unit = Keyword.get(config, :time_unit, :microsecond)
db_type =
case type do
:ecto_sql_query -> :sql
_ -> type
end
# TODO: need connection information to complete the required attributes
# net.peer.name or net.peer.ip and net.peer.port
base_attributes = %{
"db.type": db_type,
"db.statement": query,
source: source,
"db.instance": database,
"db.url": url,
"total_time_#{time_unit}s": System.convert_time_unit(total_time, :native, time_unit)
}
attributes =
measurements
|> Enum.reduce(%{}, fn
{k, v}, acc when not is_nil(v) and k in [:decode_time, :query_time, :queue_time] ->
Map.put(acc, String.to_atom("#{k}_#{time_unit}s"), System.convert_time_unit(v, :native, time_unit))
_, acc ->
acc
end)
s =
OpenTelemetry.Tracer.start_span(span_name, %{
start_time: start_time,
attributes: Map.merge(attributes, base_attributes),
kind: :client
})
case query_result do
{:error, error} ->
OpenTelemetry.Span.set_status(s, OpenTelemetry.status(:error, format_error(error)))
{:ok, _} ->
:ok
end
OpenTelemetry.Span.end_span(s)
end
defp format_error(%{__exception__: true} = exception) do
Exception.message(exception)
end
defp format_error(_), do: ""
end