Add opentelemetry integration to Oban (#6)
By default a new trace is automatically started when a job is processed by monitoring these events: * `[:oban, :job, :start]` — at the point a job is fetched from the database and will execute * `[:oban, :job, :stop]` — after a job succeeds and the success is recorded in the database * `[:oban, :job, :exception]` — after a job fails and the failure is recorded in the database To also record a span when a job is created and to link traces together `Oban.insert/2` has to be replaced by `OpentelemetryOban.insert/2`. Before: ```elixir %{id: 1, in_the: "business", of_doing: "business"} |> MyApp.Business.new() |> Oban.insert() ``` After: ```elixir %{id: 1, in_the: "business", of_doing: "business"} |> MyApp.Business.new() |> OpentelemetryOban.insert() ``` Co-authored-by: Tristan Sloughter <t@crashfast.com>
This commit is contained in:
parent
f12a635d9d
commit
eecb238cff
|
@ -58,3 +58,6 @@ opentelemetry_phoenix:
|
|||
|
||||
opentelemetry_telemetry:
|
||||
- utilities/opentelemetry_telemetry/**/*
|
||||
|
||||
opentelemetry_oban:
|
||||
- instrumentation/opentelemetry_oban/**/*
|
||||
|
|
|
@ -141,3 +141,49 @@ jobs:
|
|||
run: mix format --check-formatted
|
||||
- name: Test
|
||||
run: mix test
|
||||
|
||||
opentelemetry-oban:
|
||||
needs: [test-matrix]
|
||||
if: (contains(github.event.pull_request.labels.*.name, 'elixir') && contains(github.event.pull_request.labels.*.name, 'opentelemetry_oban'))
|
||||
env:
|
||||
app: 'opentelemetry_oban'
|
||||
defaults:
|
||||
run:
|
||||
working-directory: instrumentation/${{ env.app }}
|
||||
runs-on: ubuntu-18.04
|
||||
name: Opentelemetry Oban test on Elixir ${{ matrix.elixir_version }} (OTP ${{ matrix.otp_version }})
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix: ${{ fromJson(needs.test-matrix.outputs.matrix) }}
|
||||
services:
|
||||
postgres:
|
||||
image: circleci/postgres:13.3-ram
|
||||
ports: ['5432:5432']
|
||||
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
|
||||
env:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_DB: opentelemetry_oban_test
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: erlef/setup-beam@v1
|
||||
with:
|
||||
otp-version: ${{ matrix.otp_version }}
|
||||
elixir-version: ${{ matrix.elixir_version }}
|
||||
rebar3-version: ${{ matrix.rebar3_version }}
|
||||
- name: Cache
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: |
|
||||
instrumentation/${{ env.app }}/deps
|
||||
instrumentation/${{ env.app }}/_build
|
||||
key: ${{ runner.os }}-build-${{ matrix.otp_version }}-${{ matrix.elixir_version }}-v3-${{ hashFiles(format('{0}{1}', github.workspace, 'instrumentation/${{ env.app }}/mix.lock')) }}
|
||||
- name: Fetch deps
|
||||
if: steps.deps-cache.outputs.cache-hit != 'true'
|
||||
run: mix deps.get
|
||||
- name: Compile project
|
||||
run: mix compile --warnings-as-errors
|
||||
- name: Check formatting
|
||||
run: mix format --check-formatted
|
||||
- name: Test
|
||||
run: mix test
|
||||
|
|
|
@ -16,5 +16,6 @@
|
|||
|
||||
/instrumentation/opentelemetry_cowboy @bryannaegele @tsloughter
|
||||
/instrumentation/opentelemetry_ecto @bryannaegele @tsloughter
|
||||
/instrumentation/opentelemetry_oban @indrekj
|
||||
/instrumentation/opentelemetry_phoenix @bryannaegele @tsloughter
|
||||
/utilities/opentelemetry_telemetry @bryannaegele @tsloughter
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
# Used by "mix format"
|
||||
[
|
||||
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
|
||||
]
|
|
@ -0,0 +1,24 @@
|
|||
# The directory Mix will write compiled artifacts to.
|
||||
/_build/
|
||||
|
||||
# If you run "mix test --cover", coverage assets end up here.
|
||||
/cover/
|
||||
|
||||
# The directory Mix downloads your dependencies sources to.
|
||||
/deps/
|
||||
|
||||
# Where third-party dependencies like ExDoc output generated docs.
|
||||
/doc/
|
||||
|
||||
# Ignore .fetch files in case you like to edit your project deps locally.
|
||||
/.fetch
|
||||
|
||||
# If the VM crashes, it generates a dump, let's ignore it too.
|
||||
erl_crash.dump
|
||||
|
||||
# Also ignore archive artifacts (built via "mix archive.build").
|
||||
*.ez
|
||||
|
||||
# Ignore package tarball (built via "mix hex.build").
|
||||
opentelemetry_oban-*.tar
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
erlang 23.2
|
||||
elixir 1.11
|
|
@ -0,0 +1,5 @@
|
|||
# Changelog
|
||||
|
||||
## 0.1.0
|
||||
|
||||
* Initial release
|
|
@ -0,0 +1,201 @@
|
|||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
|
@ -0,0 +1,54 @@
|
|||
# OpentelemetryOban
|
||||
|
||||
OpentelemetryOban uses [telemetry](https://hexdocs.pm/telemetry/) handlers to
|
||||
create `OpenTelemetry` spans from Oban events.
|
||||
|
||||
## Installation
|
||||
|
||||
The package can be installed by adding `opentelemetry_oban` to your list of
|
||||
dependencies in `mix.exs`:
|
||||
|
||||
```elixir
|
||||
def deps do
|
||||
[
|
||||
{:opentelemetry_oban, "~> 0.1"}
|
||||
]
|
||||
end
|
||||
```
|
||||
|
||||
In your application start:
|
||||
|
||||
```elixir
|
||||
def start(_type, _args) do
|
||||
OpentelemetryOban.setup()
|
||||
|
||||
# ...
|
||||
end
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
By default a new trace is automatically started when a job is processed.
|
||||
|
||||
To also record a span when a job is created and to link traces together
|
||||
`Oban.insert/2` has to be replaced by `OpentelemetryOban.insert/2`.
|
||||
|
||||
Before:
|
||||
|
||||
```elixir
|
||||
%{id: 1, in_the: "business", of_doing: "business"}
|
||||
|> MyApp.Business.new()
|
||||
|> Oban.insert()
|
||||
```
|
||||
|
||||
After:
|
||||
|
||||
```elixir
|
||||
%{id: 1, in_the: "business", of_doing: "business"}
|
||||
|> MyApp.Business.new()
|
||||
|> OpentelemetryOban.insert()
|
||||
```
|
||||
|
||||
Oban also supports inserting jobs using `Oban.insert/4`, `Oban.insert_all/2`
|
||||
and `Oban.insert_all/4`. These are currently not supported by OpentelemetryOban
|
||||
and are just proxied through to Oban.
|
|
@ -0,0 +1,150 @@
|
|||
defmodule OpentelemetryOban do
|
||||
@moduledoc """
|
||||
OpentelemetryOban uses [telemetry](https://hexdocs.pm/telemetry/) handlers to
|
||||
create `OpenTelemetry` spans from Oban events.
|
||||
|
||||
Supported events include job start/stop and also when an exception is raised.
|
||||
|
||||
## Usage
|
||||
|
||||
In your application start:
|
||||
|
||||
def start(_type, _args) do
|
||||
OpentelemetryOban.setup()
|
||||
|
||||
# ...
|
||||
end
|
||||
"""
|
||||
|
||||
alias Ecto.Changeset
|
||||
alias OpenTelemetry.Span
|
||||
|
||||
require OpenTelemetry.Tracer
|
||||
|
||||
@tracer_id :opentelemetry_oban
|
||||
|
||||
@doc """
|
||||
Initializes and configures telemetry handlers.
|
||||
|
||||
By default jobs and plugins are traced. If you wish to trace only jobs then
|
||||
use:
|
||||
|
||||
OpentelemetryOban.setup(trace: [:jobs])
|
||||
|
||||
Note that if you don't trace plugins, but inside the plugins, there are spans
|
||||
from other instrumentation libraries (e.g. ecto) then these will still be
|
||||
traced. This setting controls only the spans that are created by
|
||||
opentelemetry_oban.
|
||||
"""
|
||||
@spec setup() :: :ok
|
||||
def setup(opts \\ []) do
|
||||
{:ok, otel_tracer_vsn} = :application.get_key(@tracer_id, :vsn)
|
||||
OpenTelemetry.register_tracer(@tracer_id, otel_tracer_vsn)
|
||||
|
||||
trace = Keyword.get(opts, :trace, [:jobs, :plugins])
|
||||
|
||||
if Enum.member?(trace, :jobs) do
|
||||
OpentelemetryOban.JobHandler.attach()
|
||||
end
|
||||
|
||||
if Enum.member?(trace, :plugins) do
|
||||
OpentelemetryOban.PluginHandler.attach()
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
def insert(name \\ Oban, %Changeset{} = changeset) do
|
||||
attributes = attributes_before_insert(changeset)
|
||||
worker = Changeset.get_field(changeset, :worker, "unknown")
|
||||
|
||||
OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do
|
||||
changeset = add_tracing_information_to_meta(changeset)
|
||||
|
||||
case Oban.insert(name, changeset) do
|
||||
{:ok, job} ->
|
||||
OpenTelemetry.Tracer.set_attributes(attributes_after_insert(job))
|
||||
{:ok, job}
|
||||
|
||||
other ->
|
||||
other
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def insert(name \\ Oban, multi, multi_name, changeset_or_fun) do
|
||||
Oban.insert(name, multi, multi_name, changeset_or_fun)
|
||||
end
|
||||
|
||||
def insert!(name \\ Oban, %Changeset{} = changeset) do
|
||||
attributes = attributes_before_insert(changeset)
|
||||
worker = Changeset.get_field(changeset, :worker, "unknown")
|
||||
|
||||
OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do
|
||||
changeset = add_tracing_information_to_meta(changeset)
|
||||
|
||||
try do
|
||||
job = Oban.insert!(name, changeset)
|
||||
OpenTelemetry.Tracer.set_attributes(attributes_after_insert(job))
|
||||
job
|
||||
rescue
|
||||
exception ->
|
||||
ctx = OpenTelemetry.Tracer.current_span_ctx()
|
||||
Span.record_exception(ctx, exception, __STACKTRACE__)
|
||||
Span.set_status(ctx, OpenTelemetry.status(:error, ""))
|
||||
reraise exception, __STACKTRACE__
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def insert_all(name \\ Oban, changesets_or_wrapper)
|
||||
|
||||
def insert_all(name, %{changesets: changesets}) when is_list(changesets) do
|
||||
insert_all(name, changesets)
|
||||
end
|
||||
|
||||
def insert_all(name, changesets) when is_list(changesets) do
|
||||
# changesets in insert_all can include different workers and different
|
||||
# queues. This means we cannot provide much information here, but we can
|
||||
# still record the insert and propagate the context information.
|
||||
OpenTelemetry.Tracer.with_span "Oban bulk insert", kind: :producer do
|
||||
changesets = Enum.map(changesets, &add_tracing_information_to_meta/1)
|
||||
Oban.insert_all(name, changesets)
|
||||
end
|
||||
end
|
||||
|
||||
def insert_all(name \\ __MODULE__, multi, multi_name, changesets_or_wrapper) do
|
||||
Oban.insert_all(name, multi, multi_name, changesets_or_wrapper)
|
||||
end
|
||||
|
||||
defp add_tracing_information_to_meta(changeset) do
|
||||
meta = Changeset.get_field(changeset, :meta, %{})
|
||||
|
||||
new_meta =
|
||||
[]
|
||||
|> :otel_propagator_text_map.inject()
|
||||
|> Enum.into(meta)
|
||||
|
||||
Changeset.change(changeset, %{meta: new_meta})
|
||||
end
|
||||
|
||||
defp attributes_before_insert(changeset) do
|
||||
queue = Changeset.get_field(changeset, :queue, "unknown")
|
||||
worker = Changeset.get_field(changeset, :worker, "unknown")
|
||||
|
||||
[
|
||||
"messaging.system": "oban",
|
||||
"messaging.destination": queue,
|
||||
"messaging.destination_kind": "queue",
|
||||
"messaging.oban.worker": worker
|
||||
]
|
||||
end
|
||||
|
||||
defp attributes_after_insert(job) do
|
||||
[
|
||||
"messaging.oban.job_id": job.id,
|
||||
"messaging.oban.priority": job.priority,
|
||||
"messaging.oban.max_attempts": job.max_attempts
|
||||
]
|
||||
end
|
||||
end
|
|
@ -0,0 +1,101 @@
|
|||
defmodule OpentelemetryOban.JobHandler do
|
||||
alias OpenTelemetry.Span
|
||||
|
||||
@tracer_id :opentelemetry_oban
|
||||
|
||||
def attach() do
|
||||
attach_job_start_handler()
|
||||
attach_job_stop_handler()
|
||||
attach_job_exception_handler()
|
||||
end
|
||||
|
||||
defp attach_job_start_handler() do
|
||||
:telemetry.attach(
|
||||
"#{__MODULE__}.job_start",
|
||||
[:oban, :job, :start],
|
||||
&__MODULE__.handle_job_start/4,
|
||||
[]
|
||||
)
|
||||
end
|
||||
|
||||
defp attach_job_stop_handler() do
|
||||
:telemetry.attach(
|
||||
"#{__MODULE__}.job_stop",
|
||||
[:oban, :job, :stop],
|
||||
&__MODULE__.handle_job_stop/4,
|
||||
[]
|
||||
)
|
||||
end
|
||||
|
||||
defp attach_job_exception_handler() do
|
||||
:telemetry.attach(
|
||||
"#{__MODULE__}.job_exception",
|
||||
[:oban, :job, :exception],
|
||||
&__MODULE__.handle_job_exception/4,
|
||||
[]
|
||||
)
|
||||
end
|
||||
|
||||
def handle_job_start(_event, _measurements, metadata, _config) do
|
||||
%{
|
||||
job: %{
|
||||
id: id,
|
||||
queue: queue,
|
||||
worker: worker,
|
||||
priority: priority,
|
||||
inserted_at: inserted_at,
|
||||
scheduled_at: scheduled_at,
|
||||
attempt: attempt,
|
||||
max_attempts: max_attempts,
|
||||
meta: job_meta
|
||||
}
|
||||
} = metadata
|
||||
|
||||
:otel_propagator_text_map.extract(Map.to_list(job_meta))
|
||||
parent = OpenTelemetry.Tracer.current_span_ctx()
|
||||
links = if parent == :undefined, do: [], else: [OpenTelemetry.link(parent)]
|
||||
OpenTelemetry.Tracer.set_current_span(:undefined)
|
||||
|
||||
attributes = [
|
||||
"messaging.system": "oban",
|
||||
"messaging.destination": queue,
|
||||
"messaging.destination_kind": "queue",
|
||||
"messaging.operation": "process",
|
||||
"messaging.oban.job_id": id,
|
||||
"messaging.oban.worker": worker,
|
||||
"messaging.oban.priority": priority,
|
||||
"messaging.oban.attempt": attempt,
|
||||
"messaging.oban.max_attempts": max_attempts,
|
||||
"messaging.oban.inserted_at":
|
||||
if(inserted_at, do: DateTime.to_iso8601(inserted_at), else: nil),
|
||||
"messaging.oban.scheduled_at": DateTime.to_iso8601(scheduled_at)
|
||||
]
|
||||
|
||||
span_name = "#{worker} process"
|
||||
|
||||
OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{
|
||||
kind: :consumer,
|
||||
links: links,
|
||||
attributes: attributes
|
||||
})
|
||||
end
|
||||
|
||||
def handle_job_stop(_event, _measurements, metadata, _config) do
|
||||
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
|
||||
end
|
||||
|
||||
def handle_job_exception(
|
||||
_event,
|
||||
_measurements,
|
||||
%{stacktrace: stacktrace, error: error} = metadata,
|
||||
_config
|
||||
) do
|
||||
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)
|
||||
|
||||
# Record exception and mark the span as errored
|
||||
Span.record_exception(ctx, error, stacktrace)
|
||||
Span.set_status(ctx, OpenTelemetry.status(:error, ""))
|
||||
|
||||
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
|
||||
end
|
||||
end
|
|
@ -0,0 +1,66 @@
|
|||
defmodule OpentelemetryOban.PluginHandler do
|
||||
alias OpenTelemetry.Span
|
||||
|
||||
@tracer_id :opentelemetry_oban
|
||||
|
||||
def attach() do
|
||||
attach_plugin_start_handler()
|
||||
attach_plugin_stop_handler()
|
||||
attach_plugin_exception_handler()
|
||||
end
|
||||
|
||||
defp attach_plugin_start_handler() do
|
||||
:telemetry.attach(
|
||||
"#{__MODULE__}.plugin_start",
|
||||
[:oban, :plugin, :start],
|
||||
&__MODULE__.handle_plugin_start/4,
|
||||
[]
|
||||
)
|
||||
end
|
||||
|
||||
defp attach_plugin_stop_handler() do
|
||||
:telemetry.attach(
|
||||
"#{__MODULE__}.plugin_stop",
|
||||
[:oban, :plugin, :stop],
|
||||
&__MODULE__.handle_plugin_stop/4,
|
||||
[]
|
||||
)
|
||||
end
|
||||
|
||||
defp attach_plugin_exception_handler() do
|
||||
:telemetry.attach(
|
||||
"#{__MODULE__}.plugin_exception",
|
||||
[:oban, :plugin, :exception],
|
||||
&__MODULE__.handle_plugin_exception/4,
|
||||
[]
|
||||
)
|
||||
end
|
||||
|
||||
def handle_plugin_start(_event, _measurements, %{plugin: plugin} = metadata, _config) do
|
||||
OpentelemetryTelemetry.start_telemetry_span(
|
||||
@tracer_id,
|
||||
"#{plugin} process",
|
||||
metadata,
|
||||
%{}
|
||||
)
|
||||
end
|
||||
|
||||
def handle_plugin_stop(_event, _measurements, metadata, _config) do
|
||||
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
|
||||
end
|
||||
|
||||
def handle_plugin_exception(
|
||||
_event,
|
||||
_measurements,
|
||||
%{stacktrace: stacktrace, error: error} = metadata,
|
||||
_config
|
||||
) do
|
||||
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)
|
||||
|
||||
# Record exception and mark the span as errored
|
||||
Span.record_exception(ctx, error, stacktrace)
|
||||
Span.set_status(ctx, OpenTelemetry.status(:error, ""))
|
||||
|
||||
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
|
||||
end
|
||||
end
|
|
@ -0,0 +1,53 @@
|
|||
defmodule OpentelemetryOban.MixProject do
|
||||
use Mix.Project
|
||||
|
||||
def project do
|
||||
[
|
||||
app: :opentelemetry_oban,
|
||||
version: "0.2.0-rc.3",
|
||||
elixir: "~> 1.10",
|
||||
start_permanent: Mix.env() == :prod,
|
||||
deps: deps(),
|
||||
docs: [
|
||||
main: "OpentelemetryOban",
|
||||
extras: ["README.md"]
|
||||
],
|
||||
elixirc_paths: elixirc_paths(Mix.env()),
|
||||
package: [
|
||||
name: "opentelemetry_oban",
|
||||
description: "OpenTelemetry tracing for Oban",
|
||||
maintainers: ["Glia TechMovers"],
|
||||
licenses: ["Apache-2.0"],
|
||||
links: %{
|
||||
"GitHub" => "https://github.com/open-telemetry/opentelemetry-erlang-contrib",
|
||||
"OpenTelemetry Erlang" => "https://github.com/open-telemetry/opentelemetry-erlang",
|
||||
"OpenTelemetry.io" => "https://opentelemetry.io"
|
||||
},
|
||||
files: ~w(lib .formatter.exs mix.exs README* LICENSE* CHANGELOG*)
|
||||
]
|
||||
]
|
||||
end
|
||||
|
||||
# Run "mix help compile.app" to learn about applications.
|
||||
def application do
|
||||
[
|
||||
extra_applications: []
|
||||
]
|
||||
end
|
||||
|
||||
# Run "mix help deps" to learn about dependencies.
|
||||
defp deps do
|
||||
[
|
||||
{:oban, "~> 2.0"},
|
||||
{:opentelemetry_api, "~> 1.0.0-rc.3"},
|
||||
{:opentelemetry_telemetry, "~> 1.0.0-beta"},
|
||||
{:opentelemetry, "~> 1.0.0-rc.3", only: [:test]},
|
||||
{:opentelemetry_exporter, "~> 1.0.0-rc.3", only: [:test]},
|
||||
{:telemetry, "~> 0.4 or ~> 1.0"},
|
||||
{:ex_doc, "~> 0.24", only: [:dev], runtime: false}
|
||||
]
|
||||
end
|
||||
|
||||
defp elixirc_paths(:test), do: ["lib", "test/support"]
|
||||
defp elixirc_paths(_), do: ["lib"]
|
||||
end
|
|
@ -0,0 +1,28 @@
|
|||
%{
|
||||
"acceptor_pool": {:hex, :acceptor_pool, "1.0.0", "43c20d2acae35f0c2bcd64f9d2bde267e459f0f3fd23dab26485bf518c281b21", [:rebar3], [], "hexpm", "0cbcd83fdc8b9ad2eee2067ef8b91a14858a5883cb7cd800e6fcd5803e158788"},
|
||||
"chatterbox": {:hex, :ts_chatterbox, "0.11.0", "b8f372c706023eb0de5bf2976764edb27c70fe67052c88c1f6a66b3a5626847f", [:rebar3], [{:hpack, "~>0.2.3", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "722fe2bad52913ab7e87d849fc6370375f0c961ffb2f0b5e6d647c9170c382a6"},
|
||||
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
|
||||
"ctx": {:hex, :ctx, "0.6.0", "8ff88b70e6400c4df90142e7f130625b82086077a45364a78d208ed3ed53c7fe", [:rebar3], [], "hexpm", "a14ed2d1b67723dbebbe423b28d7615eb0bdcba6ff28f2d1f1b0a7e1d4aa5fc2"},
|
||||
"db_connection": {:hex, :db_connection, "2.4.0", "d04b1b73795dae60cead94189f1b8a51cc9e1f911c234cc23074017c43c031e5", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ad416c21ad9f61b3103d254a71b63696ecadb6a917b36f563921e0de00d7d7c8"},
|
||||
"decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"},
|
||||
"earmark_parser": {:hex, :earmark_parser, "1.4.15", "b29e8e729f4aa4a00436580dcc2c9c5c51890613457c193cc8525c388ccb2f06", [:mix], [], "hexpm", "044523d6438ea19c1b8ec877ec221b008661d3c27e3b848f4c879f500421ca5c"},
|
||||
"ecto": {:hex, :ecto, "3.7.1", "a20598862351b29f80f285b21ec5297da1181c0442687f9b8329f0445d228892", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d36e5b39fc479e654cffd4dbe1865d9716e4a9b6311faff799b6f90ab81b8638"},
|
||||
"ecto_sql": {:hex, :ecto_sql, "3.7.0", "2fcaad4ab0c8d76a5afbef078162806adbe709c04160aca58400d5cbbe8eeac6", [:mix], [{:db_connection, "~> 2.2", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.4.0 or ~> 0.5.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.15.0 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a26135dfa1d99bf87a928c464cfa25bba6535a4fe761eefa56077a4febc60f70"},
|
||||
"ex_doc": {:hex, :ex_doc, "0.25.3", "3edf6a0d70a39d2eafde030b8895501b1c93692effcbd21347296c18e47618ce", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "9ebebc2169ec732a38e9e779fd0418c9189b3ca93f4a676c961be6c1527913f5"},
|
||||
"gproc": {:hex, :gproc, "0.8.0", "cea02c578589c61e5341fce149ea36ccef236cc2ecac8691fba408e7ea77ec2f", [:rebar3], [], "hexpm", "580adafa56463b75263ef5a5df4c86af321f68694e7786cb057fd805d1e2a7de"},
|
||||
"grpcbox": {:hex, :grpcbox, "0.14.0", "3eb321bcd2275baf8b54cf381feb7b0559a50c02544de28fda039c7f2f9d1a7a", [:rebar3], [{:acceptor_pool, "~>1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~>0.11.0", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~>0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~>0.8.0", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "e24159b7b6d3f9869bbe528845c0125fed2259366ba908fd04a1f45fe81d0660"},
|
||||
"hpack": {:hex, :hpack_erl, "0.2.3", "17670f83ff984ae6cd74b1c456edde906d27ff013740ee4d9efaa4f1bf999633", [:rebar3], [], "hexpm", "06f580167c4b8b8a6429040df36cc93bba6d571faeaec1b28816523379cbb23a"},
|
||||
"jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"},
|
||||
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
|
||||
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
|
||||
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
|
||||
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
|
||||
"oban": {:hex, :oban, "2.8.0", "e44b19a30e30bb983099f55d59749316ff0eaf5dfef4214e1190738176653e50", [:mix], [{:ecto_sql, ">= 3.4.3", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2954a2ac418f7cc4217c0772a3dd3a70e2966240583b97f4126a489e1300a573"},
|
||||
"opentelemetry": {:hex, :opentelemetry, "1.0.0-rc.3", "d2698bee882c354274563ee85d097bb736a9adb8d8ed376a4deea0cd3a14bb31", [:rebar3], [{:opentelemetry_api, "~> 1.0.0-rc.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "c9105933df0d783d94cf08d79206eb8d6578abc0bcbd498d0b497ec62a4e30a8"},
|
||||
"opentelemetry_api": {:hex, :opentelemetry_api, "1.0.0-rc.3.1", "d183663c178f317a109a267b3c3664d09db22829a4d4eea8d9af46ed3e5bee05", [:mix, :rebar3], [], "hexpm", "4b836cec1b531080c310fa54afca6e523984a1f6c1aeb5d4da537dad9e309ce9"},
|
||||
"opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.0.0-rc.3", "76f5657d4c94a12003d9ed2c8da1023c815e98f5553184dbb0cdaeec76db676d", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.0.0-rc.3", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0.0-rc.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "267f0e4c3f1f5557cc7ad6ac71d66b8eaf7b3b56fde942c21f8a0bc96174fe1e"},
|
||||
"opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.0.0-beta.2", "b840eee9e68307ad7fa4ee316da19db3f8e30763b87737d3304782ca3cc296a2", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0.0-rc.1", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.2.1", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "e8b12f42614d0aeb6a49001c75ca035544950f736fdbb240177838674f99e1e2"},
|
||||
"postgrex": {:hex, :postgrex, "0.15.10", "2809dee1b1d76f7cbabe570b2a9285c2e7b41be60cf792f5f2804a54b838a067", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "1560ca427542f6b213f8e281633ae1a3b31cdbcd84ebd7f50628765b8f6132be"},
|
||||
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
|
||||
"telemetry_registry": {:hex, :telemetry_registry, "0.2.1", "fe648a691f2128e4279d993cd010994c67f282354dc061e697bf070d4b87b480", [:mix, :rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4221cefbcadd0b3e7076960339223742d973f1371bc20f3826af640257bc3690"},
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
defmodule OpentelemetryOban.PluginHandlerTest do
|
||||
use DataCase
|
||||
|
||||
require OpenTelemetry.Tracer
|
||||
require OpenTelemetry.Span
|
||||
require Record
|
||||
|
||||
for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do
|
||||
Record.defrecord(name, spec)
|
||||
end
|
||||
|
||||
for {name, spec} <- Record.extract_all(from_lib: "opentelemetry_api/include/opentelemetry.hrl") do
|
||||
Record.defrecord(name, spec)
|
||||
end
|
||||
|
||||
setup do
|
||||
:application.stop(:opentelemetry)
|
||||
:application.set_env(:opentelemetry, :tracer, :otel_tracer_default)
|
||||
|
||||
:application.set_env(:opentelemetry, :processors, [
|
||||
{:otel_batch_processor, %{scheduled_delay_ms: 1, exporter: {:otel_exporter_pid, self()}}}
|
||||
])
|
||||
|
||||
:application.start(:opentelemetry)
|
||||
|
||||
TestHelpers.remove_oban_handlers()
|
||||
OpentelemetryOban.setup()
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
test "does not create spans when tracing plugins is disabled" do
|
||||
TestHelpers.remove_oban_handlers()
|
||||
OpentelemetryOban.setup(trace: [:jobs])
|
||||
|
||||
:telemetry.execute(
|
||||
[:oban, :plugin, :start],
|
||||
%{system_time: System.system_time()},
|
||||
%{plugin: Elixir.Oban.Plugins.Stager}
|
||||
)
|
||||
|
||||
:telemetry.execute(
|
||||
[:oban, :plugin, :stop],
|
||||
%{duration: 444},
|
||||
%{plugin: Elixir.Oban.Plugins.Stager}
|
||||
)
|
||||
|
||||
refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")}
|
||||
end
|
||||
|
||||
test "records span on plugin execution" do
|
||||
:telemetry.execute(
|
||||
[:oban, :plugin, :start],
|
||||
%{system_time: System.system_time()},
|
||||
%{plugin: Elixir.Oban.Plugins.Stager}
|
||||
)
|
||||
|
||||
:telemetry.execute(
|
||||
[:oban, :plugin, :stop],
|
||||
%{duration: 444},
|
||||
%{plugin: Elixir.Oban.Plugins.Stager}
|
||||
)
|
||||
|
||||
assert_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")}
|
||||
end
|
||||
|
||||
test "records span on plugin error" do
|
||||
:telemetry.execute(
|
||||
[:oban, :plugin, :start],
|
||||
%{system_time: System.system_time()},
|
||||
%{plugin: Elixir.Oban.Plugins.Stager}
|
||||
)
|
||||
|
||||
:telemetry.execute(
|
||||
[:oban, :plugin, :exception],
|
||||
%{duration: 444},
|
||||
%{
|
||||
plugin: Elixir.Oban.Plugins.Stager,
|
||||
kind: :error,
|
||||
stacktrace: [
|
||||
{Some, :error, [], []}
|
||||
],
|
||||
error: %UndefinedFunctionError{
|
||||
arity: 0,
|
||||
function: :error,
|
||||
message: nil,
|
||||
module: Some,
|
||||
reason: nil
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
expected_status = OpenTelemetry.status(:error, "")
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "Elixir.Oban.Plugins.Stager process",
|
||||
events: [
|
||||
event(
|
||||
name: "exception",
|
||||
attributes: [
|
||||
{"exception.type", "Elixir.UndefinedFunctionError"},
|
||||
{"exception.message",
|
||||
"function Some.error/0 is undefined (module Some is not available)"},
|
||||
{"exception.stacktrace", _stacktrace}
|
||||
]
|
||||
)
|
||||
],
|
||||
status: ^expected_status
|
||||
)}
|
||||
end
|
||||
end
|
|
@ -0,0 +1,379 @@
|
|||
defmodule OpentelemetryObanTest do
|
||||
use DataCase
|
||||
|
||||
doctest OpentelemetryOban
|
||||
|
||||
require OpenTelemetry.Tracer
|
||||
require OpenTelemetry.Span
|
||||
require Record
|
||||
|
||||
for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do
|
||||
Record.defrecord(name, spec)
|
||||
end
|
||||
|
||||
for {name, spec} <- Record.extract_all(from_lib: "opentelemetry_api/include/opentelemetry.hrl") do
|
||||
Record.defrecord(name, spec)
|
||||
end
|
||||
|
||||
setup do
|
||||
:application.stop(:opentelemetry)
|
||||
:application.set_env(:opentelemetry, :tracer, :otel_tracer_default)
|
||||
|
||||
:application.set_env(:opentelemetry, :processors, [
|
||||
{:otel_batch_processor, %{scheduled_delay_ms: 1, exporter: {:otel_exporter_pid, self()}}}
|
||||
])
|
||||
|
||||
:application.start(:opentelemetry)
|
||||
|
||||
TestHelpers.remove_oban_handlers()
|
||||
OpentelemetryOban.setup()
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
test "records span on job insertion" do
|
||||
OpentelemetryOban.insert(TestJob.new(%{}))
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events)
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJob send",
|
||||
attributes: attributes,
|
||||
parent_span_id: :undefined,
|
||||
kind: :producer,
|
||||
status: :undefined
|
||||
)}
|
||||
|
||||
assert [
|
||||
"messaging.destination": "events",
|
||||
"messaging.destination_kind": "queue",
|
||||
"messaging.oban.job_id": _job_id,
|
||||
"messaging.oban.max_attempts": 1,
|
||||
"messaging.oban.priority": 0,
|
||||
"messaging.oban.worker": "TestJob",
|
||||
"messaging.system": "oban"
|
||||
] = List.keysort(attributes, 0)
|
||||
end
|
||||
|
||||
test "job creation uses existing trace if present" do
|
||||
OpenTelemetry.Tracer.with_span "test span" do
|
||||
ctx = OpenTelemetry.Tracer.current_span_ctx()
|
||||
root_trace_id = OpenTelemetry.Span.trace_id(ctx)
|
||||
root_span_id = OpenTelemetry.Span.span_id(ctx)
|
||||
|
||||
OpentelemetryOban.insert(TestJob.new(%{}))
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events)
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJob send",
|
||||
attributes: _attributes,
|
||||
trace_id: ^root_trace_id,
|
||||
parent_span_id: ^root_span_id,
|
||||
kind: :producer,
|
||||
status: :undefined
|
||||
)}
|
||||
end
|
||||
end
|
||||
|
||||
test "keeps existing meta information" do
|
||||
OpentelemetryOban.insert(TestJob.new(%{}, meta: %{foo: "bar"}))
|
||||
|
||||
assert [job] = all_enqueued()
|
||||
assert job.meta["foo"] == "bar"
|
||||
end
|
||||
|
||||
test "tracing information is propagated between send and process" do
|
||||
OpentelemetryOban.insert(TestJob.new(%{}))
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events)
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJob send",
|
||||
attributes: _attributes,
|
||||
trace_id: send_trace_id,
|
||||
span_id: send_span_id,
|
||||
kind: :producer,
|
||||
status: :undefined
|
||||
)}
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJob process",
|
||||
attributes: _attributes,
|
||||
kind: :consumer,
|
||||
status: :undefined,
|
||||
trace_id: process_trace_id,
|
||||
links: [link(trace_id: ^send_trace_id, span_id: ^send_span_id)]
|
||||
)}
|
||||
|
||||
# Process is ran asynchronously so we create a new trace, but still link
|
||||
# the traces together.
|
||||
assert send_trace_id != process_trace_id
|
||||
end
|
||||
|
||||
test "no link is created on process when tracing info was not propagated" do
|
||||
# Using regular Oban, instead of OpentelemetryOban
|
||||
Oban.insert(TestJob.new(%{}))
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events)
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJob process",
|
||||
attributes: _attributes,
|
||||
kind: :consumer,
|
||||
status: :undefined,
|
||||
trace_id: _trace_id,
|
||||
links: []
|
||||
)}
|
||||
end
|
||||
|
||||
test "records spans for successful Oban jobs" do
|
||||
OpentelemetryOban.insert(TestJob.new(%{}))
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events)
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJob process",
|
||||
attributes: attributes,
|
||||
kind: :consumer,
|
||||
status: :undefined
|
||||
)}
|
||||
|
||||
assert [
|
||||
"messaging.destination": "events",
|
||||
"messaging.destination_kind": "queue",
|
||||
"messaging.oban.attempt": 1,
|
||||
"messaging.oban.inserted_at": _inserted_at,
|
||||
"messaging.oban.job_id": _job_id,
|
||||
"messaging.oban.max_attempts": 1,
|
||||
"messaging.oban.priority": 0,
|
||||
"messaging.oban.scheduled_at": _scheduled_at,
|
||||
"messaging.oban.worker": "TestJob",
|
||||
"messaging.operation": "process",
|
||||
"messaging.system": "oban"
|
||||
] = List.keysort(attributes, 0)
|
||||
end
|
||||
|
||||
test "records spans for Oban jobs that stop with {:error, :something}" do
|
||||
OpentelemetryOban.insert(TestJobThatReturnsError.new(%{}))
|
||||
assert %{success: 0, failure: 1} = Oban.drain_queue(queue: :events)
|
||||
|
||||
expected_status = OpenTelemetry.status(:error, "")
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJobThatReturnsError process",
|
||||
attributes: attributes,
|
||||
kind: :consumer,
|
||||
events: [
|
||||
event(
|
||||
name: "exception",
|
||||
attributes: [
|
||||
{"exception.type", "Elixir.Oban.PerformError"},
|
||||
{"exception.message",
|
||||
"TestJobThatReturnsError failed with {:error, :something}"},
|
||||
{"exception.stacktrace", _stacktrace}
|
||||
]
|
||||
)
|
||||
],
|
||||
status: ^expected_status
|
||||
)}
|
||||
|
||||
assert [
|
||||
"messaging.destination": "events",
|
||||
"messaging.destination_kind": "queue",
|
||||
"messaging.oban.attempt": 1,
|
||||
"messaging.oban.inserted_at": _inserted_at,
|
||||
"messaging.oban.job_id": _job_id,
|
||||
"messaging.oban.max_attempts": 1,
|
||||
"messaging.oban.priority": 0,
|
||||
"messaging.oban.scheduled_at": _scheduled_at,
|
||||
"messaging.oban.worker": "TestJobThatReturnsError",
|
||||
"messaging.operation": "process",
|
||||
"messaging.system": "oban"
|
||||
] = List.keysort(attributes, 0)
|
||||
end
|
||||
|
||||
test "records spans for each retry" do
|
||||
OpentelemetryOban.insert(TestJobThatReturnsError.new(%{}, max_attempts: 2))
|
||||
|
||||
assert %{success: 0, failure: 2} =
|
||||
Oban.drain_queue(queue: :events, with_scheduled: true, with_recursion: true)
|
||||
|
||||
expected_status = OpenTelemetry.status(:error, "")
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJobThatReturnsError send",
|
||||
trace_id: send_trace_id,
|
||||
span_id: send_span_id
|
||||
)}
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJobThatReturnsError process",
|
||||
status: ^expected_status,
|
||||
trace_id: first_process_trace_id,
|
||||
links: [link(trace_id: ^send_trace_id, span_id: ^send_span_id)]
|
||||
)}
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJobThatReturnsError process",
|
||||
status: ^expected_status,
|
||||
trace_id: second_process_trace_id,
|
||||
links: [link(trace_id: ^send_trace_id, span_id: ^send_span_id)]
|
||||
)}
|
||||
|
||||
assert first_process_trace_id != second_process_trace_id
|
||||
end
|
||||
|
||||
test "records spans for Oban jobs that stop with an exception" do
|
||||
OpentelemetryOban.insert(TestJobThatThrowsException.new(%{}))
|
||||
assert %{success: 0, failure: 1} = Oban.drain_queue(queue: :events)
|
||||
|
||||
expected_status = OpenTelemetry.status(:error, "")
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJobThatThrowsException process",
|
||||
attributes: attributes,
|
||||
kind: :consumer,
|
||||
events: [
|
||||
event(
|
||||
name: "exception",
|
||||
attributes: [
|
||||
{"exception.type", "Elixir.UndefinedFunctionError"},
|
||||
{"exception.message",
|
||||
"function Some.error/0 is undefined (module Some is not available)"},
|
||||
{"exception.stacktrace", _stacktrace}
|
||||
]
|
||||
)
|
||||
],
|
||||
status: ^expected_status
|
||||
)}
|
||||
|
||||
assert [
|
||||
"messaging.destination": "events",
|
||||
"messaging.destination_kind": "queue",
|
||||
"messaging.oban.attempt": 1,
|
||||
"messaging.oban.inserted_at": _inserted_at,
|
||||
"messaging.oban.job_id": _job_id,
|
||||
"messaging.oban.max_attempts": 1,
|
||||
"messaging.oban.priority": 0,
|
||||
"messaging.oban.scheduled_at": _scheduled_at,
|
||||
"messaging.oban.worker": "TestJobThatThrowsException",
|
||||
"messaging.operation": "process",
|
||||
"messaging.system": "oban"
|
||||
] = List.keysort(attributes, 0)
|
||||
end
|
||||
|
||||
test "spans inside the job are associated with the job trace" do
|
||||
OpentelemetryOban.insert(TestJobWithInnerSpan.new(%{}))
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events)
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJobWithInnerSpan process",
|
||||
kind: :consumer,
|
||||
trace_id: trace_id,
|
||||
span_id: process_span_id
|
||||
)}
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "span inside the job",
|
||||
kind: :internal,
|
||||
trace_id: ^trace_id,
|
||||
parent_span_id: ^process_span_id
|
||||
)}
|
||||
end
|
||||
|
||||
test "OpentelemetryOban.insert!/2 returns job on successful insert" do
|
||||
%Oban.Job{} = OpentelemetryOban.insert!(TestJob.new(%{}))
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events)
|
||||
assert_receive {:span, span(name: "TestJob send")}
|
||||
assert_receive {:span, span(name: "TestJob process")}
|
||||
end
|
||||
|
||||
test "OpentelemetryOban.insert!/2 raises an error on failed insert" do
|
||||
assert_raise(
|
||||
Ecto.InvalidChangesetError,
|
||||
fn -> OpentelemetryOban.insert!(TestJob.new(%{}, max_attempts: -1)) end
|
||||
)
|
||||
|
||||
assert %{success: 0, failure: 0} = Oban.drain_queue(queue: :events)
|
||||
|
||||
expected_status = OpenTelemetry.status(:error, "")
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJob send",
|
||||
events: [
|
||||
event(
|
||||
name: "exception",
|
||||
attributes: [
|
||||
{"exception.type", "Elixir.Ecto.InvalidChangesetError"},
|
||||
{"exception.message", _message},
|
||||
{"exception.stacktrace", _stacktrace}
|
||||
]
|
||||
)
|
||||
],
|
||||
status: ^expected_status
|
||||
)}
|
||||
|
||||
refute_received {:span, span(name: "TestJob process")}
|
||||
end
|
||||
|
||||
test "tracing information is propagated when using insert_all/2" do
|
||||
OpentelemetryOban.insert_all([
|
||||
TestJob.new(%{}),
|
||||
TestJob.new(%{})
|
||||
])
|
||||
|
||||
assert %{success: 2, failure: 0} = Oban.drain_queue(queue: :events)
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "Oban bulk insert",
|
||||
attributes: _attributes,
|
||||
trace_id: send_trace_id,
|
||||
span_id: send_span_id,
|
||||
kind: :producer,
|
||||
status: :undefined
|
||||
)}
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJob process",
|
||||
attributes: _attributes,
|
||||
kind: :consumer,
|
||||
status: :undefined,
|
||||
trace_id: first_process_trace_id,
|
||||
links: [link(trace_id: ^send_trace_id, span_id: ^send_span_id)]
|
||||
)}
|
||||
|
||||
assert_receive {:span,
|
||||
span(
|
||||
name: "TestJob process",
|
||||
attributes: _attributes,
|
||||
kind: :consumer,
|
||||
status: :undefined,
|
||||
trace_id: second_process_trace_id,
|
||||
links: [link(trace_id: ^send_trace_id, span_id: ^send_span_id)]
|
||||
)}
|
||||
|
||||
# Process is ran asynchronously so we create a new trace, but still link
|
||||
# the traces together.
|
||||
assert send_trace_id != first_process_trace_id
|
||||
assert send_trace_id != second_process_trace_id
|
||||
assert first_process_trace_id != second_process_trace_id
|
||||
end
|
||||
|
||||
test "works with Oban.Testing.perform_job helper function" do
|
||||
Oban.Testing.perform_job(TestJob, %{}, repo: TestRepo)
|
||||
|
||||
assert_receive {:span, span(name: "TestJob process")}
|
||||
end
|
||||
end
|
|
@ -0,0 +1,32 @@
|
|||
defmodule DataCase do
|
||||
@moduledoc """
|
||||
This module defines the setup for tests requiring access to the data layer.
|
||||
|
||||
You may define functions here to be used as helpers in your tests.
|
||||
|
||||
Finally, if the test case interacts with the database, it cannot be async.
|
||||
For this reason, every test runs inside a transaction which is reset at the
|
||||
beginning of the test unless the test case is marked as async.
|
||||
"""
|
||||
|
||||
use ExUnit.CaseTemplate
|
||||
|
||||
using do
|
||||
quote do
|
||||
use Oban.Testing, repo: TestRepo
|
||||
|
||||
import Ecto
|
||||
import DataCase
|
||||
end
|
||||
end
|
||||
|
||||
setup tags do
|
||||
:ok = Ecto.Adapters.SQL.Sandbox.checkout(TestRepo)
|
||||
|
||||
unless tags[:async] do
|
||||
Ecto.Adapters.SQL.Sandbox.mode(TestRepo, {:shared, self()})
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
end
|
|
@ -0,0 +1,5 @@
|
|||
defmodule TestRepo do
|
||||
use Ecto.Repo,
|
||||
otp_app: :opentelemetry_oban,
|
||||
adapter: Ecto.Adapters.Postgres
|
||||
end
|
|
@ -0,0 +1,74 @@
|
|||
ExUnit.start()
|
||||
|
||||
TestRepo.start_link(
|
||||
database: "opentelemetry_oban_test",
|
||||
hostname: "localhost",
|
||||
username: "postgres",
|
||||
password: "postgres",
|
||||
pool: Ecto.Adapters.SQL.Sandbox
|
||||
)
|
||||
|
||||
Ecto.Adapters.SQL.Sandbox.mode(TestRepo, {:shared, self()})
|
||||
|
||||
defmodule PrepareOban do
|
||||
use Ecto.Migration
|
||||
def up, do: Oban.Migrations.up()
|
||||
end
|
||||
|
||||
Ecto.Migrator.run(TestRepo, [{0, PrepareOban}], :up, all: true)
|
||||
TestRepo.query("TRUNCATE oban_jobs", [])
|
||||
|
||||
Oban.start_link(
|
||||
repo: TestRepo,
|
||||
plugins: [Oban.Plugins.Pruner],
|
||||
queues: [default: 10, events: 50]
|
||||
)
|
||||
|
||||
defmodule TestJob do
|
||||
use Oban.Worker, queue: :events, max_attempts: 1
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(_job) do
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
defmodule TestJobWithInnerSpan do
|
||||
use Oban.Worker, queue: :events, max_attempts: 1
|
||||
require OpenTelemetry.Tracer
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(_job) do
|
||||
OpenTelemetry.Tracer.with_span "span inside the job" do
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defmodule TestJobThatReturnsError do
|
||||
use Oban.Worker, queue: :events, max_attempts: 1
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(_job) do
|
||||
{:error, :something}
|
||||
end
|
||||
end
|
||||
|
||||
defmodule TestJobThatThrowsException do
|
||||
use Oban.Worker, queue: :events, max_attempts: 1
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(_job) do
|
||||
raise %UndefinedFunctionError{
|
||||
message: "function Some.error/0 is undefined (module Some is not available)"
|
||||
}
|
||||
end
|
||||
end
|
||||
|
||||
defmodule TestHelpers do
|
||||
def remove_oban_handlers() do
|
||||
Enum.each(:telemetry.list_handlers([:oban]), fn handler ->
|
||||
:telemetry.detach(handler[:id])
|
||||
end)
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue