From b12a464b5f183bd464c0aaaa9b7ff69e28f0242c Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Wed, 14 Feb 2024 11:47:07 -0500 Subject: [PATCH] fix: setting oban attributes (#247) * fix: passing custom oban attributes to span --- .../opentelemetry_oban/CHANGELOG.md | 32 +++++ .../lib/opentelemetry_oban.ex | 8 +- .../lib/opentelemetry_oban/job_handler.ex | 15 +-- .../lib/opentelemetry_oban/plugin_handler.ex | 54 +++++++- instrumentation/opentelemetry_oban/mix.exs | 2 +- .../plugin_handler_test.exs | 123 ++++++++++++++++++ .../test/opentelemetry_oban_test.exs | 50 +++---- 7 files changed, 245 insertions(+), 39 deletions(-) diff --git a/instrumentation/opentelemetry_oban/CHANGELOG.md b/instrumentation/opentelemetry_oban/CHANGELOG.md index 654b49a..2eeb534 100644 --- a/instrumentation/opentelemetry_oban/CHANGELOG.md +++ b/instrumentation/opentelemetry_oban/CHANGELOG.md @@ -1,5 +1,37 @@ # Changelog +## 1.1.0 + +### Changed + +* Improve `OpentelemetryOban.PluginHandler` Tracer span attributes. + The Plugin's span introduce a set of attributes prefixed with `oban.`. + Previously, no attributes were added to the span. The new attributes are: + + * All Plugin: + * `oban.plugin` + * `Oban.Plugins.Cron` Plugin: + * `oban.jobs_count` + * `Oban.Plugins.Gossip` Plugin: + * `oban.gossip_count` + * `Oban.Plugins.Lifeline` Plugin: + * `oban.discarded_count` + * `oban.rescued_count` + * `Oban.Plugins.Pruner` Plugin: + * `oban.pruned_count` + * `Oban.Pro.Plugins.DynamicCron` Plugin: + * `oban.jobs_count` + * `Oban.Pro.Plugins.DynamicLifeline` Plugin: + * `oban.discarded_count` + * `oban.rescued_count` + * `Oban.Pro.Plugins.DynamicPrioritizer` Plugin: + * `oban.reprioritized_count` + * `Oban.Pro.Plugins.DynamicPruner` Plugin: + * `oban.pruned_count` + * `Oban.Pro.Plugins.DynamicScaler` Plugin: + * `oban.scaler.last_scaled_to` + * `oban.scaler.last_scaled_at` + ## 1.0.0 ### Changed diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex index a6af0c0..617062b 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex @@ -133,15 +133,15 @@ defmodule OpentelemetryOban do Trace.messaging_system() => :oban, Trace.messaging_destination() => queue, Trace.messaging_destination_kind() => :queue, - :"messaging.oban.worker" => worker + :"oban.job.worker" => worker } end defp attributes_after_insert(job) do %{ - "messaging.oban.job_id": job.id, - "messaging.oban.priority": job.priority, - "messaging.oban.max_attempts": job.max_attempts + "oban.job.job_id": job.id, + "oban.job.priority": job.priority, + "oban.job.max_attempts": job.max_attempts } end end diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex index 469443e..8d14f78 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex @@ -64,14 +64,13 @@ defmodule OpentelemetryOban.JobHandler do Trace.messaging_destination() => queue, Trace.messaging_destination_kind() => :queue, Trace.messaging_operation() => :process, - :"messaging.oban.job_id" => id, - :"messaging.oban.worker" => worker, - :"messaging.oban.priority" => priority, - :"messaging.oban.attempt" => attempt, - :"messaging.oban.max_attempts" => max_attempts, - :"messaging.oban.inserted_at" => - if(inserted_at, do: DateTime.to_iso8601(inserted_at), else: nil), - :"messaging.oban.scheduled_at" => DateTime.to_iso8601(scheduled_at) + :"oban.job.job_id" => id, + :"oban.job.worker" => worker, + :"oban.job.priority" => priority, + :"oban.job.attempt" => attempt, + :"oban.job.max_attempts" => max_attempts, + :"oban.job.inserted_at" => DateTime.to_iso8601(inserted_at), + :"oban.job.scheduled_at" => DateTime.to_iso8601(scheduled_at) } span_name = "#{worker} process" diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex index dfc94c1..c86e09b 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex @@ -1,4 +1,5 @@ defmodule OpentelemetryOban.PluginHandler do + alias OpenTelemetry.Tracer alias OpenTelemetry.Span @tracer_id __MODULE__ @@ -41,11 +42,12 @@ defmodule OpentelemetryOban.PluginHandler do @tracer_id, "#{plugin} process", metadata, - %{} + %{attributes: %{"oban.plugin": plugin}} ) end def handle_plugin_stop(_event, _measurements, metadata, _config) do + Tracer.set_attributes(end_span_plugin_attrs(metadata)) OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) end @@ -63,4 +65,54 @@ defmodule OpentelemetryOban.PluginHandler do OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) end + + defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Cron} = metadata) do + %{"oban.plugins.cron.jobs_count": length(metadata[:jobs])} + end + + defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Gossip} = metadata) do + %{"oban.plugins.gossip.gossip_count": metadata[:gossip_count]} + end + + defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Lifeline} = metadata) do + %{ + "oban.plugins.lifeline.discarded_count": metadata[:discarded_count], + "oban.plugins.lifeline.rescued_count": metadata[:rescued_count] + } + end + + defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Pruner} = metadata) do + %{"oban.plugins.pruner.pruned_count": metadata[:pruned_count]} + end + + defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicCron} = metadata) do + %{"oban.pro.plugins.dynamic_cron.jobs_count": length(metadata[:jobs])} + end + + defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicLifeline} = metadata) do + %{ + "oban.pro.plugins.dynamic_lifeline.discarded_count": metadata[:discarded_count], + "oban.pro.plugins.dynamic_lifeline.rescued_count": metadata[:rescued_count] + } + end + + defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicPrioritizer} = metadata) do + %{"oban.pro.plugins.dynamic_prioritizer.reprioritized_count": metadata[:reprioritized_count]} + end + + defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicPruner} = metadata) do + %{"oban.pro.plugins.dynamic_pruner.pruned_count": metadata[:pruned_count]} + end + + defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicScaler} = metadata) do + %{ + "oban.pro.plugins.dynamic_scaler.scaler.last_scaled_to": metadata[:scaler][:last_scaled_to], + "oban.pro.plugins.dynamic_scaler.scaler.last_scaled_at": + DateTime.to_iso8601(metadata[:scaler][:last_scaled_at]) + } + end + + defp end_span_plugin_attrs(_) do + %{} + end end diff --git a/instrumentation/opentelemetry_oban/mix.exs b/instrumentation/opentelemetry_oban/mix.exs index 842d9d9..0a56149 100644 --- a/instrumentation/opentelemetry_oban/mix.exs +++ b/instrumentation/opentelemetry_oban/mix.exs @@ -1,7 +1,7 @@ defmodule OpentelemetryOban.MixProject do use Mix.Project - @version "1.0.0" + @version "1.1.0" def project do [ diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs index 81d596f..3b1a4f6 100644 --- a/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs @@ -109,4 +109,127 @@ defmodule OpentelemetryOban.PluginHandlerTest do assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) end + + describe "[:oban, :plugin, :stop] spans" do + test "Oban.Plugins.Cron plugin" do + execute_plugin(Oban.Plugins.Cron, %{jobs: [1, 3, 4]}) + + assert %{ + "oban.plugin": Elixir.Oban.Plugins.Cron, + "oban.plugins.cron.jobs_count": 3 + } == + receive_span_attrs(Oban.Plugins.Cron) + end + + test "Oban.Plugins.Gossip plugin" do + execute_plugin(Oban.Plugins.Gossip, %{gossip_count: 3}) + + assert %{ + "oban.plugin": Elixir.Oban.Plugins.Gossip, + "oban.plugins.gossip.gossip_count": 3 + } == + receive_span_attrs(Oban.Plugins.Gossip) + end + + test "Oban.Plugins.Lifeline plugin" do + execute_plugin(Oban.Plugins.Lifeline, %{discarded_count: 3, rescued_count: 2}) + + assert %{ + "oban.plugin": Elixir.Oban.Plugins.Lifeline, + "oban.plugins.lifeline.discarded_count": 3, + "oban.plugins.lifeline.rescued_count": 2 + } == + receive_span_attrs(Oban.Plugins.Lifeline) + end + + test "Oban.Plugins.Pruner plugin" do + execute_plugin(Oban.Plugins.Pruner, %{pruned_count: 3}) + + assert %{ + "oban.plugin": Elixir.Oban.Plugins.Pruner, + "oban.plugins.pruner.pruned_count": 3 + } == + receive_span_attrs(Oban.Plugins.Pruner) + end + + test "Oban.Pro.Plugins.DynamicCron plugin" do + execute_plugin(Oban.Pro.Plugins.DynamicCron, %{jobs: [1, 3, 4]}) + + assert %{ + "oban.plugin": Elixir.Oban.Pro.Plugins.DynamicCron, + "oban.pro.plugins.dynamic_cron.jobs_count": 3 + } == + receive_span_attrs(Oban.Pro.Plugins.DynamicCron) + end + + test "Oban.Pro.Plugins.DynamicLifeline plugin" do + execute_plugin(Oban.Pro.Plugins.DynamicLifeline, %{discarded_count: 3, rescued_count: 2}) + + assert %{ + "oban.plugin": Elixir.Oban.Pro.Plugins.DynamicLifeline, + "oban.pro.plugins.dynamic_lifeline.discarded_count": 3, + "oban.pro.plugins.dynamic_lifeline.rescued_count": 2 + } == + receive_span_attrs(Oban.Pro.Plugins.DynamicLifeline) + end + + test "Oban.Pro.Plugins.DynamicPrioritizer plugin" do + execute_plugin(Oban.Pro.Plugins.DynamicPrioritizer, %{reprioritized_count: 3}) + + assert %{ + "oban.plugin": Elixir.Oban.Pro.Plugins.DynamicPrioritizer, + "oban.pro.plugins.dynamic_prioritizer.reprioritized_count": 3 + } == + receive_span_attrs(Oban.Pro.Plugins.DynamicPrioritizer) + end + + test "Oban.Pro.Plugins.DynamicPruner plugin" do + execute_plugin(Oban.Pro.Plugins.DynamicPruner, %{pruned_count: 3}) + + assert %{ + "oban.plugin": Elixir.Oban.Pro.Plugins.DynamicPruner, + "oban.pro.plugins.dynamic_pruner.pruned_count": 3 + } == + receive_span_attrs(Oban.Pro.Plugins.DynamicPruner) + end + + test "Oban.Pro.Plugins.DynamicScaler plugin" do + execute_plugin(Oban.Pro.Plugins.DynamicScaler, %{ + scaler: %{last_scaled_to: 3, last_scaled_at: ~U[2021-08-01 12:00:00Z]} + }) + + assert %{ + "oban.plugin": Elixir.Oban.Pro.Plugins.DynamicScaler, + "oban.pro.plugins.dynamic_scaler.scaler.last_scaled_to": 3, + "oban.pro.plugins.dynamic_scaler.scaler.last_scaled_at": "2021-08-01T12:00:00Z" + } == + receive_span_attrs(Oban.Pro.Plugins.DynamicScaler) + end + end + + defp receive_span_attrs(name) do + name = "#{name} process" + + assert_receive( + {:span, span(name: ^name, attributes: attributes)}, + 100, + "expected span with name #{name} to be received" + ) + + elem(attributes, 4) + end + + defp execute_plugin(plugin_name, metadata) do + :telemetry.execute( + [:oban, :plugin, :start], + %{system_time: System.system_time()}, + %{plugin: plugin_name} + ) + + :telemetry.execute( + [:oban, :plugin, :stop], + %{duration: 42069}, + Map.merge(metadata, %{plugin: plugin_name}) + ) + end end diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs index c75b3bb..de3828d 100644 --- a/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs @@ -47,10 +47,10 @@ defmodule OpentelemetryObanTest do assert %{ "messaging.destination": "events", "messaging.destination_kind": :queue, - "messaging.oban.job_id": _job_id, - "messaging.oban.max_attempts": 1, - "messaging.oban.priority": 0, - "messaging.oban.worker": "TestJob", + "oban.job.job_id": _job_id, + "oban.job.max_attempts": 1, + "oban.job.priority": 0, + "oban.job.worker": "TestJob", "messaging.system": :oban } = :otel_attributes.map(attributes) end @@ -147,13 +147,13 @@ defmodule OpentelemetryObanTest do assert %{ "messaging.destination": "events", "messaging.destination_kind": :queue, - "messaging.oban.attempt": 1, - "messaging.oban.inserted_at": _inserted_at, - "messaging.oban.job_id": _job_id, - "messaging.oban.max_attempts": 1, - "messaging.oban.priority": 0, - "messaging.oban.scheduled_at": _scheduled_at, - "messaging.oban.worker": "TestJob", + "oban.job.attempt": 1, + "oban.job.inserted_at": _inserted_at, + "oban.job.job_id": _job_id, + "oban.job.max_attempts": 1, + "oban.job.priority": 0, + "oban.job.scheduled_at": _scheduled_at, + "oban.job.worker": "TestJob", "messaging.operation": :process, "messaging.system": :oban } = :otel_attributes.map(attributes) @@ -177,13 +177,13 @@ defmodule OpentelemetryObanTest do assert %{ "messaging.destination": "events", "messaging.destination_kind": :queue, - "messaging.oban.attempt": 1, - "messaging.oban.inserted_at": _inserted_at, - "messaging.oban.job_id": _job_id, - "messaging.oban.max_attempts": 1, - "messaging.oban.priority": 0, - "messaging.oban.scheduled_at": _scheduled_at, - "messaging.oban.worker": "TestJobThatReturnsError", + "oban.job.attempt": 1, + "oban.job.inserted_at": _inserted_at, + "oban.job.job_id": _job_id, + "oban.job.max_attempts": 1, + "oban.job.priority": 0, + "oban.job.scheduled_at": _scheduled_at, + "oban.job.worker": "TestJobThatReturnsError", "messaging.operation": :process, "messaging.system": :oban } = :otel_attributes.map(attributes) @@ -255,13 +255,13 @@ defmodule OpentelemetryObanTest do assert %{ "messaging.destination": "events", "messaging.destination_kind": :queue, - "messaging.oban.attempt": 1, - "messaging.oban.inserted_at": _inserted_at, - "messaging.oban.job_id": _job_id, - "messaging.oban.max_attempts": 1, - "messaging.oban.priority": 0, - "messaging.oban.scheduled_at": _scheduled_at, - "messaging.oban.worker": "TestJobThatThrowsException", + "oban.job.attempt": 1, + "oban.job.inserted_at": _inserted_at, + "oban.job.job_id": _job_id, + "oban.job.max_attempts": 1, + "oban.job.priority": 0, + "oban.job.scheduled_at": _scheduled_at, + "oban.job.worker": "TestJobThatThrowsException", "messaging.operation": :process, "messaging.system": :oban } = :otel_attributes.map(attributes)