Add opentelemetry integration to Oban (#6)
By default a new trace is automatically started when a job is processed
by monitoring these events:
* `[:oban, :job, :start]` — at the point a job is fetched from the database and will execute
* `[:oban, :job, :stop]` — after a job succeeds and the success is recorded in the database
* `[:oban, :job, :exception]` — after a job fails and the failure is recorded in the database
To also record a span when a job is created and to link traces together
`Oban.insert/2` has to be replaced by `OpentelemetryOban.insert/2`.
Before:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()
```
After:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> OpentelemetryOban.insert()
```
Co-authored-by: Tristan Sloughter <t@crashfast.com>
2021-12-08 15:41:36 +00:00
|
|
|
defmodule OpentelemetryOban do
|
|
|
|
@moduledoc """
|
|
|
|
OpentelemetryOban uses [telemetry](https://hexdocs.pm/telemetry/) handlers to
|
|
|
|
create `OpenTelemetry` spans from Oban events.
|
|
|
|
|
|
|
|
Supported events include job start/stop and also when an exception is raised.
|
|
|
|
|
|
|
|
## Usage
|
|
|
|
|
|
|
|
In your application start:
|
|
|
|
|
|
|
|
def start(_type, _args) do
|
|
|
|
OpentelemetryOban.setup()
|
|
|
|
|
|
|
|
# ...
|
|
|
|
end
|
|
|
|
"""
|
|
|
|
|
|
|
|
alias Ecto.Changeset
|
|
|
|
alias OpenTelemetry.Span
|
|
|
|
|
|
|
|
require OpenTelemetry.Tracer
|
|
|
|
|
|
|
|
@doc """
|
|
|
|
Initializes and configures telemetry handlers.
|
|
|
|
|
|
|
|
By default jobs and plugins are traced. If you wish to trace only jobs then
|
|
|
|
use:
|
|
|
|
|
|
|
|
OpentelemetryOban.setup(trace: [:jobs])
|
|
|
|
|
|
|
|
Note that if you don't trace plugins, but inside the plugins, there are spans
|
|
|
|
from other instrumentation libraries (e.g. ecto) then these will still be
|
|
|
|
traced. This setting controls only the spans that are created by
|
|
|
|
opentelemetry_oban.
|
|
|
|
"""
|
|
|
|
@spec setup() :: :ok
|
|
|
|
def setup(opts \\ []) do
|
|
|
|
trace = Keyword.get(opts, :trace, [:jobs, :plugins])
|
|
|
|
|
|
|
|
if Enum.member?(trace, :jobs) do
|
|
|
|
OpentelemetryOban.JobHandler.attach()
|
|
|
|
end
|
|
|
|
|
|
|
|
if Enum.member?(trace, :plugins) do
|
|
|
|
OpentelemetryOban.PluginHandler.attach()
|
|
|
|
end
|
|
|
|
|
|
|
|
:ok
|
|
|
|
end
|
|
|
|
|
|
|
|
def insert(name \\ Oban, %Changeset{} = changeset) do
|
|
|
|
attributes = attributes_before_insert(changeset)
|
|
|
|
worker = Changeset.get_field(changeset, :worker, "unknown")
|
|
|
|
|
|
|
|
OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do
|
|
|
|
changeset = add_tracing_information_to_meta(changeset)
|
|
|
|
|
|
|
|
case Oban.insert(name, changeset) do
|
|
|
|
{:ok, job} ->
|
|
|
|
OpenTelemetry.Tracer.set_attributes(attributes_after_insert(job))
|
|
|
|
{:ok, job}
|
|
|
|
|
|
|
|
other ->
|
|
|
|
other
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
def insert(name \\ Oban, multi, multi_name, changeset_or_fun) do
|
|
|
|
Oban.insert(name, multi, multi_name, changeset_or_fun)
|
|
|
|
end
|
|
|
|
|
|
|
|
def insert!(name \\ Oban, %Changeset{} = changeset) do
|
|
|
|
attributes = attributes_before_insert(changeset)
|
|
|
|
worker = Changeset.get_field(changeset, :worker, "unknown")
|
|
|
|
|
|
|
|
OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do
|
|
|
|
changeset = add_tracing_information_to_meta(changeset)
|
|
|
|
|
|
|
|
try do
|
|
|
|
job = Oban.insert!(name, changeset)
|
|
|
|
OpenTelemetry.Tracer.set_attributes(attributes_after_insert(job))
|
|
|
|
job
|
|
|
|
rescue
|
|
|
|
exception ->
|
|
|
|
ctx = OpenTelemetry.Tracer.current_span_ctx()
|
|
|
|
Span.record_exception(ctx, exception, __STACKTRACE__)
|
|
|
|
Span.set_status(ctx, OpenTelemetry.status(:error, ""))
|
|
|
|
reraise exception, __STACKTRACE__
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
def insert_all(name \\ Oban, changesets_or_wrapper)
|
|
|
|
|
|
|
|
def insert_all(name, %{changesets: changesets}) when is_list(changesets) do
|
|
|
|
insert_all(name, changesets)
|
|
|
|
end
|
|
|
|
|
|
|
|
def insert_all(name, changesets) when is_list(changesets) do
|
|
|
|
# changesets in insert_all can include different workers and different
|
|
|
|
# queues. This means we cannot provide much information here, but we can
|
|
|
|
# still record the insert and propagate the context information.
|
2021-12-28 23:39:06 +00:00
|
|
|
OpenTelemetry.Tracer.with_span :"Oban bulk insert", kind: :producer do
|
Add opentelemetry integration to Oban (#6)
By default a new trace is automatically started when a job is processed
by monitoring these events:
* `[:oban, :job, :start]` — at the point a job is fetched from the database and will execute
* `[:oban, :job, :stop]` — after a job succeeds and the success is recorded in the database
* `[:oban, :job, :exception]` — after a job fails and the failure is recorded in the database
To also record a span when a job is created and to link traces together
`Oban.insert/2` has to be replaced by `OpentelemetryOban.insert/2`.
Before:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()
```
After:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> OpentelemetryOban.insert()
```
Co-authored-by: Tristan Sloughter <t@crashfast.com>
2021-12-08 15:41:36 +00:00
|
|
|
changesets = Enum.map(changesets, &add_tracing_information_to_meta/1)
|
|
|
|
Oban.insert_all(name, changesets)
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
2022-08-28 11:45:02 +00:00
|
|
|
def insert_all(name \\ Oban, multi, multi_name, changesets_or_wrapper) do
|
Add opentelemetry integration to Oban (#6)
By default a new trace is automatically started when a job is processed
by monitoring these events:
* `[:oban, :job, :start]` — at the point a job is fetched from the database and will execute
* `[:oban, :job, :stop]` — after a job succeeds and the success is recorded in the database
* `[:oban, :job, :exception]` — after a job fails and the failure is recorded in the database
To also record a span when a job is created and to link traces together
`Oban.insert/2` has to be replaced by `OpentelemetryOban.insert/2`.
Before:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()
```
After:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> OpentelemetryOban.insert()
```
Co-authored-by: Tristan Sloughter <t@crashfast.com>
2021-12-08 15:41:36 +00:00
|
|
|
Oban.insert_all(name, multi, multi_name, changesets_or_wrapper)
|
|
|
|
end
|
|
|
|
|
|
|
|
defp add_tracing_information_to_meta(changeset) do
|
|
|
|
meta = Changeset.get_field(changeset, :meta, %{})
|
|
|
|
|
|
|
|
new_meta =
|
|
|
|
[]
|
|
|
|
|> :otel_propagator_text_map.inject()
|
|
|
|
|> Enum.into(meta)
|
|
|
|
|
|
|
|
Changeset.change(changeset, %{meta: new_meta})
|
|
|
|
end
|
|
|
|
|
|
|
|
defp attributes_before_insert(changeset) do
|
|
|
|
queue = Changeset.get_field(changeset, :queue, "unknown")
|
|
|
|
worker = Changeset.get_field(changeset, :worker, "unknown")
|
|
|
|
|
2021-12-28 23:39:06 +00:00
|
|
|
%{
|
|
|
|
"messaging.system": :oban,
|
Add opentelemetry integration to Oban (#6)
By default a new trace is automatically started when a job is processed
by monitoring these events:
* `[:oban, :job, :start]` — at the point a job is fetched from the database and will execute
* `[:oban, :job, :stop]` — after a job succeeds and the success is recorded in the database
* `[:oban, :job, :exception]` — after a job fails and the failure is recorded in the database
To also record a span when a job is created and to link traces together
`Oban.insert/2` has to be replaced by `OpentelemetryOban.insert/2`.
Before:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()
```
After:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> OpentelemetryOban.insert()
```
Co-authored-by: Tristan Sloughter <t@crashfast.com>
2021-12-08 15:41:36 +00:00
|
|
|
"messaging.destination": queue,
|
2021-12-28 23:39:06 +00:00
|
|
|
"messaging.destination_kind": :queue,
|
Add opentelemetry integration to Oban (#6)
By default a new trace is automatically started when a job is processed
by monitoring these events:
* `[:oban, :job, :start]` — at the point a job is fetched from the database and will execute
* `[:oban, :job, :stop]` — after a job succeeds and the success is recorded in the database
* `[:oban, :job, :exception]` — after a job fails and the failure is recorded in the database
To also record a span when a job is created and to link traces together
`Oban.insert/2` has to be replaced by `OpentelemetryOban.insert/2`.
Before:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()
```
After:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> OpentelemetryOban.insert()
```
Co-authored-by: Tristan Sloughter <t@crashfast.com>
2021-12-08 15:41:36 +00:00
|
|
|
"messaging.oban.worker": worker
|
2021-12-28 23:39:06 +00:00
|
|
|
}
|
Add opentelemetry integration to Oban (#6)
By default a new trace is automatically started when a job is processed
by monitoring these events:
* `[:oban, :job, :start]` — at the point a job is fetched from the database and will execute
* `[:oban, :job, :stop]` — after a job succeeds and the success is recorded in the database
* `[:oban, :job, :exception]` — after a job fails and the failure is recorded in the database
To also record a span when a job is created and to link traces together
`Oban.insert/2` has to be replaced by `OpentelemetryOban.insert/2`.
Before:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()
```
After:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> OpentelemetryOban.insert()
```
Co-authored-by: Tristan Sloughter <t@crashfast.com>
2021-12-08 15:41:36 +00:00
|
|
|
end
|
|
|
|
|
|
|
|
defp attributes_after_insert(job) do
|
2021-12-28 23:39:06 +00:00
|
|
|
%{
|
Add opentelemetry integration to Oban (#6)
By default a new trace is automatically started when a job is processed
by monitoring these events:
* `[:oban, :job, :start]` — at the point a job is fetched from the database and will execute
* `[:oban, :job, :stop]` — after a job succeeds and the success is recorded in the database
* `[:oban, :job, :exception]` — after a job fails and the failure is recorded in the database
To also record a span when a job is created and to link traces together
`Oban.insert/2` has to be replaced by `OpentelemetryOban.insert/2`.
Before:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()
```
After:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> OpentelemetryOban.insert()
```
Co-authored-by: Tristan Sloughter <t@crashfast.com>
2021-12-08 15:41:36 +00:00
|
|
|
"messaging.oban.job_id": job.id,
|
|
|
|
"messaging.oban.priority": job.priority,
|
|
|
|
"messaging.oban.max_attempts": job.max_attempts
|
2021-12-28 23:39:06 +00:00
|
|
|
}
|
Add opentelemetry integration to Oban (#6)
By default a new trace is automatically started when a job is processed
by monitoring these events:
* `[:oban, :job, :start]` — at the point a job is fetched from the database and will execute
* `[:oban, :job, :stop]` — after a job succeeds and the success is recorded in the database
* `[:oban, :job, :exception]` — after a job fails and the failure is recorded in the database
To also record a span when a job is created and to link traces together
`Oban.insert/2` has to be replaced by `OpentelemetryOban.insert/2`.
Before:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()
```
After:
```elixir
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> OpentelemetryOban.insert()
```
Co-authored-by: Tristan Sloughter <t@crashfast.com>
2021-12-08 15:41:36 +00:00
|
|
|
end
|
|
|
|
end
|