fix: setting oban attributes (#247)

* fix: passing custom oban attributes to span
This commit is contained in:
Yordis Prieto 2024-02-14 11:47:07 -05:00 committed by GitHub
parent 102b61349c
commit b12a464b5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 245 additions and 39 deletions

View File

@ -1,5 +1,37 @@
# Changelog # Changelog
## 1.1.0
### Changed
* Improve `OpentelemetryOban.PluginHandler` Tracer span attributes.
The Plugin's span introduce a set of attributes prefixed with `oban.`.
Previously, no attributes were added to the span. The new attributes are:
* All Plugin:
* `oban.plugin`
* `Oban.Plugins.Cron` Plugin:
* `oban.jobs_count`
* `Oban.Plugins.Gossip` Plugin:
* `oban.gossip_count`
* `Oban.Plugins.Lifeline` Plugin:
* `oban.discarded_count`
* `oban.rescued_count`
* `Oban.Plugins.Pruner` Plugin:
* `oban.pruned_count`
* `Oban.Pro.Plugins.DynamicCron` Plugin:
* `oban.jobs_count`
* `Oban.Pro.Plugins.DynamicLifeline` Plugin:
* `oban.discarded_count`
* `oban.rescued_count`
* `Oban.Pro.Plugins.DynamicPrioritizer` Plugin:
* `oban.reprioritized_count`
* `Oban.Pro.Plugins.DynamicPruner` Plugin:
* `oban.pruned_count`
* `Oban.Pro.Plugins.DynamicScaler` Plugin:
* `oban.scaler.last_scaled_to`
* `oban.scaler.last_scaled_at`
## 1.0.0 ## 1.0.0
### Changed ### Changed

View File

@ -133,15 +133,15 @@ defmodule OpentelemetryOban do
Trace.messaging_system() => :oban, Trace.messaging_system() => :oban,
Trace.messaging_destination() => queue, Trace.messaging_destination() => queue,
Trace.messaging_destination_kind() => :queue, Trace.messaging_destination_kind() => :queue,
:"messaging.oban.worker" => worker :"oban.job.worker" => worker
} }
end end
defp attributes_after_insert(job) do defp attributes_after_insert(job) do
%{ %{
"messaging.oban.job_id": job.id, "oban.job.job_id": job.id,
"messaging.oban.priority": job.priority, "oban.job.priority": job.priority,
"messaging.oban.max_attempts": job.max_attempts "oban.job.max_attempts": job.max_attempts
} }
end end
end end

View File

@ -64,14 +64,13 @@ defmodule OpentelemetryOban.JobHandler do
Trace.messaging_destination() => queue, Trace.messaging_destination() => queue,
Trace.messaging_destination_kind() => :queue, Trace.messaging_destination_kind() => :queue,
Trace.messaging_operation() => :process, Trace.messaging_operation() => :process,
:"messaging.oban.job_id" => id, :"oban.job.job_id" => id,
:"messaging.oban.worker" => worker, :"oban.job.worker" => worker,
:"messaging.oban.priority" => priority, :"oban.job.priority" => priority,
:"messaging.oban.attempt" => attempt, :"oban.job.attempt" => attempt,
:"messaging.oban.max_attempts" => max_attempts, :"oban.job.max_attempts" => max_attempts,
:"messaging.oban.inserted_at" => :"oban.job.inserted_at" => DateTime.to_iso8601(inserted_at),
if(inserted_at, do: DateTime.to_iso8601(inserted_at), else: nil), :"oban.job.scheduled_at" => DateTime.to_iso8601(scheduled_at)
:"messaging.oban.scheduled_at" => DateTime.to_iso8601(scheduled_at)
} }
span_name = "#{worker} process" span_name = "#{worker} process"

View File

@ -1,4 +1,5 @@
defmodule OpentelemetryOban.PluginHandler do defmodule OpentelemetryOban.PluginHandler do
alias OpenTelemetry.Tracer
alias OpenTelemetry.Span alias OpenTelemetry.Span
@tracer_id __MODULE__ @tracer_id __MODULE__
@ -41,11 +42,12 @@ defmodule OpentelemetryOban.PluginHandler do
@tracer_id, @tracer_id,
"#{plugin} process", "#{plugin} process",
metadata, metadata,
%{} %{attributes: %{"oban.plugin": plugin}}
) )
end end
def handle_plugin_stop(_event, _measurements, metadata, _config) do def handle_plugin_stop(_event, _measurements, metadata, _config) do
Tracer.set_attributes(end_span_plugin_attrs(metadata))
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end end
@ -63,4 +65,54 @@ defmodule OpentelemetryOban.PluginHandler do
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end end
defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Cron} = metadata) do
%{"oban.plugins.cron.jobs_count": length(metadata[:jobs])}
end
defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Gossip} = metadata) do
%{"oban.plugins.gossip.gossip_count": metadata[:gossip_count]}
end
defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Lifeline} = metadata) do
%{
"oban.plugins.lifeline.discarded_count": metadata[:discarded_count],
"oban.plugins.lifeline.rescued_count": metadata[:rescued_count]
}
end
defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Pruner} = metadata) do
%{"oban.plugins.pruner.pruned_count": metadata[:pruned_count]}
end
defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicCron} = metadata) do
%{"oban.pro.plugins.dynamic_cron.jobs_count": length(metadata[:jobs])}
end
defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicLifeline} = metadata) do
%{
"oban.pro.plugins.dynamic_lifeline.discarded_count": metadata[:discarded_count],
"oban.pro.plugins.dynamic_lifeline.rescued_count": metadata[:rescued_count]
}
end
defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicPrioritizer} = metadata) do
%{"oban.pro.plugins.dynamic_prioritizer.reprioritized_count": metadata[:reprioritized_count]}
end
defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicPruner} = metadata) do
%{"oban.pro.plugins.dynamic_pruner.pruned_count": metadata[:pruned_count]}
end
defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicScaler} = metadata) do
%{
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_to": metadata[:scaler][:last_scaled_to],
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_at":
DateTime.to_iso8601(metadata[:scaler][:last_scaled_at])
}
end
defp end_span_plugin_attrs(_) do
%{}
end
end end

View File

@ -1,7 +1,7 @@
defmodule OpentelemetryOban.MixProject do defmodule OpentelemetryOban.MixProject do
use Mix.Project use Mix.Project
@version "1.0.0" @version "1.1.0"
def project do def project do
[ [

View File

@ -109,4 +109,127 @@ defmodule OpentelemetryOban.PluginHandlerTest do
assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == assert [:"exception.message", :"exception.stacktrace", :"exception.type"] ==
Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) Enum.sort(Map.keys(:otel_attributes.map(event_attributes)))
end end
describe "[:oban, :plugin, :stop] spans" do
test "Oban.Plugins.Cron plugin" do
execute_plugin(Oban.Plugins.Cron, %{jobs: [1, 3, 4]})
assert %{
"oban.plugin": Elixir.Oban.Plugins.Cron,
"oban.plugins.cron.jobs_count": 3
} ==
receive_span_attrs(Oban.Plugins.Cron)
end
test "Oban.Plugins.Gossip plugin" do
execute_plugin(Oban.Plugins.Gossip, %{gossip_count: 3})
assert %{
"oban.plugin": Elixir.Oban.Plugins.Gossip,
"oban.plugins.gossip.gossip_count": 3
} ==
receive_span_attrs(Oban.Plugins.Gossip)
end
test "Oban.Plugins.Lifeline plugin" do
execute_plugin(Oban.Plugins.Lifeline, %{discarded_count: 3, rescued_count: 2})
assert %{
"oban.plugin": Elixir.Oban.Plugins.Lifeline,
"oban.plugins.lifeline.discarded_count": 3,
"oban.plugins.lifeline.rescued_count": 2
} ==
receive_span_attrs(Oban.Plugins.Lifeline)
end
test "Oban.Plugins.Pruner plugin" do
execute_plugin(Oban.Plugins.Pruner, %{pruned_count: 3})
assert %{
"oban.plugin": Elixir.Oban.Plugins.Pruner,
"oban.plugins.pruner.pruned_count": 3
} ==
receive_span_attrs(Oban.Plugins.Pruner)
end
test "Oban.Pro.Plugins.DynamicCron plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicCron, %{jobs: [1, 3, 4]})
assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicCron,
"oban.pro.plugins.dynamic_cron.jobs_count": 3
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicCron)
end
test "Oban.Pro.Plugins.DynamicLifeline plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicLifeline, %{discarded_count: 3, rescued_count: 2})
assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicLifeline,
"oban.pro.plugins.dynamic_lifeline.discarded_count": 3,
"oban.pro.plugins.dynamic_lifeline.rescued_count": 2
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicLifeline)
end
test "Oban.Pro.Plugins.DynamicPrioritizer plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicPrioritizer, %{reprioritized_count: 3})
assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicPrioritizer,
"oban.pro.plugins.dynamic_prioritizer.reprioritized_count": 3
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicPrioritizer)
end
test "Oban.Pro.Plugins.DynamicPruner plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicPruner, %{pruned_count: 3})
assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicPruner,
"oban.pro.plugins.dynamic_pruner.pruned_count": 3
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicPruner)
end
test "Oban.Pro.Plugins.DynamicScaler plugin" do
execute_plugin(Oban.Pro.Plugins.DynamicScaler, %{
scaler: %{last_scaled_to: 3, last_scaled_at: ~U[2021-08-01 12:00:00Z]}
})
assert %{
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicScaler,
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_to": 3,
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_at": "2021-08-01T12:00:00Z"
} ==
receive_span_attrs(Oban.Pro.Plugins.DynamicScaler)
end
end
defp receive_span_attrs(name) do
name = "#{name} process"
assert_receive(
{:span, span(name: ^name, attributes: attributes)},
100,
"expected span with name #{name} to be received"
)
elem(attributes, 4)
end
defp execute_plugin(plugin_name, metadata) do
:telemetry.execute(
[:oban, :plugin, :start],
%{system_time: System.system_time()},
%{plugin: plugin_name}
)
:telemetry.execute(
[:oban, :plugin, :stop],
%{duration: 42069},
Map.merge(metadata, %{plugin: plugin_name})
)
end
end end

View File

@ -47,10 +47,10 @@ defmodule OpentelemetryObanTest do
assert %{ assert %{
"messaging.destination": "events", "messaging.destination": "events",
"messaging.destination_kind": :queue, "messaging.destination_kind": :queue,
"messaging.oban.job_id": _job_id, "oban.job.job_id": _job_id,
"messaging.oban.max_attempts": 1, "oban.job.max_attempts": 1,
"messaging.oban.priority": 0, "oban.job.priority": 0,
"messaging.oban.worker": "TestJob", "oban.job.worker": "TestJob",
"messaging.system": :oban "messaging.system": :oban
} = :otel_attributes.map(attributes) } = :otel_attributes.map(attributes)
end end
@ -147,13 +147,13 @@ defmodule OpentelemetryObanTest do
assert %{ assert %{
"messaging.destination": "events", "messaging.destination": "events",
"messaging.destination_kind": :queue, "messaging.destination_kind": :queue,
"messaging.oban.attempt": 1, "oban.job.attempt": 1,
"messaging.oban.inserted_at": _inserted_at, "oban.job.inserted_at": _inserted_at,
"messaging.oban.job_id": _job_id, "oban.job.job_id": _job_id,
"messaging.oban.max_attempts": 1, "oban.job.max_attempts": 1,
"messaging.oban.priority": 0, "oban.job.priority": 0,
"messaging.oban.scheduled_at": _scheduled_at, "oban.job.scheduled_at": _scheduled_at,
"messaging.oban.worker": "TestJob", "oban.job.worker": "TestJob",
"messaging.operation": :process, "messaging.operation": :process,
"messaging.system": :oban "messaging.system": :oban
} = :otel_attributes.map(attributes) } = :otel_attributes.map(attributes)
@ -177,13 +177,13 @@ defmodule OpentelemetryObanTest do
assert %{ assert %{
"messaging.destination": "events", "messaging.destination": "events",
"messaging.destination_kind": :queue, "messaging.destination_kind": :queue,
"messaging.oban.attempt": 1, "oban.job.attempt": 1,
"messaging.oban.inserted_at": _inserted_at, "oban.job.inserted_at": _inserted_at,
"messaging.oban.job_id": _job_id, "oban.job.job_id": _job_id,
"messaging.oban.max_attempts": 1, "oban.job.max_attempts": 1,
"messaging.oban.priority": 0, "oban.job.priority": 0,
"messaging.oban.scheduled_at": _scheduled_at, "oban.job.scheduled_at": _scheduled_at,
"messaging.oban.worker": "TestJobThatReturnsError", "oban.job.worker": "TestJobThatReturnsError",
"messaging.operation": :process, "messaging.operation": :process,
"messaging.system": :oban "messaging.system": :oban
} = :otel_attributes.map(attributes) } = :otel_attributes.map(attributes)
@ -255,13 +255,13 @@ defmodule OpentelemetryObanTest do
assert %{ assert %{
"messaging.destination": "events", "messaging.destination": "events",
"messaging.destination_kind": :queue, "messaging.destination_kind": :queue,
"messaging.oban.attempt": 1, "oban.job.attempt": 1,
"messaging.oban.inserted_at": _inserted_at, "oban.job.inserted_at": _inserted_at,
"messaging.oban.job_id": _job_id, "oban.job.job_id": _job_id,
"messaging.oban.max_attempts": 1, "oban.job.max_attempts": 1,
"messaging.oban.priority": 0, "oban.job.priority": 0,
"messaging.oban.scheduled_at": _scheduled_at, "oban.job.scheduled_at": _scheduled_at,
"messaging.oban.worker": "TestJobThatThrowsException", "oban.job.worker": "TestJobThatThrowsException",
"messaging.operation": :process, "messaging.operation": :process,
"messaging.system": :oban "messaging.system": :oban
} = :otel_attributes.map(attributes) } = :otel_attributes.map(attributes)