diff --git a/.github/labeler.yml b/.github/labeler.yml index 9599c08..e7f9c87 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -58,3 +58,6 @@ opentelemetry_phoenix: opentelemetry_telemetry: - utilities/opentelemetry_telemetry/**/* + +opentelemetry_oban: + - instrumentation/opentelemetry_oban/**/* diff --git a/.github/workflows/elixir.yml b/.github/workflows/elixir.yml index b5dc615..ce6c7fb 100644 --- a/.github/workflows/elixir.yml +++ b/.github/workflows/elixir.yml @@ -141,3 +141,49 @@ jobs: run: mix format --check-formatted - name: Test run: mix test + + opentelemetry-oban: + needs: [test-matrix] + if: (contains(github.event.pull_request.labels.*.name, 'elixir') && contains(github.event.pull_request.labels.*.name, 'opentelemetry_oban')) + env: + app: 'opentelemetry_oban' + defaults: + run: + working-directory: instrumentation/${{ env.app }} + runs-on: ubuntu-18.04 + name: Opentelemetry Oban test on Elixir ${{ matrix.elixir_version }} (OTP ${{ matrix.otp_version }}) + strategy: + fail-fast: false + matrix: ${{ fromJson(needs.test-matrix.outputs.matrix) }} + services: + postgres: + image: circleci/postgres:13.3-ram + ports: ['5432:5432'] + options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: opentelemetry_oban_test + steps: + - uses: actions/checkout@v2 + - uses: erlef/setup-beam@v1 + with: + otp-version: ${{ matrix.otp_version }} + elixir-version: ${{ matrix.elixir_version }} + rebar3-version: ${{ matrix.rebar3_version }} + - name: Cache + uses: actions/cache@v2 + with: + path: | + instrumentation/${{ env.app }}/deps + instrumentation/${{ env.app }}/_build + key: ${{ runner.os }}-build-${{ matrix.otp_version }}-${{ matrix.elixir_version }}-v3-${{ hashFiles(format('{0}{1}', github.workspace, 'instrumentation/${{ env.app }}/mix.lock')) }} + - name: Fetch deps + if: steps.deps-cache.outputs.cache-hit != 'true' + run: mix deps.get + - name: Compile project + run: mix compile --warnings-as-errors + - name: Check formatting + run: mix format --check-formatted + - name: Test + run: mix test diff --git a/CODEOWNERS b/CODEOWNERS index 542242b..d3f6c57 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -16,5 +16,6 @@ /instrumentation/opentelemetry_cowboy @bryannaegele @tsloughter /instrumentation/opentelemetry_ecto @bryannaegele @tsloughter +/instrumentation/opentelemetry_oban @indrekj /instrumentation/opentelemetry_phoenix @bryannaegele @tsloughter /utilities/opentelemetry_telemetry @bryannaegele @tsloughter diff --git a/instrumentation/opentelemetry_oban/.formatter.exs b/instrumentation/opentelemetry_oban/.formatter.exs new file mode 100644 index 0000000..d2cda26 --- /dev/null +++ b/instrumentation/opentelemetry_oban/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/instrumentation/opentelemetry_oban/.gitignore b/instrumentation/opentelemetry_oban/.gitignore new file mode 100644 index 0000000..ef8bb5d --- /dev/null +++ b/instrumentation/opentelemetry_oban/.gitignore @@ -0,0 +1,24 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# Ignore .fetch files in case you like to edit your project deps locally. +/.fetch + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +opentelemetry_oban-*.tar + diff --git a/instrumentation/opentelemetry_oban/.tool-versions b/instrumentation/opentelemetry_oban/.tool-versions new file mode 100644 index 0000000..0113092 --- /dev/null +++ b/instrumentation/opentelemetry_oban/.tool-versions @@ -0,0 +1,2 @@ +erlang 23.2 +elixir 1.11 diff --git a/instrumentation/opentelemetry_oban/CHANGELOG.md b/instrumentation/opentelemetry_oban/CHANGELOG.md new file mode 100644 index 0000000..e1f3f91 --- /dev/null +++ b/instrumentation/opentelemetry_oban/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## 0.1.0 + +* Initial release diff --git a/instrumentation/opentelemetry_oban/LICENSE b/instrumentation/opentelemetry_oban/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/instrumentation/opentelemetry_oban/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/instrumentation/opentelemetry_oban/README.md b/instrumentation/opentelemetry_oban/README.md new file mode 100644 index 0000000..e175aed --- /dev/null +++ b/instrumentation/opentelemetry_oban/README.md @@ -0,0 +1,54 @@ +# OpentelemetryOban + +OpentelemetryOban uses [telemetry](https://hexdocs.pm/telemetry/) handlers to +create `OpenTelemetry` spans from Oban events. + +## Installation + +The package can be installed by adding `opentelemetry_oban` to your list of +dependencies in `mix.exs`: + +```elixir +def deps do + [ + {:opentelemetry_oban, "~> 0.1"} + ] +end +``` + +In your application start: + +```elixir + def start(_type, _args) do + OpentelemetryOban.setup() + + # ... + end +``` + +## Usage + +By default a new trace is automatically started when a job is processed. + +To also record a span when a job is created and to link traces together +`Oban.insert/2` has to be replaced by `OpentelemetryOban.insert/2`. + +Before: + +```elixir + %{id: 1, in_the: "business", of_doing: "business"} + |> MyApp.Business.new() + |> Oban.insert() +``` + +After: + +```elixir + %{id: 1, in_the: "business", of_doing: "business"} + |> MyApp.Business.new() + |> OpentelemetryOban.insert() +``` + +Oban also supports inserting jobs using `Oban.insert/4`, `Oban.insert_all/2` +and `Oban.insert_all/4`. These are currently not supported by OpentelemetryOban +and are just proxied through to Oban. diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex new file mode 100644 index 0000000..5f76963 --- /dev/null +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex @@ -0,0 +1,150 @@ +defmodule OpentelemetryOban do + @moduledoc """ + OpentelemetryOban uses [telemetry](https://hexdocs.pm/telemetry/) handlers to + create `OpenTelemetry` spans from Oban events. + + Supported events include job start/stop and also when an exception is raised. + + ## Usage + + In your application start: + + def start(_type, _args) do + OpentelemetryOban.setup() + + # ... + end + """ + + alias Ecto.Changeset + alias OpenTelemetry.Span + + require OpenTelemetry.Tracer + + @tracer_id :opentelemetry_oban + + @doc """ + Initializes and configures telemetry handlers. + + By default jobs and plugins are traced. If you wish to trace only jobs then + use: + + OpentelemetryOban.setup(trace: [:jobs]) + + Note that if you don't trace plugins, but inside the plugins, there are spans + from other instrumentation libraries (e.g. ecto) then these will still be + traced. This setting controls only the spans that are created by + opentelemetry_oban. + """ + @spec setup() :: :ok + def setup(opts \\ []) do + {:ok, otel_tracer_vsn} = :application.get_key(@tracer_id, :vsn) + OpenTelemetry.register_tracer(@tracer_id, otel_tracer_vsn) + + trace = Keyword.get(opts, :trace, [:jobs, :plugins]) + + if Enum.member?(trace, :jobs) do + OpentelemetryOban.JobHandler.attach() + end + + if Enum.member?(trace, :plugins) do + OpentelemetryOban.PluginHandler.attach() + end + + :ok + end + + def insert(name \\ Oban, %Changeset{} = changeset) do + attributes = attributes_before_insert(changeset) + worker = Changeset.get_field(changeset, :worker, "unknown") + + OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do + changeset = add_tracing_information_to_meta(changeset) + + case Oban.insert(name, changeset) do + {:ok, job} -> + OpenTelemetry.Tracer.set_attributes(attributes_after_insert(job)) + {:ok, job} + + other -> + other + end + end + end + + def insert(name \\ Oban, multi, multi_name, changeset_or_fun) do + Oban.insert(name, multi, multi_name, changeset_or_fun) + end + + def insert!(name \\ Oban, %Changeset{} = changeset) do + attributes = attributes_before_insert(changeset) + worker = Changeset.get_field(changeset, :worker, "unknown") + + OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do + changeset = add_tracing_information_to_meta(changeset) + + try do + job = Oban.insert!(name, changeset) + OpenTelemetry.Tracer.set_attributes(attributes_after_insert(job)) + job + rescue + exception -> + ctx = OpenTelemetry.Tracer.current_span_ctx() + Span.record_exception(ctx, exception, __STACKTRACE__) + Span.set_status(ctx, OpenTelemetry.status(:error, "")) + reraise exception, __STACKTRACE__ + end + end + end + + def insert_all(name \\ Oban, changesets_or_wrapper) + + def insert_all(name, %{changesets: changesets}) when is_list(changesets) do + insert_all(name, changesets) + end + + def insert_all(name, changesets) when is_list(changesets) do + # changesets in insert_all can include different workers and different + # queues. This means we cannot provide much information here, but we can + # still record the insert and propagate the context information. + OpenTelemetry.Tracer.with_span "Oban bulk insert", kind: :producer do + changesets = Enum.map(changesets, &add_tracing_information_to_meta/1) + Oban.insert_all(name, changesets) + end + end + + def insert_all(name \\ __MODULE__, multi, multi_name, changesets_or_wrapper) do + Oban.insert_all(name, multi, multi_name, changesets_or_wrapper) + end + + defp add_tracing_information_to_meta(changeset) do + meta = Changeset.get_field(changeset, :meta, %{}) + + new_meta = + [] + |> :otel_propagator_text_map.inject() + |> Enum.into(meta) + + Changeset.change(changeset, %{meta: new_meta}) + end + + defp attributes_before_insert(changeset) do + queue = Changeset.get_field(changeset, :queue, "unknown") + worker = Changeset.get_field(changeset, :worker, "unknown") + + [ + "messaging.system": "oban", + "messaging.destination": queue, + "messaging.destination_kind": "queue", + "messaging.oban.worker": worker + ] + end + + defp attributes_after_insert(job) do + [ + "messaging.oban.job_id": job.id, + "messaging.oban.priority": job.priority, + "messaging.oban.max_attempts": job.max_attempts + ] + end +end diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex new file mode 100644 index 0000000..79ff213 --- /dev/null +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex @@ -0,0 +1,101 @@ +defmodule OpentelemetryOban.JobHandler do + alias OpenTelemetry.Span + + @tracer_id :opentelemetry_oban + + def attach() do + attach_job_start_handler() + attach_job_stop_handler() + attach_job_exception_handler() + end + + defp attach_job_start_handler() do + :telemetry.attach( + "#{__MODULE__}.job_start", + [:oban, :job, :start], + &__MODULE__.handle_job_start/4, + [] + ) + end + + defp attach_job_stop_handler() do + :telemetry.attach( + "#{__MODULE__}.job_stop", + [:oban, :job, :stop], + &__MODULE__.handle_job_stop/4, + [] + ) + end + + defp attach_job_exception_handler() do + :telemetry.attach( + "#{__MODULE__}.job_exception", + [:oban, :job, :exception], + &__MODULE__.handle_job_exception/4, + [] + ) + end + + def handle_job_start(_event, _measurements, metadata, _config) do + %{ + job: %{ + id: id, + queue: queue, + worker: worker, + priority: priority, + inserted_at: inserted_at, + scheduled_at: scheduled_at, + attempt: attempt, + max_attempts: max_attempts, + meta: job_meta + } + } = metadata + + :otel_propagator_text_map.extract(Map.to_list(job_meta)) + parent = OpenTelemetry.Tracer.current_span_ctx() + links = if parent == :undefined, do: [], else: [OpenTelemetry.link(parent)] + OpenTelemetry.Tracer.set_current_span(:undefined) + + attributes = [ + "messaging.system": "oban", + "messaging.destination": queue, + "messaging.destination_kind": "queue", + "messaging.operation": "process", + "messaging.oban.job_id": id, + "messaging.oban.worker": worker, + "messaging.oban.priority": priority, + "messaging.oban.attempt": attempt, + "messaging.oban.max_attempts": max_attempts, + "messaging.oban.inserted_at": + if(inserted_at, do: DateTime.to_iso8601(inserted_at), else: nil), + "messaging.oban.scheduled_at": DateTime.to_iso8601(scheduled_at) + ] + + span_name = "#{worker} process" + + OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{ + kind: :consumer, + links: links, + attributes: attributes + }) + end + + def handle_job_stop(_event, _measurements, metadata, _config) do + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) + end + + def handle_job_exception( + _event, + _measurements, + %{stacktrace: stacktrace, error: error} = metadata, + _config + ) do + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata) + + # Record exception and mark the span as errored + Span.record_exception(ctx, error, stacktrace) + Span.set_status(ctx, OpenTelemetry.status(:error, "")) + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) + end +end diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex new file mode 100644 index 0000000..2ff2607 --- /dev/null +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex @@ -0,0 +1,66 @@ +defmodule OpentelemetryOban.PluginHandler do + alias OpenTelemetry.Span + + @tracer_id :opentelemetry_oban + + def attach() do + attach_plugin_start_handler() + attach_plugin_stop_handler() + attach_plugin_exception_handler() + end + + defp attach_plugin_start_handler() do + :telemetry.attach( + "#{__MODULE__}.plugin_start", + [:oban, :plugin, :start], + &__MODULE__.handle_plugin_start/4, + [] + ) + end + + defp attach_plugin_stop_handler() do + :telemetry.attach( + "#{__MODULE__}.plugin_stop", + [:oban, :plugin, :stop], + &__MODULE__.handle_plugin_stop/4, + [] + ) + end + + defp attach_plugin_exception_handler() do + :telemetry.attach( + "#{__MODULE__}.plugin_exception", + [:oban, :plugin, :exception], + &__MODULE__.handle_plugin_exception/4, + [] + ) + end + + def handle_plugin_start(_event, _measurements, %{plugin: plugin} = metadata, _config) do + OpentelemetryTelemetry.start_telemetry_span( + @tracer_id, + "#{plugin} process", + metadata, + %{} + ) + end + + def handle_plugin_stop(_event, _measurements, metadata, _config) do + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) + end + + def handle_plugin_exception( + _event, + _measurements, + %{stacktrace: stacktrace, error: error} = metadata, + _config + ) do + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata) + + # Record exception and mark the span as errored + Span.record_exception(ctx, error, stacktrace) + Span.set_status(ctx, OpenTelemetry.status(:error, "")) + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) + end +end diff --git a/instrumentation/opentelemetry_oban/mix.exs b/instrumentation/opentelemetry_oban/mix.exs new file mode 100644 index 0000000..584f6b5 --- /dev/null +++ b/instrumentation/opentelemetry_oban/mix.exs @@ -0,0 +1,53 @@ +defmodule OpentelemetryOban.MixProject do + use Mix.Project + + def project do + [ + app: :opentelemetry_oban, + version: "0.2.0-rc.3", + elixir: "~> 1.10", + start_permanent: Mix.env() == :prod, + deps: deps(), + docs: [ + main: "OpentelemetryOban", + extras: ["README.md"] + ], + elixirc_paths: elixirc_paths(Mix.env()), + package: [ + name: "opentelemetry_oban", + description: "OpenTelemetry tracing for Oban", + maintainers: ["Glia TechMovers"], + licenses: ["Apache-2.0"], + links: %{ + "GitHub" => "https://github.com/open-telemetry/opentelemetry-erlang-contrib", + "OpenTelemetry Erlang" => "https://github.com/open-telemetry/opentelemetry-erlang", + "OpenTelemetry.io" => "https://opentelemetry.io" + }, + files: ~w(lib .formatter.exs mix.exs README* LICENSE* CHANGELOG*) + ] + ] + end + + # Run "mix help compile.app" to learn about applications. + def application do + [ + extra_applications: [] + ] + end + + # Run "mix help deps" to learn about dependencies. + defp deps do + [ + {:oban, "~> 2.0"}, + {:opentelemetry_api, "~> 1.0.0-rc.3"}, + {:opentelemetry_telemetry, "~> 1.0.0-beta"}, + {:opentelemetry, "~> 1.0.0-rc.3", only: [:test]}, + {:opentelemetry_exporter, "~> 1.0.0-rc.3", only: [:test]}, + {:telemetry, "~> 0.4 or ~> 1.0"}, + {:ex_doc, "~> 0.24", only: [:dev], runtime: false} + ] + end + + defp elixirc_paths(:test), do: ["lib", "test/support"] + defp elixirc_paths(_), do: ["lib"] +end diff --git a/instrumentation/opentelemetry_oban/mix.lock b/instrumentation/opentelemetry_oban/mix.lock new file mode 100644 index 0000000..29aa3ed --- /dev/null +++ b/instrumentation/opentelemetry_oban/mix.lock @@ -0,0 +1,28 @@ +%{ + "acceptor_pool": {:hex, :acceptor_pool, "1.0.0", "43c20d2acae35f0c2bcd64f9d2bde267e459f0f3fd23dab26485bf518c281b21", [:rebar3], [], "hexpm", "0cbcd83fdc8b9ad2eee2067ef8b91a14858a5883cb7cd800e6fcd5803e158788"}, + "chatterbox": {:hex, :ts_chatterbox, "0.11.0", "b8f372c706023eb0de5bf2976764edb27c70fe67052c88c1f6a66b3a5626847f", [:rebar3], [{:hpack, "~>0.2.3", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "722fe2bad52913ab7e87d849fc6370375f0c961ffb2f0b5e6d647c9170c382a6"}, + "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, + "ctx": {:hex, :ctx, "0.6.0", "8ff88b70e6400c4df90142e7f130625b82086077a45364a78d208ed3ed53c7fe", [:rebar3], [], "hexpm", "a14ed2d1b67723dbebbe423b28d7615eb0bdcba6ff28f2d1f1b0a7e1d4aa5fc2"}, + "db_connection": {:hex, :db_connection, "2.4.0", "d04b1b73795dae60cead94189f1b8a51cc9e1f911c234cc23074017c43c031e5", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ad416c21ad9f61b3103d254a71b63696ecadb6a917b36f563921e0de00d7d7c8"}, + "decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.15", "b29e8e729f4aa4a00436580dcc2c9c5c51890613457c193cc8525c388ccb2f06", [:mix], [], "hexpm", "044523d6438ea19c1b8ec877ec221b008661d3c27e3b848f4c879f500421ca5c"}, + "ecto": {:hex, :ecto, "3.7.1", "a20598862351b29f80f285b21ec5297da1181c0442687f9b8329f0445d228892", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d36e5b39fc479e654cffd4dbe1865d9716e4a9b6311faff799b6f90ab81b8638"}, + "ecto_sql": {:hex, :ecto_sql, "3.7.0", "2fcaad4ab0c8d76a5afbef078162806adbe709c04160aca58400d5cbbe8eeac6", [:mix], [{:db_connection, "~> 2.2", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.4.0 or ~> 0.5.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.15.0 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a26135dfa1d99bf87a928c464cfa25bba6535a4fe761eefa56077a4febc60f70"}, + "ex_doc": {:hex, :ex_doc, "0.25.3", "3edf6a0d70a39d2eafde030b8895501b1c93692effcbd21347296c18e47618ce", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "9ebebc2169ec732a38e9e779fd0418c9189b3ca93f4a676c961be6c1527913f5"}, + "gproc": {:hex, :gproc, "0.8.0", "cea02c578589c61e5341fce149ea36ccef236cc2ecac8691fba408e7ea77ec2f", [:rebar3], [], "hexpm", "580adafa56463b75263ef5a5df4c86af321f68694e7786cb057fd805d1e2a7de"}, + "grpcbox": {:hex, :grpcbox, "0.14.0", "3eb321bcd2275baf8b54cf381feb7b0559a50c02544de28fda039c7f2f9d1a7a", [:rebar3], [{:acceptor_pool, "~>1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~>0.11.0", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~>0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~>0.8.0", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "e24159b7b6d3f9869bbe528845c0125fed2259366ba908fd04a1f45fe81d0660"}, + "hpack": {:hex, :hpack_erl, "0.2.3", "17670f83ff984ae6cd74b1c456edde906d27ff013740ee4d9efaa4f1bf999633", [:rebar3], [], "hexpm", "06f580167c4b8b8a6429040df36cc93bba6d571faeaec1b28816523379cbb23a"}, + "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, + "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"}, + "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, + "oban": {:hex, :oban, "2.8.0", "e44b19a30e30bb983099f55d59749316ff0eaf5dfef4214e1190738176653e50", [:mix], [{:ecto_sql, ">= 3.4.3", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2954a2ac418f7cc4217c0772a3dd3a70e2966240583b97f4126a489e1300a573"}, + "opentelemetry": {:hex, :opentelemetry, "1.0.0-rc.3", "d2698bee882c354274563ee85d097bb736a9adb8d8ed376a4deea0cd3a14bb31", [:rebar3], [{:opentelemetry_api, "~> 1.0.0-rc.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "c9105933df0d783d94cf08d79206eb8d6578abc0bcbd498d0b497ec62a4e30a8"}, + "opentelemetry_api": {:hex, :opentelemetry_api, "1.0.0-rc.3.1", "d183663c178f317a109a267b3c3664d09db22829a4d4eea8d9af46ed3e5bee05", [:mix, :rebar3], [], "hexpm", "4b836cec1b531080c310fa54afca6e523984a1f6c1aeb5d4da537dad9e309ce9"}, + "opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.0.0-rc.3", "76f5657d4c94a12003d9ed2c8da1023c815e98f5553184dbb0cdaeec76db676d", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.0.0-rc.3", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0.0-rc.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "267f0e4c3f1f5557cc7ad6ac71d66b8eaf7b3b56fde942c21f8a0bc96174fe1e"}, + "opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.0.0-beta.2", "b840eee9e68307ad7fa4ee316da19db3f8e30763b87737d3304782ca3cc296a2", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0.0-rc.1", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.2.1", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "e8b12f42614d0aeb6a49001c75ca035544950f736fdbb240177838674f99e1e2"}, + "postgrex": {:hex, :postgrex, "0.15.10", "2809dee1b1d76f7cbabe570b2a9285c2e7b41be60cf792f5f2804a54b838a067", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "1560ca427542f6b213f8e281633ae1a3b31cdbcd84ebd7f50628765b8f6132be"}, + "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, + "telemetry_registry": {:hex, :telemetry_registry, "0.2.1", "fe648a691f2128e4279d993cd010994c67f282354dc061e697bf070d4b87b480", [:mix, :rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4221cefbcadd0b3e7076960339223742d973f1371bc20f3826af640257bc3690"}, +} diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs new file mode 100644 index 0000000..a868c73 --- /dev/null +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs @@ -0,0 +1,112 @@ +defmodule OpentelemetryOban.PluginHandlerTest do + use DataCase + + require OpenTelemetry.Tracer + require OpenTelemetry.Span + require Record + + for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do + Record.defrecord(name, spec) + end + + for {name, spec} <- Record.extract_all(from_lib: "opentelemetry_api/include/opentelemetry.hrl") do + Record.defrecord(name, spec) + end + + setup do + :application.stop(:opentelemetry) + :application.set_env(:opentelemetry, :tracer, :otel_tracer_default) + + :application.set_env(:opentelemetry, :processors, [ + {:otel_batch_processor, %{scheduled_delay_ms: 1, exporter: {:otel_exporter_pid, self()}}} + ]) + + :application.start(:opentelemetry) + + TestHelpers.remove_oban_handlers() + OpentelemetryOban.setup() + + :ok + end + + test "does not create spans when tracing plugins is disabled" do + TestHelpers.remove_oban_handlers() + OpentelemetryOban.setup(trace: [:jobs]) + + :telemetry.execute( + [:oban, :plugin, :start], + %{system_time: System.system_time()}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) + + :telemetry.execute( + [:oban, :plugin, :stop], + %{duration: 444}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) + + refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")} + end + + test "records span on plugin execution" do + :telemetry.execute( + [:oban, :plugin, :start], + %{system_time: System.system_time()}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) + + :telemetry.execute( + [:oban, :plugin, :stop], + %{duration: 444}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) + + assert_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")} + end + + test "records span on plugin error" do + :telemetry.execute( + [:oban, :plugin, :start], + %{system_time: System.system_time()}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) + + :telemetry.execute( + [:oban, :plugin, :exception], + %{duration: 444}, + %{ + plugin: Elixir.Oban.Plugins.Stager, + kind: :error, + stacktrace: [ + {Some, :error, [], []} + ], + error: %UndefinedFunctionError{ + arity: 0, + function: :error, + message: nil, + module: Some, + reason: nil + } + } + ) + + expected_status = OpenTelemetry.status(:error, "") + + assert_receive {:span, + span( + name: "Elixir.Oban.Plugins.Stager process", + events: [ + event( + name: "exception", + attributes: [ + {"exception.type", "Elixir.UndefinedFunctionError"}, + {"exception.message", + "function Some.error/0 is undefined (module Some is not available)"}, + {"exception.stacktrace", _stacktrace} + ] + ) + ], + status: ^expected_status + )} + end +end diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs new file mode 100644 index 0000000..274e206 --- /dev/null +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs @@ -0,0 +1,379 @@ +defmodule OpentelemetryObanTest do + use DataCase + + doctest OpentelemetryOban + + require OpenTelemetry.Tracer + require OpenTelemetry.Span + require Record + + for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do + Record.defrecord(name, spec) + end + + for {name, spec} <- Record.extract_all(from_lib: "opentelemetry_api/include/opentelemetry.hrl") do + Record.defrecord(name, spec) + end + + setup do + :application.stop(:opentelemetry) + :application.set_env(:opentelemetry, :tracer, :otel_tracer_default) + + :application.set_env(:opentelemetry, :processors, [ + {:otel_batch_processor, %{scheduled_delay_ms: 1, exporter: {:otel_exporter_pid, self()}}} + ]) + + :application.start(:opentelemetry) + + TestHelpers.remove_oban_handlers() + OpentelemetryOban.setup() + + :ok + end + + test "records span on job insertion" do + OpentelemetryOban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + + assert_receive {:span, + span( + name: "TestJob send", + attributes: attributes, + parent_span_id: :undefined, + kind: :producer, + status: :undefined + )} + + assert [ + "messaging.destination": "events", + "messaging.destination_kind": "queue", + "messaging.oban.job_id": _job_id, + "messaging.oban.max_attempts": 1, + "messaging.oban.priority": 0, + "messaging.oban.worker": "TestJob", + "messaging.system": "oban" + ] = List.keysort(attributes, 0) + end + + test "job creation uses existing trace if present" do + OpenTelemetry.Tracer.with_span "test span" do + ctx = OpenTelemetry.Tracer.current_span_ctx() + root_trace_id = OpenTelemetry.Span.trace_id(ctx) + root_span_id = OpenTelemetry.Span.span_id(ctx) + + OpentelemetryOban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + + assert_receive {:span, + span( + name: "TestJob send", + attributes: _attributes, + trace_id: ^root_trace_id, + parent_span_id: ^root_span_id, + kind: :producer, + status: :undefined + )} + end + end + + test "keeps existing meta information" do + OpentelemetryOban.insert(TestJob.new(%{}, meta: %{foo: "bar"})) + + assert [job] = all_enqueued() + assert job.meta["foo"] == "bar" + end + + test "tracing information is propagated between send and process" do + OpentelemetryOban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + + assert_receive {:span, + span( + name: "TestJob send", + attributes: _attributes, + trace_id: send_trace_id, + span_id: send_span_id, + kind: :producer, + status: :undefined + )} + + assert_receive {:span, + span( + name: "TestJob process", + attributes: _attributes, + kind: :consumer, + status: :undefined, + trace_id: process_trace_id, + links: [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] + )} + + # Process is ran asynchronously so we create a new trace, but still link + # the traces together. + assert send_trace_id != process_trace_id + end + + test "no link is created on process when tracing info was not propagated" do + # Using regular Oban, instead of OpentelemetryOban + Oban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + + assert_receive {:span, + span( + name: "TestJob process", + attributes: _attributes, + kind: :consumer, + status: :undefined, + trace_id: _trace_id, + links: [] + )} + end + + test "records spans for successful Oban jobs" do + OpentelemetryOban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + + assert_receive {:span, + span( + name: "TestJob process", + attributes: attributes, + kind: :consumer, + status: :undefined + )} + + assert [ + "messaging.destination": "events", + "messaging.destination_kind": "queue", + "messaging.oban.attempt": 1, + "messaging.oban.inserted_at": _inserted_at, + "messaging.oban.job_id": _job_id, + "messaging.oban.max_attempts": 1, + "messaging.oban.priority": 0, + "messaging.oban.scheduled_at": _scheduled_at, + "messaging.oban.worker": "TestJob", + "messaging.operation": "process", + "messaging.system": "oban" + ] = List.keysort(attributes, 0) + end + + test "records spans for Oban jobs that stop with {:error, :something}" do + OpentelemetryOban.insert(TestJobThatReturnsError.new(%{})) + assert %{success: 0, failure: 1} = Oban.drain_queue(queue: :events) + + expected_status = OpenTelemetry.status(:error, "") + + assert_receive {:span, + span( + name: "TestJobThatReturnsError process", + attributes: attributes, + kind: :consumer, + events: [ + event( + name: "exception", + attributes: [ + {"exception.type", "Elixir.Oban.PerformError"}, + {"exception.message", + "TestJobThatReturnsError failed with {:error, :something}"}, + {"exception.stacktrace", _stacktrace} + ] + ) + ], + status: ^expected_status + )} + + assert [ + "messaging.destination": "events", + "messaging.destination_kind": "queue", + "messaging.oban.attempt": 1, + "messaging.oban.inserted_at": _inserted_at, + "messaging.oban.job_id": _job_id, + "messaging.oban.max_attempts": 1, + "messaging.oban.priority": 0, + "messaging.oban.scheduled_at": _scheduled_at, + "messaging.oban.worker": "TestJobThatReturnsError", + "messaging.operation": "process", + "messaging.system": "oban" + ] = List.keysort(attributes, 0) + end + + test "records spans for each retry" do + OpentelemetryOban.insert(TestJobThatReturnsError.new(%{}, max_attempts: 2)) + + assert %{success: 0, failure: 2} = + Oban.drain_queue(queue: :events, with_scheduled: true, with_recursion: true) + + expected_status = OpenTelemetry.status(:error, "") + + assert_receive {:span, + span( + name: "TestJobThatReturnsError send", + trace_id: send_trace_id, + span_id: send_span_id + )} + + assert_receive {:span, + span( + name: "TestJobThatReturnsError process", + status: ^expected_status, + trace_id: first_process_trace_id, + links: [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] + )} + + assert_receive {:span, + span( + name: "TestJobThatReturnsError process", + status: ^expected_status, + trace_id: second_process_trace_id, + links: [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] + )} + + assert first_process_trace_id != second_process_trace_id + end + + test "records spans for Oban jobs that stop with an exception" do + OpentelemetryOban.insert(TestJobThatThrowsException.new(%{})) + assert %{success: 0, failure: 1} = Oban.drain_queue(queue: :events) + + expected_status = OpenTelemetry.status(:error, "") + + assert_receive {:span, + span( + name: "TestJobThatThrowsException process", + attributes: attributes, + kind: :consumer, + events: [ + event( + name: "exception", + attributes: [ + {"exception.type", "Elixir.UndefinedFunctionError"}, + {"exception.message", + "function Some.error/0 is undefined (module Some is not available)"}, + {"exception.stacktrace", _stacktrace} + ] + ) + ], + status: ^expected_status + )} + + assert [ + "messaging.destination": "events", + "messaging.destination_kind": "queue", + "messaging.oban.attempt": 1, + "messaging.oban.inserted_at": _inserted_at, + "messaging.oban.job_id": _job_id, + "messaging.oban.max_attempts": 1, + "messaging.oban.priority": 0, + "messaging.oban.scheduled_at": _scheduled_at, + "messaging.oban.worker": "TestJobThatThrowsException", + "messaging.operation": "process", + "messaging.system": "oban" + ] = List.keysort(attributes, 0) + end + + test "spans inside the job are associated with the job trace" do + OpentelemetryOban.insert(TestJobWithInnerSpan.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + + assert_receive {:span, + span( + name: "TestJobWithInnerSpan process", + kind: :consumer, + trace_id: trace_id, + span_id: process_span_id + )} + + assert_receive {:span, + span( + name: "span inside the job", + kind: :internal, + trace_id: ^trace_id, + parent_span_id: ^process_span_id + )} + end + + test "OpentelemetryOban.insert!/2 returns job on successful insert" do + %Oban.Job{} = OpentelemetryOban.insert!(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + assert_receive {:span, span(name: "TestJob send")} + assert_receive {:span, span(name: "TestJob process")} + end + + test "OpentelemetryOban.insert!/2 raises an error on failed insert" do + assert_raise( + Ecto.InvalidChangesetError, + fn -> OpentelemetryOban.insert!(TestJob.new(%{}, max_attempts: -1)) end + ) + + assert %{success: 0, failure: 0} = Oban.drain_queue(queue: :events) + + expected_status = OpenTelemetry.status(:error, "") + + assert_receive {:span, + span( + name: "TestJob send", + events: [ + event( + name: "exception", + attributes: [ + {"exception.type", "Elixir.Ecto.InvalidChangesetError"}, + {"exception.message", _message}, + {"exception.stacktrace", _stacktrace} + ] + ) + ], + status: ^expected_status + )} + + refute_received {:span, span(name: "TestJob process")} + end + + test "tracing information is propagated when using insert_all/2" do + OpentelemetryOban.insert_all([ + TestJob.new(%{}), + TestJob.new(%{}) + ]) + + assert %{success: 2, failure: 0} = Oban.drain_queue(queue: :events) + + assert_receive {:span, + span( + name: "Oban bulk insert", + attributes: _attributes, + trace_id: send_trace_id, + span_id: send_span_id, + kind: :producer, + status: :undefined + )} + + assert_receive {:span, + span( + name: "TestJob process", + attributes: _attributes, + kind: :consumer, + status: :undefined, + trace_id: first_process_trace_id, + links: [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] + )} + + assert_receive {:span, + span( + name: "TestJob process", + attributes: _attributes, + kind: :consumer, + status: :undefined, + trace_id: second_process_trace_id, + links: [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] + )} + + # Process is ran asynchronously so we create a new trace, but still link + # the traces together. + assert send_trace_id != first_process_trace_id + assert send_trace_id != second_process_trace_id + assert first_process_trace_id != second_process_trace_id + end + + test "works with Oban.Testing.perform_job helper function" do + Oban.Testing.perform_job(TestJob, %{}, repo: TestRepo) + + assert_receive {:span, span(name: "TestJob process")} + end +end diff --git a/instrumentation/opentelemetry_oban/test/support/data_case.ex b/instrumentation/opentelemetry_oban/test/support/data_case.ex new file mode 100644 index 0000000..793116a --- /dev/null +++ b/instrumentation/opentelemetry_oban/test/support/data_case.ex @@ -0,0 +1,32 @@ +defmodule DataCase do + @moduledoc """ + This module defines the setup for tests requiring access to the data layer. + + You may define functions here to be used as helpers in your tests. + + Finally, if the test case interacts with the database, it cannot be async. + For this reason, every test runs inside a transaction which is reset at the + beginning of the test unless the test case is marked as async. + """ + + use ExUnit.CaseTemplate + + using do + quote do + use Oban.Testing, repo: TestRepo + + import Ecto + import DataCase + end + end + + setup tags do + :ok = Ecto.Adapters.SQL.Sandbox.checkout(TestRepo) + + unless tags[:async] do + Ecto.Adapters.SQL.Sandbox.mode(TestRepo, {:shared, self()}) + end + + :ok + end +end diff --git a/instrumentation/opentelemetry_oban/test/support/test_repo.ex b/instrumentation/opentelemetry_oban/test/support/test_repo.ex new file mode 100644 index 0000000..63cdd3a --- /dev/null +++ b/instrumentation/opentelemetry_oban/test/support/test_repo.ex @@ -0,0 +1,5 @@ +defmodule TestRepo do + use Ecto.Repo, + otp_app: :opentelemetry_oban, + adapter: Ecto.Adapters.Postgres +end diff --git a/instrumentation/opentelemetry_oban/test/test_helper.exs b/instrumentation/opentelemetry_oban/test/test_helper.exs new file mode 100644 index 0000000..f052e44 --- /dev/null +++ b/instrumentation/opentelemetry_oban/test/test_helper.exs @@ -0,0 +1,74 @@ +ExUnit.start() + +TestRepo.start_link( + database: "opentelemetry_oban_test", + hostname: "localhost", + username: "postgres", + password: "postgres", + pool: Ecto.Adapters.SQL.Sandbox +) + +Ecto.Adapters.SQL.Sandbox.mode(TestRepo, {:shared, self()}) + +defmodule PrepareOban do + use Ecto.Migration + def up, do: Oban.Migrations.up() +end + +Ecto.Migrator.run(TestRepo, [{0, PrepareOban}], :up, all: true) +TestRepo.query("TRUNCATE oban_jobs", []) + +Oban.start_link( + repo: TestRepo, + plugins: [Oban.Plugins.Pruner], + queues: [default: 10, events: 50] +) + +defmodule TestJob do + use Oban.Worker, queue: :events, max_attempts: 1 + + @impl Oban.Worker + def perform(_job) do + :ok + end +end + +defmodule TestJobWithInnerSpan do + use Oban.Worker, queue: :events, max_attempts: 1 + require OpenTelemetry.Tracer + + @impl Oban.Worker + def perform(_job) do + OpenTelemetry.Tracer.with_span "span inside the job" do + :ok + end + end +end + +defmodule TestJobThatReturnsError do + use Oban.Worker, queue: :events, max_attempts: 1 + + @impl Oban.Worker + def perform(_job) do + {:error, :something} + end +end + +defmodule TestJobThatThrowsException do + use Oban.Worker, queue: :events, max_attempts: 1 + + @impl Oban.Worker + def perform(_job) do + raise %UndefinedFunctionError{ + message: "function Some.error/0 is undefined (module Some is not available)" + } + end +end + +defmodule TestHelpers do + def remove_oban_handlers() do + Enum.each(:telemetry.list_handlers([:oban]), fn handler -> + :telemetry.detach(handler[:id]) + end) + end +end