Add instrumentation for Nebulex, a distributed cache library. This library provides solid telemetry support for this initial implementation. Caching implementation is mostly based on in-memory storage (like ETS) and RPC calls for distribution (via OTP libraries, like :erpc). AFAICT, there is not much specifics for how to translate into Semantic Attributes: those caches are not quite a DB, except maybe for the one which implements the storage; the RPC can't be reliably captured either. Given the above constraints, this initial implementation instruments the library via custom attributes (namespaced as `nebulex.*`). It's not 100% clear the behaviour of OTel for actual distributed caches - from my tests, that may create some orphan spans. I think that's fine as first release. Nebulex follow the patterns of Ecto, so this instrumentation follows a similar pattern of OpentelemetryEcto. It does include a `setup_all/1` function for convenience, that leverages the :init events Nebulex emit on process start. Co-authored-by: Tristan Sloughter <>
112 lines
3.4 KiB
112 lines
3.4 KiB
defmodule OpentelemetryNebulex do
@moduledoc """
OpentelemetryNebulex uses `telemetry` handlers to create `OpenTelemetry` spans
from Nebulex command events.
@tracer_id __MODULE__
@doc """
Initializes and configures telemetry handlers for a given cache.
OpentelemetryNebulex.setup([:blog, :partitioned_cache])
def setup(event_prefix, opts \\ []) do
{__MODULE__, event_prefix, :command_start},
event_prefix ++ [:command, :start],
{__MODULE__, event_prefix, :command_stop},
event_prefix ++ [:command, :stop],
{__MODULE__, event_prefix, :command_exception},
event_prefix ++ [:command, :exception],
@doc """
Initializes and configures telemetry handlers for all caches.
Use the `[:nebulex, :cache, :init]` event to automatically discover caches, and attach
the handlers dynamically. It only works for caches that start after this function is called.
def setup_all(opts \\ []) do
[:nebulex, :cache, :init],
@doc false
def handle_init(_event, _measurements, metadata, config) do
setup(metadata[:opts][:telemetry_prefix], config)
@doc false
def handle_command_start(_event, _measurements, metadata, _config) do
span_name = "nebulex #{metadata.function_name}"
attributes =
"nebulex.cache": metadata.adapter_meta.cache
|> maybe_put(:"nebulex.backend", metadata.adapter_meta[:backend])
|> maybe_put(:"nebulex.keyslot", metadata.adapter_meta[:keyslot])
|> maybe_put(:"nebulex.model", metadata.adapter_meta[:model])
OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{
attributes: attributes
@doc false
def handle_command_stop(_event, _measurements, metadata, _config) do
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)
if action = extract_action(metadata) do
OpenTelemetry.Span.set_attribute(ctx, :"nebulex.action", action)
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
@doc false
def handle_command_exception(_event, _measurements, metadata, _config) do
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)
OpenTelemetry.Span.record_exception(ctx, metadata.reason, metadata.stacktrace)
OpenTelemetry.Tracer.set_status(OpenTelemetry.status(:error, format_error(metadata.reason)))
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
defp maybe_put(attributes, _key, nil), do: attributes
defp maybe_put(attributes, key, value), do: Map.put(attributes, key, value)
defp extract_action(%{function_name: f, result: :"$expired"}) when f in [:get, :take], do: :miss
defp extract_action(%{function_name: f, result: nil}) when f in [:get, :take], do: :miss
defp extract_action(%{function_name: f, result: _}) when f in [:get, :take], do: :hit
defp extract_action(_), do: nil
defp format_error(exception) when is_exception(exception), do: Exception.message(exception)
defp format_error(error), do: inspect(error)