defmodule OpentelemetryEcto do
  @moduledoc """
  Telemetry handler for creating OpenTelemetry Spans from Ecto query events.

  Any relation preloads, which are executed in parallel in separate tasks, will be
  linked to the span of the process that initiated the call. For example:

      Tracer.with_span "parent span" do
        Repo.all(Query.from(User, preload: [:posts, :comments]))
      end

  This will create a span called `"parent span"` with three child spans, one for
  each query: users, posts, and comments.

  > #### Note {: .neutral}
  >
  > Due to limitations with how Ecto emits its telemetry, nested preloads are not
  > represented as nested spans within a trace.
  """

  require OpenTelemetry.Tracer

  @typedoc """
  Option that you can pass to `setup/2`.
  """
  @typedoc since: "1.3.0"
  @type setup_option() ::
          {:time_unit, System.time_unit()}
          | {:span_prefix, String.t()}
          | {:additional_attributes, %{String.t() => term()}}
          | {:db_statement, :enabled | :disabled | (String.t() -> String.t())}

  # Query phase timings that are copied from the telemetry measurements onto the span.
  @measurement_keys [:decode_time, :query_time, :queue_time, :idle_time]

  @doc """
  Attaches the `OpentelemetryEcto` handler to your repo events.

  This should be called from your application's `c:Application.start/2` callback on startup,
  before starting the application's top-level supervisor.

  `event_prefix` must be the prefix configured in the `Ecto.Repo` Telemetry configuration.
  By default, it's the snake-cased name of the repository module. For `MyApp.Repo`, it would
  be `[:my_app, :repo]`.

  For example:

      @impl Application
      def start(_type, _args) do
        OpentelemetryEcto.setup([:blog, :repo])

        children = [...]
        Supervisor.start_link(children, strategy: :one_for_one)
      end

  ## Options

  You may also supply the following options in the second argument:

    * `:time_unit` - a time unit used to convert the values of query phase
      timings, defaults to `:microsecond`. See `System.convert_time_unit/3`.

    * `:span_prefix` - the first part of the span name.
      Defaults to the concatenation of the event name with periods, such as
      `"blog.repo.query"`. When the query has a source (the table name for SQL
      adapters), the prefix is followed by a colon and the source.
      For example: `"blog.repo.query:users"`.

    * `:additional_attributes` - additional attributes to include in the span. If there
      are conflicts with default provided attributes, the ones provided with
      this config will have precedence.

    * `:db_statement` - `:disabled` (default), `:enabled`, or a function.
      Whether or not to include DB statements in the **span attributes** (as the
      `db.statement` attribute).
      Optionally provide a function that takes a query string and returns a
      sanitized version of it. This is useful for removing sensitive information from the
      query string. Unless this option is `:enabled` or a function,
      query statements will not be recorded on spans.

  """
  @spec setup(:telemetry.event_name(), [setup_option()]) :: :ok | {:error, :already_exists}
  def setup(event_prefix, options \\ []) when is_list(options) do
    event = event_prefix ++ [:query]
    :telemetry.attach({__MODULE__, event}, event, &__MODULE__.handle_event/4, options)
  end

  # Telemetry callback attached by `setup/2`: turns one Ecto query event into a
  # finished client span. `config` is the option list given to `setup/2`.
  # Always returns `:ok`.
  @doc false
  def handle_event(
        event,
        measurements,
        %{query: query, source: source, result: query_result, repo: repo, type: type},
        config
      ) do
    # Doing all this even if the span isn't sampled so the sampler
    # could technically use the attributes to decide if it should sample or not

    # The event is emitted after the query has run, so the span start is
    # reconstructed by subtracting the total duration from the current timestamp.
    total_time = measurements.total_time
    end_time = :opentelemetry.timestamp()
    start_time = end_time - total_time

    # Read the repo configuration once instead of once per looked-up key.
    repo_config = repo.config()
    database = repo_config[:database]

    span_prefix = Keyword.get_lazy(config, :span_prefix, fn -> Enum.join(event, ".") end)
    span_suffix = if source != nil, do: ":#{source}", else: ""
    span_name = span_prefix <> span_suffix

    time_unit = Keyword.get(config, :time_unit, :microsecond)
    additional_attributes = Keyword.get(config, :additional_attributes, %{})
    db_statement_config = Keyword.get(config, :db_statement, :disabled)

    # TODO: need connection information to complete the required attributes
    # net.peer.name or net.peer.ip and net.peer.port
    base_attributes = %{
      "db.type": db_type(type),
      source: source,
      "db.instance": database,
      "db.name": database,
      "db.url": db_url(repo_config),
      "total_time_#{time_unit}s": System.convert_time_unit(total_time, :native, time_unit)
    }

    # User-supplied attributes are merged last so they win over the defaults.
    attributes =
      base_attributes
      |> add_measurements(measurements, time_unit)
      |> maybe_add_db_statement(db_statement_config, query)
      |> maybe_add_db_system(repo.__adapter__())
      |> add_additional_attributes(additional_attributes)

    parent_token = attach_parent_context()

    # `after` guarantees the attached parent context is detached even if creating
    # or updating the span raises, so it cannot leak into the calling process.
    try do
      span =
        OpenTelemetry.Tracer.start_span(span_name, %{
          start_time: start_time,
          attributes: attributes,
          kind: :client
        })

      record_result(span, query_result)
      OpenTelemetry.Span.end_span(span)
    after
      detach_parent_context(parent_token)
    end

    :ok
  end

  # Maps Ecto's SQL query event type to `:sql`; any other type is passed through.
  defp db_type(:ecto_sql_query), do: :sql
  defp db_type(type), do: type

  # Uses the configured `:url` when present, otherwise builds an `ecto://` URL
  # from the configured `:hostname`.
  defp db_url(repo_config) do
    case repo_config[:url] do
      nil ->
        # TODO: add port
        URI.to_string(%URI{scheme: "ecto", host: repo_config[:hostname]})

      url ->
        url
    end
  end

  # Attaches the context the span should be parented to and returns the token
  # needed to detach it again, or `:undefined` when no context was found.
  #
  # The context stored for the current process is preferred; otherwise the lookup
  # falls back to the `:"$callers"` chain, which is how queries run in separate
  # tasks (e.g. preloads) get linked to the initiating process.
  defp attach_parent_context do
    parent_context =
      case OpentelemetryProcessPropagator.fetch_ctx(self()) do
        :undefined -> OpentelemetryProcessPropagator.fetch_parent_ctx(1, :"$callers")
        ctx -> ctx
      end

    if parent_context != :undefined do
      OpenTelemetry.Ctx.attach(parent_context)
    else
      :undefined
    end
  end

  # Detaches a context previously attached by `attach_parent_context/0`.
  defp detach_parent_context(:undefined), do: :ok
  defp detach_parent_context(token), do: OpenTelemetry.Ctx.detach(token)

  # Marks the span as errored when the query failed; successful queries leave
  # the span status untouched.
  defp record_result(span, {:error, error}) do
    OpenTelemetry.Span.set_status(span, OpenTelemetry.status(:error, format_error(error)))
  end

  defp record_result(_span, {:ok, _}), do: :ok

  # Exceptions contribute their message as the status description; any other
  # error term yields an empty description.
  defp format_error(%{__exception__: true} = exception) do
    Exception.message(exception)
  end

  defp format_error(_), do: ""

  # Adds every non-nil query phase timing as a `<phase>_<time_unit>s` attribute,
  # converted from native time units to `time_unit`.
  defp add_measurements(attributes, measurements, time_unit) do
    for {key, value} <- measurements,
        key in @measurement_keys,
        not is_nil(value),
        into: attributes do
      # The atom set is bounded: a fixed list of keys combined with the
      # configured time unit, never data taken from the query itself.
      {String.to_atom("#{key}_#{time_unit}s"),
       System.convert_time_unit(value, :native, time_unit)}
    end
  end

  # Records the query text as `db.statement` when enabled, optionally passing it
  # through the configured sanitizer. Any other setting leaves it out.
  defp maybe_add_db_statement(attributes, :enabled, query) do
    Map.put(attributes, :"db.statement", query)
  end

  defp maybe_add_db_statement(attributes, :disabled, _query) do
    attributes
  end

  defp maybe_add_db_statement(attributes, sanitizer, query) when is_function(sanitizer, 1) do
    Map.put(attributes, :"db.statement", sanitizer.(query))
  end

  defp maybe_add_db_statement(attributes, _, _query) do
    attributes
  end

  # Sets `db.system` for the adapters known here; other adapters add nothing.
  defp maybe_add_db_system(attributes, Ecto.Adapters.Postgres) do
    Map.put(attributes, :"db.system", :postgresql)
  end

  defp maybe_add_db_system(attributes, Ecto.Adapters.MyXQL) do
    Map.put(attributes, :"db.system", :mysql)
  end

  defp maybe_add_db_system(attributes, Ecto.Adapters.SQLite3) do
    Map.put(attributes, :"db.system", :sqlite)
  end

  defp maybe_add_db_system(attributes, Ecto.Adapters.Tds) do
    Map.put(attributes, :"db.system", :mssql)
  end

  defp maybe_add_db_system(attributes, _) do
    attributes
  end

  # Merges the user-supplied attributes over the defaults (user values win).
  defp add_additional_attributes(attributes, additional_attributes) do
    Map.merge(attributes, additional_attributes)
  end
end