Conveniences for Tasks (#80)

* Conveniences for Tasks

* Finish Task module functions

* Complete Task and add Task.Supervisor

* Check if new funs added

* Remove with_ctx in favor of wrapping default

* Fix docs
This commit is contained in:
Bryan Naegele 2022-08-26 14:10:57 -06:00 committed by GitHub
parent e8d1462f46
commit fb6c3d2985
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 2706 additions and 10 deletions

View File

@ -0,0 +1,553 @@
defmodule OpentelemetryProcessPropagator.Task do
@moduledoc """
`OpentelemetryProcessPropagator.Task` provides a set of extensions
to the `Task` module to reduce some boilerplate in propagating OpenTelemetry
contexts across process boundaries. Since these are extensions rather
than a replacement of Elixir's module, this library can be aliased
into a file without concern for creating spans where you do not want them.
Each `Task` function is replicated with two variants: `*_with_span`
and `*_with_linked_span`. Each of these variations has a specific use case.
The original implementation for each function automatically propagates the
current context.
* `*` - propagates the current context
* `*_with_span` - propagates the current context and starts a new child span.
* `*_with_linked_span` - propagates the current context and starts a new linked span.
> #### Module Redefinement {: .info}
>
> This module does not redefine the `Task` module, instead providing a wrapper of the module,
> so this functionality will not globally modify the default behavior of the `Task` module.
## Usage
```
defmodule MyApp do
require OpenTelemetry.Tracer
alias OpentelemetryProcessPropagator.Task
def untraced_task do
Task.async(fn ->
:ok
end)
|> Task.await()
end
def traced_task do
Task.async_with_span(:span_name, %{attributes: %{a: "b"}}, fn ->
Tracer.set_attribute(:c, "d")
:ok
end)
|> Task.await()
end
end
```
"""
alias OpentelemetryProcessPropagator.Task.Wrapper
require OpenTelemetry.Tracer
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with the current `t:OpenTelemetry.Ctx.t/0`
attached.
See `Task.async_stream/3` for more information.
"""
@spec async_stream(Enumerable.t(), (term() -> term()), keyword()) :: Enumerable.t()
def async_stream(enumerable, fun, opts \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async_stream(
enumerable,
fn arg ->
OpenTelemetry.Ctx.attach(ctx)
fun.(arg)
end,
opts
)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with the
current `t:OpenTelemetry.Ctx.t/0` attached.
See `Task.async_stream/5` for more information.
"""
@spec async_stream(
Enumerable.t(),
module(),
atom(),
[term()],
keyword()
) :: Enumerable.t()
def async_stream(enumerable, module, function_name, args, opts \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async_stream(enumerable, Wrapper, :with_ctx, [ctx, {module, function_name, args}], opts)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with a new child span.
See `Task.async_stream/3` for more information.
"""
@spec async_stream_with_span(
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(term() -> term()),
keyword()
) :: Enumerable.t()
def async_stream_with_span(enumerable, name, start_opts, fun, opts \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async_stream(
enumerable,
fn arg ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.(arg)
end
end,
opts
)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with a new child span.
See `Task.async_stream/5` for more information.
"""
@spec async_stream_with_span(
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()],
keyword()
) :: Enumerable.t()
def async_stream_with_span(enumerable, name, start_opts, module, function_name, args, opts \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async_stream(enumerable, Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}], opts)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with a new linked span.
See `Task.async_stream/3` for more information.
"""
@spec async_stream_with_linked_span(
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(term() -> term()),
keyword()
) :: Enumerable.t()
def async_stream_with_linked_span(enumerable, name, start_opts, fun, opts \\ []) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.async_stream(
enumerable,
fn arg ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.(arg)
end
end,
opts
)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with a new linked span.
See `Task.async_stream/5` for more information.
"""
@spec async_stream_with_linked_span(
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()],
keyword()
) :: Enumerable.t()
def async_stream_with_linked_span(enumerable, name, start_opts, module, function_name, args, opts \\ []) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.async_stream(
enumerable,
Wrapper,
:with_linked_span,
[name, start_opts, parent, {module, function_name, args}],
opts
)
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on.
See `Task.async/1` for more information.
"""
@spec async((() -> any())) :: Task.t()
def async(fun) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async(fn ->
OpenTelemetry.Ctx.attach(ctx)
fun.()
end)
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on.
See `Task.async/3` for more information.
"""
@spec async(module(), atom(), [term()]) :: Task.t()
def async(module, function_name, args) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async(Wrapper, :with_ctx, [ctx, {module, function_name, args}])
end
@doc """
Starts a task with a new child span that can be awaited on.
See `Task.async/1` for more information.
"""
@spec async_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(() -> any())
) :: Task.t()
def async_with_span(name, start_opts, fun) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async(fn ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.()
end
end)
end
@doc """
Starts a task with a new child span that can be awaited on.
See `Task.async/3` for more information.
"""
@spec async_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()]
) :: Task.t()
def async_with_span(name, start_opts, module, function_name, args) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async(Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}])
end
@doc """
Starts a task with a new linked span that can be awaited on.
See `Task.async/1` for more information.
"""
@spec async_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(() -> any())
) :: Task.t()
def async_with_linked_span(name, start_opts, fun) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.async(fn ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.()
end
end)
end
@doc """
Starts a task with a new linked span that can be awaited on.
See `Task.async/3` for more information.
"""
@spec async_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()]
) :: Task.t()
def async_with_linked_span(name, start_opts, module, function_name, args) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.async(Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}])
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0`.
See `Task.start/1` for more information.
"""
@spec start((() -> any())) :: {:ok, pid()}
def start(fun) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start(fn ->
OpenTelemetry.Ctx.attach(ctx)
fun.()
end)
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0`.
See `Task.start/3` for more information.
"""
@spec start(module(), atom(), [term()]) :: {:ok, pid()}
def start(module, function_name, args) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start(Wrapper, :with_ctx, [ctx, {module, function_name, args}])
end
@doc """
Starts a task with a new child span.
See `Task.start/1` for more information.
"""
@spec start_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(() -> any())
) :: {:ok, pid()}
def start_with_span(name, start_opts, fun) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start(fn ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.()
end
end)
end
@doc """
Starts a task with a new child span.
See `Task.start/3` for more information.
"""
@spec start_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()]
) :: {:ok, pid()}
def start_with_span(name, start_opts, module, function_name, args) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start(Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}])
end
@doc """
Starts a task with a new linked span.
See `Task.start/1` for more information.
"""
@spec start_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(() -> any())
) :: {:ok, pid()}
def start_with_linked_span(name, start_opts, fun) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.start(fn ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.()
end
end)
end
@doc """
Starts a task with a new linked span.
See `Task.start/3` for more information.
"""
@spec start_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()]
) :: {:ok, pid()}
def start_with_linked_span(name, start_opts, module, function_name, args) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.start(Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}])
end
@doc """
Starts a task as part of a supervision tree with the given `fun`
with the current `t:OpenTelemetry.Ctx.t/0` attached.
See `Task.start_link/1` for more information.
"""
@spec start_link((() -> any())) :: {:ok, pid()}
def start_link(fun) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start_link(fn ->
OpenTelemetry.Ctx.attach(ctx)
fun.()
end)
end
@doc """
Starts a task as part of a supervision tree with the given
`module`, `function`, and `args` with the current `t:OpenTelemetry.Ctx.t/0`
attached.
See `Task.start_link/3` for more information.
"""
@spec start_link(module(), atom(), [term()]) :: {:ok, pid()}
def start_link(module, function_name, args) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start_link(Wrapper, :with_ctx, [ctx, {module, function_name, args}])
end
@doc """
Starts a task as part of a supervision tree with the given `fun`
in a new child span.
See `Task.start_link/1` for more information.
"""
@spec start_link_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(() -> any())
) :: {:ok, pid()}
def start_link_with_span(name, start_opts, fun) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start_link(fn ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.()
end
end)
end
@doc """
Starts a task as part of a supervision tree with the given
`module`, `function`, and `args` in a new child span.
See `Task.start_link/3` for more information.
"""
@spec start_link_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()]
) :: {:ok, pid()}
def start_link_with_span(name, start_opts, module, function_name, args) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start_link(Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}])
end
@doc """
Starts a task as part of a supervision tree with the given `fun`
in a new linked span.
See `Task.start_link/1` for more information.
"""
@spec start_link_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(() -> any())
) :: {:ok, pid()}
def start_link_with_linked_span(name, start_opts, fun) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.start_link(fn ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.()
end
end)
end
@doc """
Starts a task as part of a supervision tree with the given
`module`, `function`, and `args` in a new linked span.
See `Task.start_link/3` for more information.
"""
@spec start_link_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()]
) :: {:ok, pid()}
def start_link_with_linked_span(name, start_opts, module, function_name, args) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.start_link(Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}])
end
defdelegate await(task), to: Task
defdelegate await(task, timeout), to: Task
defdelegate await_many(tasks), to: Task
defdelegate await_many(tasks, timeout), to: Task
defdelegate child_spec(arg), to: Task
if Kernel.function_exported?(Task, :completed, 1) do
defdelegate completed(result), to: Task
end
if Kernel.function_exported?(Task, :ignore, 1) do
defdelegate ignore(task), to: Task
end
defdelegate shutdown(task), to: Task
defdelegate shutdown(task, timeout), to: Task
defdelegate yield_many(tasks), to: Task
defdelegate yield_many(tasks, timeout), to: Task
defdelegate yield(tasks), to: Task
defdelegate yield(tasks, timeout), to: Task
end

View File

@ -0,0 +1,836 @@
defmodule OpentelemetryProcessPropagator.Task.Supervisor do
@moduledoc """
`OpentelemetryProcessPropagator.Task.Supervisor` provides a set of extensions
to the `Task.Supervisor` module to reduce some boilerplate in propagating OpenTelemetry
contexts across process boundaries. Since these are extensions rather
than a replacement of Elixir's module, this library can be aliased
into a file without concern for creating spans where you do not want them.
Each `Task.Supervisor` function is replicated with two variants: `*_with_span`
and `*_with_linked_span`. Each of these variations has a specific use case.
The original implementation for each function automatically propagates the
current context.
* `*` - propagates the current context
* `*_with_span` - propagates the current context and starts a new child span.
* `*_with_linked_span` - propagates the current context and starts a new linked span.
> #### Module Redefinement {: .info}
>
> This module does not redefine the `Task.Supervisor` module, instead providing a wrapper of the module,
> so this functionality will not globally modify the default behavior of the `Task` module.
"""
alias OpentelemetryProcessPropagator.Task.Wrapper
require OpenTelemetry.Tracer
@doc false
defdelegate child_spec(opts), to: Task.Supervisor
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on.
See `Task.Supervisor.async/3` for more information.
"""
@spec async(Supervisor.supervisor(), (() -> any())) :: Task.t()
def async(supervisor, fun, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async(
supervisor,
fn ->
OpenTelemetry.Ctx.attach(ctx)
fun.()
end,
options
)
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on.
See `Task.Supervisor.async/5` for more information.
"""
@spec async(Supervisor.supervisor(), module(), atom(), [term()]) :: Task.t()
def async(supervisor, module, function_name, args, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async(supervisor, Wrapper, :with_ctx, [ctx, {module, function_name, args}], options)
end
@doc """
Starts a task with a new child span that can be awaited on.
See `Task.Supervisor.async/3` for more information.
"""
@spec async_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
Supervisor.supervisor(),
(() -> any())
) :: Task.t()
def async_with_span(name, start_opts, supervisor, fun, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async(
supervisor,
fn ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.()
end
end,
options
)
end
@doc """
Starts a task with a new child span that can be awaited on.
See `Task.Supervisor.async/5` for more information.
"""
@spec async_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
Supervisor.supervisor(),
module(),
atom(),
[term()]
) :: Task.t()
def async_with_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async(
supervisor,
Wrapper,
:with_span,
[name, start_opts, ctx, {module, function_name, args}],
options
)
end
@doc """
Starts a task with a new linked span that can be awaited on.
See `Task.Supervisor.async/3` for more information.
"""
@spec async_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
Supervisor.supervisor(),
(() -> any())
) :: Task.t()
def async_with_linked_span(name, start_opts, supervisor, fun, options \\ []) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.Supervisor.async(
supervisor,
fn ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.()
end
end,
options
)
end
@doc """
Starts a task with a new linked span that can be awaited on.
See `Task.Supervisor.async/5` for more information.
"""
@spec async_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
Supervisor.supervisor(),
module(),
atom(),
[term()]
) :: Task.t()
def async_with_linked_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.Supervisor.async(
supervisor,
Wrapper,
:with_linked_span,
[name, start_opts, parent, {module, function_name, args}],
options
)
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on.
See `Task.Supervisor.async_nolink/3` for more information.
"""
@spec async_nolink(Supervisor.supervisor(), (() -> any())) :: Task.t()
def async_nolink(supervisor, fun, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async_nolink(
supervisor,
fn ->
OpenTelemetry.Ctx.attach(ctx)
fun.()
end,
options
)
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on.
See `Task.Supervisor.async_nolink/5` for more information.
"""
@spec async_nolink(Supervisor.supervisor(), module(), atom(), [term()]) :: Task.t()
def async_nolink(supervisor, module, function_name, args, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async_nolink(supervisor, Wrapper, :with_ctx, [ctx, {module, function_name, args}], options)
end
@doc """
Starts a task with a new child span that can be awaited on.
See `Task.Supervisor.async_nolink/3` for more information.
"""
@spec async_nolink_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
Supervisor.supervisor(),
(() -> any())
) :: Task.t()
def async_nolink_with_span(name, start_opts, supervisor, fun, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async_nolink(
supervisor,
fn ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.()
end
end,
options
)
end
@doc """
Starts a task with a new child span that can be awaited on.
See `Task.Supervisor.async_nolink/5` for more information.
"""
@spec async_nolink_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
Supervisor.supervisor(),
module(),
atom(),
[term()]
) :: Task.t()
def async_nolink_with_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async_nolink(
supervisor,
Wrapper,
:with_span,
[name, start_opts, ctx, {module, function_name, args}],
options
)
end
@doc """
Starts a task with a new linked span that can be awaited on.
See `Task.Supervisor.async_nolink/3` for more information.
"""
@spec async_nolink_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
Supervisor.supervisor(),
(() -> any())
) :: Task.t()
def async_nolink_with_linked_span(name, start_opts, supervisor, fun, options \\ []) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.Supervisor.async_nolink(
supervisor,
fn ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.()
end
end,
options
)
end
@doc """
Starts a task with a new linked span that can be awaited on.
See `Task.Supervisor.async_nolink/5` for more information.
"""
@spec async_nolink_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
Supervisor.supervisor(),
module(),
atom(),
[term()]
) :: Task.t()
def async_nolink_with_linked_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.Supervisor.async_nolink(
supervisor,
Wrapper,
:with_linked_span,
[name, start_opts, parent, {module, function_name, args}],
options
)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with the current `t:OpenTelemetry.Ctx.t/0`
attached.
See `Task.Supervisor.async_stream/4` for more information.
"""
@spec async_stream(
Supervisor.supervisor(),
Enumerable.t(),
(term() -> term()),
keyword()
) :: Enumerable.t()
def async_stream(supervisor, enumerable, fun, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async_stream(
supervisor,
enumerable,
fn arg ->
OpenTelemetry.Ctx.attach(ctx)
fun.(arg)
end,
options
)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with the
current `t:OpenTelemetry.Ctx.t/0` attached.
See `Task.Supervisor.async_stream/6` for more information.
"""
@spec async_stream(
Supervisor.supervisor(),
Enumerable.t(),
module(),
atom(),
[term()],
keyword()
) :: Enumerable.t()
def async_stream(supervisor, enumerable, module, function_name, args, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async_stream(
supervisor,
enumerable,
Wrapper,
:with_ctx,
[ctx, {module, function_name, args}],
options
)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with a new child span.
See `Task.Supervisor.async_stream/4` for more information.
"""
@spec async_stream_with_span(
Supervisor.supervisor(),
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(term() -> term()),
keyword()
) :: Enumerable.t()
def async_stream_with_span(supervisor, enumerable, name, start_opts, fun, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async_stream(
supervisor,
enumerable,
fn arg ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.(arg)
end
end,
options
)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with a new child span.
See `Task.Supervisor.async_stream/6` for more information.
"""
@spec async_stream_with_span(
Supervisor.supervisor(),
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()],
keyword()
) :: Enumerable.t()
def async_stream_with_span(supervisor, enumerable, name, start_opts, module, function_name, args, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async_stream(
supervisor,
enumerable,
Wrapper,
:with_span,
[name, start_opts, ctx, {module, function_name, args}],
options
)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with a new linked span.
See `Task.Supervisor.async_stream/4` for more information.
"""
@spec async_stream_with_linked_span(
Supervisor.supervisor(),
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(term() -> term()),
keyword()
) :: Enumerable.t()
def async_stream_with_linked_span(supervisor, enumerable, name, start_opts, fun, options \\ []) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.Supervisor.async_stream(
supervisor,
enumerable,
fn arg ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.(arg)
end
end,
options
)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with a new linked span.
See `Task.Supervisor.async_stream/6` for more information.
"""
@spec async_stream_with_linked_span(
Supervisor.supervisor(),
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()],
keyword()
) :: Enumerable.t()
def async_stream_with_linked_span(
supervisor,
enumerable,
name,
start_opts,
module,
function_name,
args,
options \\ []
) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.Supervisor.async_stream(
supervisor,
enumerable,
Wrapper,
:with_linked_span,
[name, start_opts, parent, {module, function_name, args}],
options
)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with the current `t:OpenTelemetry.Ctx.t/0`
attached.
See `Task.Supervisor.async_stream_nolink/4` for more information.
"""
@spec async_stream_nolink(
Supervisor.supervisor(),
Enumerable.t(),
(term() -> term()),
keyword()
) :: Enumerable.t()
def async_stream_nolink(supervisor, enumerable, fun, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async_stream_nolink(
supervisor,
enumerable,
fn arg ->
OpenTelemetry.Ctx.attach(ctx)
fun.(arg)
end,
options
)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with the
current `t:OpenTelemetry.Ctx.t/0` attached.
See `Task.Supervisor.async_stream_nolink/6` for more information.
"""
@spec async_stream_nolink(
Supervisor.supervisor(),
Enumerable.t(),
module(),
atom(),
[term()],
keyword()
) :: Enumerable.t()
def async_stream_nolink(supervisor, enumerable, module, function_name, args, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async_stream_nolink(
supervisor,
enumerable,
Wrapper,
:with_ctx,
[ctx, {module, function_name, args}],
options
)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with a new child span.
See `Task.Supervisor.async_stream_nolink/4` for more information.
"""
@spec async_stream_nolink_with_span(
Supervisor.supervisor(),
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(term() -> term()),
keyword()
) :: Enumerable.t()
def async_stream_nolink_with_span(supervisor, enumerable, name, start_opts, fun, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async_stream_nolink(
supervisor,
enumerable,
fn arg ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.(arg)
end
end,
options
)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with a new child span.
See `Task.Supervisor.async_stream_nolink/6` for more information.
"""
@spec async_stream_nolink_with_span(
Supervisor.supervisor(),
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()],
keyword()
) :: Enumerable.t()
def async_stream_nolink_with_span(
supervisor,
enumerable,
name,
start_opts,
module,
function_name,
args,
options \\ []
) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.async_stream_nolink(
supervisor,
enumerable,
Wrapper,
:with_span,
[name, start_opts, ctx, {module, function_name, args}],
options
)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with a new linked span.
See `Task.Supervisor.async_stream_nolink/4` for more information.
"""
@spec async_stream_nolink_with_linked_span(
Supervisor.supervisor(),
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(term() -> term()),
keyword()
) :: Enumerable.t()
def async_stream_nolink_with_linked_span(supervisor, enumerable, name, start_opts, fun, options \\ []) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.Supervisor.async_stream_nolink(
supervisor,
enumerable,
fn arg ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.(arg)
end
end,
options
)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with a new linked span.
See `Task.Supervisor.async_stream_nolink/6` for more information.
"""
@spec async_stream_nolink_with_linked_span(
Supervisor.supervisor(),
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()],
keyword()
) :: Enumerable.t()
def async_stream_nolink_with_linked_span(
supervisor,
enumerable,
name,
start_opts,
module,
function_name,
args,
options \\ []
) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.Supervisor.async_stream_nolink(
supervisor,
enumerable,
Wrapper,
:with_linked_span,
[name, start_opts, parent, {module, function_name, args}],
options
)
end
@doc """
Starts a task as a child of the given `supervisor` with the
current `t:OpenTelemetry.Ctx.t/0`.
See `Task.Supervisor.start_child/3` for more information.
"""
@spec start_child(
Supervisor.supervisor(),
(() -> any()),
keyword()
) :: DynamicSupervisor.on_start_child()
def start_child(supervisor, fun, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.start_child(
supervisor,
fn ->
OpenTelemetry.Ctx.attach(ctx)
fun.()
end,
options
)
end
@doc """
Starts a task as a child of the given `supervisor` with the
current `t:OpenTelemetry.Ctx.t/0`.
See `Task.Supervisor.start_child/5` for more information.
"""
@spec start_child(
Supervisor.supervisor(),
module(),
atom(),
[term()],
keyword()
) :: DynamicSupervisor.on_start_child()
def start_child(supervisor, module, function_name, args, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.start_child(supervisor, Wrapper, :with_ctx, [ctx, {module, function_name, args}], options)
end
@doc """
Starts a task as a child of the given `supervisor` in a new child span.
See `Task.Supervisor.start_child/3` for more information.
"""
@spec start_child_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
Supervisor.supervisor(),
(() -> any()),
keyword()
) :: DynamicSupervisor.on_start_child()
def start_child_with_span(name, start_opts, supervisor, fun, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.start_child(
supervisor,
fn ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.()
end
end,
options
)
end
@doc """
Starts a task as a child of the given `supervisor` in a new child span.
See `Task.Supervisor.start_child/5` for more information.
"""
@spec start_child_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
Supervisor.supervisor(),
module(),
atom(),
[term()],
keyword()
) :: DynamicSupervisor.on_start_child()
def start_child_with_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.Supervisor.start_child(
supervisor,
Wrapper,
:with_span,
[name, start_opts, ctx, {module, function_name, args}],
options
)
end
@doc """
Starts a task as a child of the given `supervisor` in a new linked span.
See `Task.Supervisor.start_child/3` for more information.
"""
@spec start_child_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
Supervisor.supervisor(),
(() -> any()),
keyword()
) :: DynamicSupervisor.on_start_child()
def start_child_with_linked_span(name, start_opts, supervisor, fun, options \\ []) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.Supervisor.start_child(
supervisor,
fn ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.()
end
end,
options
)
end
@doc """
Starts a task as a child of the given `supervisor` in a new linked span.
See `Task.Supervisor.start_child/5` for more information.
"""
@spec start_child_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
Supervisor.supervisor(),
module(),
atom(),
[term()],
keyword()
) :: DynamicSupervisor.on_start_child()
def start_child_with_linked_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.Supervisor.start_child(
supervisor,
Wrapper,
:with_linked_span,
[name, start_opts, parent, {module, function_name, args}],
options
)
end
defdelegate children(supervisor), to: Task.Supervisor
defdelegate start_link(), to: Task.Supervisor
defdelegate terminate_child(supervisor, pid), to: Task.Supervisor
end

View File

@ -0,0 +1,89 @@
defmodule OpentelemetryProcessPropagator.Task.Wrapper do
@moduledoc false
require OpenTelemetry.Tracer
@spec with_ctx(OpenTelemetry.Ctx.t(), {module(), atom(), [term()]}) :: any()
def with_ctx(ctx, {m, f, a}) do
OpenTelemetry.Ctx.attach(ctx)
apply(m, f, a)
end
# for streams which prepend the value to the given arguments
@spec with_ctx(term(), OpenTelemetry.Ctx.t(), {module(), atom(), [term()]}) :: any()
def with_ctx(value, ctx, {m, f, a}) do
OpenTelemetry.Ctx.attach(ctx)
apply(m, f, [value | a])
end
@spec with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
OpenTelemetry.Ctx.t(),
{module(), atom(), [term()]}
) :: any()
def with_span(name, start_opts, ctx, {m, f, a}) do
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
apply(m, f, a)
end
end
# for streams which prepend the value to the given arguments
@spec with_span(
term(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
OpenTelemetry.Ctx.t(),
{module(), atom(), [term()]}
) :: any()
def with_span(value, name, start_opts, ctx, {m, f, a}) do
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
apply(m, f, [value | a])
end
end
@spec with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
OpenTelemetry.Ctx.t(),
{module(), atom(), [term()]}
) :: any()
def with_linked_span(name, start_opts, parent, {m, f, a}) do
links =
if parent == :undefined do
[]
else
[OpenTelemetry.link(parent)]
end
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, links) do
apply(m, f, a)
end
end
# for streams which prepend the value to the given arguments
@spec with_linked_span(
term(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
OpenTelemetry.Ctx.t(),
{module(), atom(), [term()]}
) :: any()
def with_linked_span(value, name, start_opts, parent, {m, f, a}) do
links =
if parent == :undefined do
[]
else
[OpenTelemetry.link(parent)]
end
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, links) do
apply(m, f, [value | a])
end
end
end

View File

@ -9,7 +9,7 @@ defmodule OpentelemetryProcessPropagator.MixProject do
app: app,
version: to_string(Keyword.fetch!(desc, :vsn)),
description: to_string(Keyword.fetch!(desc, :description)),
elixir: "~> 1.10",
elixir: "~> 1.11",
start_permanent: Mix.env() == :prod,
deps: deps(Keyword.fetch!(config, :deps)),
name: "Opentelemetry Process Propagator",

View File

@ -1,21 +1,21 @@
%{
"acceptor_pool": {:hex, :acceptor_pool, "1.0.0", "43c20d2acae35f0c2bcd64f9d2bde267e459f0f3fd23dab26485bf518c281b21", [:rebar3], [], "hexpm", "0cbcd83fdc8b9ad2eee2067ef8b91a14858a5883cb7cd800e6fcd5803e158788"},
"chatterbox": {:hex, :ts_chatterbox, "0.11.0", "b8f372c706023eb0de5bf2976764edb27c70fe67052c88c1f6a66b3a5626847f", [:rebar3], [{:hpack, "~>0.2.3", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "722fe2bad52913ab7e87d849fc6370375f0c961ffb2f0b5e6d647c9170c382a6"},
"chatterbox": {:hex, :ts_chatterbox, "0.12.0", "4e54f199e15c0320b85372a24e35554a2ccfc4342e0b7cd8daed9a04f9b8ef4a", [:rebar3], [{:hpack, "~>0.2.3", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "6478c161bc60244f41cd5847cc3accd26d997883e9f7facd36ff24533b2fa579"},
"ctx": {:hex, :ctx, "0.6.0", "8ff88b70e6400c4df90142e7f130625b82086077a45364a78d208ed3ed53c7fe", [:rebar3], [], "hexpm", "a14ed2d1b67723dbebbe423b28d7615eb0bdcba6ff28f2d1f1b0a7e1d4aa5fc2"},
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
"earmark_parser": {:hex, :earmark_parser, "1.4.24", "344f8d2a558691d3fcdef3f9400157d7c4b3b8e58ee5063297e9ae593e8326d9", [:mix], [], "hexpm", "1f6451b0116dd270449c8f5b30289940ee9c0a39154c783283a08e55af82ea34"},
"dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"},
"earmark_parser": {:hex, :earmark_parser, "1.4.26", "f4291134583f373c7d8755566122908eb9662df4c4b63caa66a0eabe06569b0a", [:mix], [], "hexpm", "48d460899f8a0c52c5470676611c01f64f3337bad0b26ddab43648428d94aabc"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.28.3", "6eea2f69995f5fba94cd6dd398df369fe4e777a47cd887714a0976930615c9e6", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "05387a6a2655b5f9820f3f627450ed20b4325c25977b2ee69bed90af6688e718"},
"ex_doc": {:hex, :ex_doc, "0.28.5", "3e52a6d2130ce74d096859e477b97080c156d0926701c13870a4e1f752363279", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d2c4b07133113e9aa3e9ba27efb9088ba900e9e51caa383919676afdf09ab181"},
"gproc": {:hex, :gproc, "0.8.0", "cea02c578589c61e5341fce149ea36ccef236cc2ecac8691fba408e7ea77ec2f", [:rebar3], [], "hexpm", "580adafa56463b75263ef5a5df4c86af321f68694e7786cb057fd805d1e2a7de"},
"grpcbox": {:hex, :grpcbox, "0.14.0", "3eb321bcd2275baf8b54cf381feb7b0559a50c02544de28fda039c7f2f9d1a7a", [:rebar3], [{:acceptor_pool, "~>1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~>0.11.0", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~>0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~>0.8.0", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "e24159b7b6d3f9869bbe528845c0125fed2259366ba908fd04a1f45fe81d0660"},
"grpcbox": {:hex, :grpcbox, "0.15.0", "97c7126296a091602d372ebf5860a04f7bc795b45b33a984cad2b8e362774fd8", [:rebar3], [{:acceptor_pool, "~>1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~>0.12.0", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~>0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~>0.8.0", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "161abe9e17e7d1982efa6488adeaa13c3e847a07984a6e6b224e553368918647"},
"hpack": {:hex, :hpack_erl, "0.2.3", "17670f83ff984ae6cd74b1c456edde906d27ff013740ee4d9efaa4f1bf999633", [:rebar3], [], "hexpm", "06f580167c4b8b8a6429040df36cc93bba6d571faeaec1b28816523379cbb23a"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"opentelemetry": {:hex, :opentelemetry, "1.0.2", "9ffa9ddcbec9356154681bc9d0a54bb20f0de0e8c6696ccc298b49633308782b", [:rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "e774506333c8afd9133fee4ba69d06e6413ccad6ee76f10e59c3ec624009af23"},
"opentelemetry_api": {:hex, :opentelemetry_api, "1.0.2", "91353ee40583b1d4f07d7b13ed62642abfec6aaa0d8a2114f07edafb2df781c5", [:mix, :rebar3], [], "hexpm", "2a8247f85c44216b883900067478d59955d11e58e5cfca7c884cd4f203ace3ac"},
"opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.0.2", "19a102d1f04776399a915be27121852468318c27146e553faf28008e3e474972", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.0", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.11", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "43de904dd7f482009bf1a40d591ab5ed25f603f1072a04a162e87f7f8c08dbb5"},
"opentelemetry": {:hex, :opentelemetry, "1.0.5", "f0cd36ac8b30b68e8d70cec5bb88801ed7f3fe79aac67597054ed5490542e810", [:rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "3b17f8933a58e1246f42a0c215840fd8218aebbcabdb0aac62b0c766fe85542e"},
"opentelemetry_api": {:hex, :opentelemetry_api, "1.0.3", "77f9644c42340cd8b18c728cde4822ed55ae136f0d07761b78e8c54da46af93a", [:mix, :rebar3], [], "hexpm", "4293e06bd369bc004e6fad5edbb56456d891f14bd3f9f1772b18f1923e0678ea"},
"opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.0.4", "60a64c75633a82b6c36a20043be355ac72a7b9b21633edd47407924c5596dde0", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.0", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.11", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "61da65290fbb6cac3459b84b8cd630795bf608df93a2b2cc49251cae78200e5e"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"tls_certificate_check": {:hex, :tls_certificate_check, "1.13.0", "c407200ca837df4e6cf533874e9c4c8bcde2e71520ab215856bdaec9c7fb9252", [:rebar3], [{:ssl_verify_fun, "1.1.6", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "b45a3117931b808fdaea580f6e7ed1a7889c171dae792e632a166d6dda6e88a2"},
"tls_certificate_check": {:hex, :tls_certificate_check, "1.15.0", "1c0377617a1111000bca3f4cd530b62690c9bd2dc9b868b4459203cd4d7f16ab", [:rebar3], [{:ssl_verify_fun, "1.1.6", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "87fd2e865078fdf8913a8c27bd8fe2be986383e31011f21d7f92cc5f7bc90731"},
}

View File

@ -0,0 +1,746 @@
defmodule OpentelemetryProcessPropagator.TaskSupervisorTest do
use ExUnit.Case
alias OpentelemetryProcessPropagator.Task
require OpenTelemetry.Tracer, as: Tracer
require OpenTelemetry.Span
require Record
for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do
Record.defrecord(name, spec)
end
for {name, spec} <- Record.extract_all(from_lib: "opentelemetry_api/include/opentelemetry.hrl") do
Record.defrecord(name, spec)
end
setup do
:application.stop(:opentelemetry)
:application.set_env(:opentelemetry, :tracer, :otel_tracer_default)
:application.set_env(:opentelemetry, :processors, [
{:otel_batch_processor, %{scheduled_delay_ms: 1}}
])
:application.start(:opentelemetry)
:otel_batch_processor.set_exporter(:otel_exporter_pid, self())
:ok
end
defmodule OtherAppModule do
def fun_with_span(value) do
Tracer.with_span "already traced fun", %{attributes: %{value: value}} do
value
end
end
end
describe "async_stream" do
test "async_stream" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_stream(Test.TaskSupervisor, ["a", "b"], fn value ->
OtherAppModule.fun_with_span(value)
end)
|> Stream.run()
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id)}
assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs1)}
assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs2)}
assert [%{value: "a"}, %{value: "b"}] == Enum.sort([attributes(attrs1), attributes(attrs2)])
end
test "async_stream/5" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_stream(Test.TaskSupervisor, ["a", "b"], __MODULE__, :stream_ctx_test_function, [
:async_stream
])
|> Stream.run()
end
assert_receive {:span, span(name: "parent span", attributes: attrs)}
assert %{} == attributes(attrs)
assert_receive {:span, span(name: "already traced fun", attributes: attrs1)}
assert_receive {:span, span(name: "already traced fun", attributes: attrs2)}
assert [%{value: "a"}, %{value: "b"}] == Enum.sort([attributes(attrs1), attributes(attrs2)])
end
test "async_stream_with_span/6" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_stream_with_span(
Test.TaskSupervisor,
["a", "b"],
"task span",
%{attributes: %{a: 1}},
fn value ->
Tracer.set_attribute(:value, value)
value
end
)
|> Stream.run()
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}, 500
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs1)}, 500
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs2)}, 500
assert [%{value: "a", a: 1}, %{value: "b", a: 1}] == Enum.sort([attributes(attrs1), attributes(attrs2)])
end
test "async_stream_with_span/8" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_stream_with_span(
Test.TaskSupervisor,
["a", "b"],
"task span",
%{attributes: %{a: 1}},
__MODULE__,
:stream_test_function,
[:async_stream_with_span]
)
|> Stream.run()
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}, 500
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500
assert %{value: val1, a: 1} = attributes(attrs)
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500
assert %{value: val2, a: 1} = attributes(attrs)
assert ["a", "b"] == Enum.sort([val1, val2])
end
test "async_stream_with_linked_span/6" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_stream_with_linked_span(
Test.TaskSupervisor,
["a", "b"],
"task span",
%{attributes: %{a: 1}},
fn value ->
Tracer.set_attribute(:value, value)
:ok
end
)
|> Stream.run()
end
assert_receive {:span, span(name: "parent span")}, 500
assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500
assert %{value: val1, a: 1} = attributes(attrs)
assert [] != links
assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500
assert %{value: val2, a: 1} = attributes(attrs)
assert [] != links
assert ["a", "b"] == Enum.sort([val1, val2])
end
test "async_stream_with_linked_span/8" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_stream_with_linked_span(
Test.TaskSupervisor,
["a", "b"],
"task span",
%{attributes: %{a: 1}},
__MODULE__,
:stream_test_function,
[:async_stream_with_linked_span]
)
|> Stream.run()
end
assert_receive {:span, span(name: "parent span")}, 500
assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500
assert %{a: 1, value: val1} = attributes(attrs)
assert [] != links
assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500
assert %{a: 1, value: val2} = attributes(attrs)
assert [] != links
assert ["a", "b"] == Enum.sort([val1, val2])
end
end
describe "async_stream_nolink" do
test "async_stream_nolink" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_stream_nolink(Test.TaskSupervisor, ["a", "b"], fn value ->
OtherAppModule.fun_with_span(value)
end)
|> Stream.run()
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id)}
assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs1)}
assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs2)}
assert [%{value: "a"}, %{value: "b"}] == Enum.sort([attributes(attrs1), attributes(attrs2)])
end
test "async_stream_nolink/5" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_stream_nolink(
Test.TaskSupervisor,
["a", "b"],
__MODULE__,
:stream_ctx_test_function,
[
:async_stream_nolink
]
)
|> Stream.run()
end
assert_receive {:span, span(name: "parent span", attributes: attrs)}
assert %{} == attributes(attrs)
assert_receive {:span, span(name: "already traced fun", attributes: attrs1)}
assert_receive {:span, span(name: "already traced fun", attributes: attrs2)}
assert [%{value: "a"}, %{value: "b"}] == Enum.sort([attributes(attrs1), attributes(attrs2)])
end
test "async_stream_nolink_with_span/6" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_stream_nolink_with_span(
Test.TaskSupervisor,
["a", "b"],
"task span",
%{attributes: %{a: 1}},
fn value ->
Tracer.set_attribute(:value, value)
value
end
)
|> Stream.run()
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}, 500
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs1)}, 500
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs2)}, 500
assert [%{value: "a", a: 1}, %{value: "b", a: 1}] == Enum.sort([attributes(attrs1), attributes(attrs2)])
end
test "async_stream_nolink_with_span/8" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_stream_nolink_with_span(
Test.TaskSupervisor,
["a", "b"],
"task span",
%{attributes: %{a: 1}},
__MODULE__,
:stream_test_function,
[:async_stream_nolink_with_span]
)
|> Stream.run()
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}, 500
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500
assert %{value: val1, a: 1} = attributes(attrs)
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500
assert %{value: val2, a: 1} = attributes(attrs)
assert ["a", "b"] == Enum.sort([val1, val2])
end
test "async_stream_nolink_with_linked_span/6" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_stream_nolink_with_linked_span(
Test.TaskSupervisor,
["a", "b"],
"task span",
%{attributes: %{a: 1}},
fn value ->
Tracer.set_attribute(:value, value)
:ok
end
)
|> Stream.run()
end
assert_receive {:span, span(name: "parent span")}, 500
assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500
assert %{value: val1, a: 1} = attributes(attrs)
assert [] != links
assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500
assert %{value: val2, a: 1} = attributes(attrs)
assert [] != links
assert ["a", "b"] == Enum.sort([val1, val2])
end
test "async_stream_nolink_with_linked_span/8" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_stream_nolink_with_linked_span(
Test.TaskSupervisor,
["a", "b"],
"task span",
%{attributes: %{a: 1}},
__MODULE__,
:stream_test_function,
[:async_stream_nolink_with_linked_span]
)
|> Stream.run()
end
assert_receive {:span, span(name: "parent span")}, 500
assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500
assert %{a: 1, value: val1} = attributes(attrs)
assert [] != links
assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500
assert %{a: 1, value: val2} = attributes(attrs)
assert [] != links
assert ["a", "b"] == Enum.sort([val1, val2])
end
end
describe "async" do
test "async" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span", %{attributes: %{a: 1}} do
Task.Supervisor.async(Test.TaskSupervisor, fn ->
Tracer.set_attribute(:value, :async)
:ok
end)
|> Task.await()
end
assert_receive {:span, span(name: "parent span", attributes: attrs)}
assert %{a: 1, value: :async} == attributes(attrs)
end
test "async_mfa" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span", %{attributes: %{a: 1}} do
Task.Supervisor.async(Test.TaskSupervisor, __MODULE__, :test_function, [:async])
|> Task.await()
end
assert_receive {:span, span(name: "parent span", attributes: attrs)}
assert %{a: 1, value: :async} == attributes(attrs)
end
test "async_with_span" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_with_span("task span", %{attributes: %{a: 1}}, Test.TaskSupervisor, fn ->
Tracer.set_attribute(:value, :async_with_span)
:ok
end)
|> Task.await()
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}
assert %{a: 1, value: :async_with_span} == attributes(attrs)
end
test "async_with_span_mfa" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
result =
Task.Supervisor.async_with_span(
"task span",
%{attributes: %{a: 1}},
Test.TaskSupervisor,
__MODULE__,
:test_function,
[:async_with_span]
)
|> Task.await()
assert :async_with_span == result
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}
assert %{a: 1, value: :async_with_span} == attributes(attrs)
end
test "async_with_linked_span" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_with_linked_span("linked task span", %{attributes: %{a: 1}}, Test.TaskSupervisor, fn ->
Tracer.set_attribute(:value, :async_with_linked_span)
:ok
end)
|> Task.await()
end
assert_receive {:span, span(name: "parent span")}
assert_receive {:span,
span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")}
assert %{a: 1, value: :async_with_linked_span} == attributes(attrs)
assert [] != links
end
test "async_with_linked_span_mfa" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
result =
Task.Supervisor.async_with_linked_span(
"linked task span",
%{attributes: %{a: 1}},
Test.TaskSupervisor,
__MODULE__,
:test_function,
[
:async_with_linked_span
]
)
|> Task.await()
assert :async_with_linked_span == result
end
assert_receive {:span, span(name: "parent span")}
assert_receive {:span,
span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")}
assert %{a: 1, value: :async_with_linked_span} == attributes(attrs)
assert [] != links
end
end
describe "async_nolink" do
test "async_nolink" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span", %{attributes: %{a: 1}} do
Task.Supervisor.async_nolink(Test.TaskSupervisor, fn ->
Tracer.set_attribute(:value, :async_nolink)
:ok
end)
|> Task.await()
end
assert_receive {:span, span(name: "parent span", attributes: attrs)}
assert %{a: 1, value: :async_nolink} == attributes(attrs)
end
test "async_nolink_mfa" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span", %{attributes: %{a: 1}} do
Task.Supervisor.async_nolink(Test.TaskSupervisor, __MODULE__, :ctx_test_function, [
:async_nolink
])
|> Task.await()
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)}
assert %{a: 1} == attributes(attrs)
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "already traced fun", attributes: attrs)}
assert %{value: :async_nolink} == attributes(attrs)
end
test "async_nolink_with_span" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_nolink_with_span("task span", %{attributes: %{a: 1}}, Test.TaskSupervisor, fn ->
Tracer.set_attribute(:value, :async_nolink_with_span)
:ok
end)
|> Task.await()
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}
assert %{a: 1, value: :async_nolink_with_span} == attributes(attrs)
end
test "async_nolink_with_span_mfa" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
result =
Task.Supervisor.async_nolink_with_span(
"task span",
%{attributes: %{a: 1}},
Test.TaskSupervisor,
__MODULE__,
:test_function,
[:async_nolink_with_span]
)
|> Task.await()
assert :async_nolink_with_span == result
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}
assert %{a: 1, value: :async_nolink_with_span} == attributes(attrs)
end
test "async_nolink_with_linked_span" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.async_nolink_with_linked_span(
"linked task span",
%{attributes: %{a: 1}},
Test.TaskSupervisor,
fn ->
Tracer.set_attribute(:value, :async_nolink_with_linked_span)
:ok
end
)
|> Task.await()
end
assert_receive {:span, span(name: "parent span")}
assert_receive {:span,
span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")}
assert %{a: 1, value: :async_nolink_with_linked_span} == attributes(attrs)
assert [] != links
end
test "async_nolink_with_linked_span_mfa" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
result =
Task.Supervisor.async_nolink_with_linked_span(
"linked task span",
%{attributes: %{a: 1}},
Test.TaskSupervisor,
__MODULE__,
:test_function,
[
:async_nolink_with_linked_span
]
)
|> Task.await()
assert :async_nolink_with_linked_span == result
end
assert_receive {:span, span(name: "parent span")}
assert_receive {:span,
span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")}
assert %{a: 1, value: :async_nolink_with_linked_span} == attributes(attrs)
assert [] != links
end
end
describe "start_child" do
test "start_child" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span", %{attributes: %{a: 1}} do
Task.Supervisor.start_child(Test.TaskSupervisor, fn ->
ctx_test_function(:start_child)
end)
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)}
assert %{a: 1} == attributes(attrs)
assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)}
assert %{value: :start_child} == attributes(attrs)
end
test "start_child_mfa" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span", %{attributes: %{a: 1}} do
Task.Supervisor.start_child(Test.TaskSupervisor, __MODULE__, :ctx_test_function, [
:start_child
])
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)}
assert %{a: 1} == attributes(attrs)
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "already traced fun", attributes: attrs)}
assert %{value: :start_child} == attributes(attrs)
end
test "start_child_with_span" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.start_child_with_span("task span", %{attributes: %{a: 1}}, Test.TaskSupervisor, fn ->
Tracer.set_attribute(:value, :start_child_with_span)
:ok
end)
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}
assert_receive {:span, span(parent_span_id: ^root_span_id, attributes: attrs, name: "task span")}
assert %{a: 1, value: :start_child_with_span} == attributes(attrs)
end
test "start_child_with_span_mfa" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.start_child_with_span(
"task span",
%{attributes: %{a: 1}},
Test.TaskSupervisor,
__MODULE__,
:test_function,
[:start_child_with_span]
)
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}
assert %{a: 1, value: :start_child_with_span} == attributes(attrs)
end
test "start_child_with_linked_span" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.start_child_with_linked_span(
"linked task span",
%{attributes: %{a: 1}},
Test.TaskSupervisor,
fn ->
Tracer.set_attribute(:value, :start_child_with_linked_span)
:ok
end
)
end
assert_receive {:span, span(name: "parent span")}
assert_receive {:span,
span(parent_span_id: :undefined, attributes: attrs, links: links, name: "linked task span")}
assert %{a: 1, value: :start_child_with_linked_span} == attributes(attrs)
assert [] != links
end
test "start_child_with_linked_span_mfa" do
start_supervised!({Task.Supervisor, name: Test.TaskSupervisor})
Tracer.with_span "parent span" do
Task.Supervisor.start_child_with_linked_span(
"linked task span",
%{attributes: %{a: 1}},
Test.TaskSupervisor,
__MODULE__,
:test_function,
[
:start_child_with_linked_span
]
)
end
assert_receive {:span, span(name: "parent span")}
assert_receive {:span,
span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")}
assert %{a: 1, value: :start_child_with_linked_span} == attributes(attrs)
assert [] != links
end
end
def test_function(value) do
Tracer.set_attributes(%{value: value})
value
end
def ctx_test_function(value) do
OtherAppModule.fun_with_span(value)
end
def stream_ctx_test_function(value, _args) do
OtherAppModule.fun_with_span(value)
end
def stream_test_function(value, _args) do
Tracer.set_attributes(%{value: value})
value
end
defp attributes({_, _, _, _, attributes}), do: attributes
end

View File

@ -0,0 +1,472 @@
defmodule OpentelemetryProcessPropagator.TaskTest do
use ExUnit.Case
alias OpentelemetryProcessPropagator.Task
require OpenTelemetry.Tracer, as: Tracer
require OpenTelemetry.Span
require Record
for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do
Record.defrecord(name, spec)
end
for {name, spec} <- Record.extract_all(from_lib: "opentelemetry_api/include/opentelemetry.hrl") do
Record.defrecord(name, spec)
end
setup do
:application.stop(:opentelemetry)
:application.set_env(:opentelemetry, :tracer, :otel_tracer_default)
:application.set_env(:opentelemetry, :processors, [
{:otel_batch_processor, %{scheduled_delay_ms: 1}}
])
:application.start(:opentelemetry)
:otel_batch_processor.set_exporter(:otel_exporter_pid, self())
:ok
end
defmodule OtherAppModule do
def fun_with_span(value) do
Tracer.with_span "already traced fun", %{attributes: %{value: value}} do
value
end
end
end
describe "async_stream" do
test "async_stream/3" do
Tracer.with_span "parent span" do
Task.async_stream(["a", "b"], fn value ->
OtherAppModule.fun_with_span(value)
end)
|> Stream.run()
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)}
assert %{} = attributes(attrs)
assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)}
%{value: val1} = attributes(attrs)
assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)}
%{value: val2} = attributes(attrs)
assert ["a", "b"] == Enum.sort([val1, val2])
end
test "async_stream/5" do
Tracer.with_span "parent span" do
Task.async_stream(["a", "b"], __MODULE__, :stream_ctx_test_function, [
:async_stream
])
|> Stream.run()
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)}
assert %{} = attributes(attrs)
assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)}
%{value: val1} = attributes(attrs)
assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)}
%{value: val2} = attributes(attrs)
assert ["a", "b"] == Enum.sort([val1, val2])
end
test "async_stream_with_span/5" do
Tracer.with_span "parent span" do
Task.async_stream_with_span(["a", "b"], "task span", %{attributes: %{a: 1}}, fn value ->
Tracer.set_attribute(:value, value)
value
end)
|> Stream.run()
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}, 500
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500
assert %{value: val1, a: 1} = attributes(attrs)
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500
assert %{value: val2, a: 1} = attributes(attrs)
assert ["a", "b"] == Enum.sort([val1, val2])
end
test "async_stream_with_span/7" do
Tracer.with_span "parent span" do
Task.async_stream_with_span(
["a", "b"],
"task span",
%{attributes: %{a: 1}},
__MODULE__,
:stream_test_function,
[:async_stream_with_span]
)
|> Stream.run()
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}, 500
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500
assert %{value: val1, a: 1} = attributes(attrs)
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500
assert %{value: val2, a: 1} = attributes(attrs)
assert ["a", "b"] == Enum.sort([val1, val2])
end
test "async_stream_with_linked_span/5" do
Tracer.with_span "parent span" do
Task.async_stream_with_linked_span(["a", "b"], "task span", %{attributes: %{a: 1}}, fn value ->
Tracer.set_attribute(:value, value)
:ok
end)
|> Stream.run()
end
assert_receive {:span, span(name: "parent span")}, 500
assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500
assert %{value: val1, a: 1} = attributes(attrs)
assert [] != links
assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500
assert %{value: val2, a: 1} = attributes(attrs)
assert [] != links
assert ["a", "b"] == Enum.sort([val1, val2])
end
test "async_stream_with_linked_span/7" do
Tracer.with_span "parent span" do
Task.async_stream_with_linked_span(
["a", "b"],
"task span",
%{attributes: %{a: 1}},
__MODULE__,
:stream_test_function,
[:async_stream_with_linked_span]
)
|> Stream.run()
end
assert_receive {:span, span(name: "parent span")}, 500
assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500
assert %{a: 1, value: val1} = attributes(attrs)
assert [] != links
assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500
assert %{a: 1, value: val2} = attributes(attrs)
assert [] != links
assert ["a", "b"] == Enum.sort([val1, val2])
end
end
describe "async" do
test "async/1" do
Tracer.with_span "parent span", %{attributes: %{a: 1}} do
Task.async(fn ->
Tracer.set_attribute(:value, :async)
:ok
end)
|> Task.await()
end
assert_receive {:span, span(name: "parent span", attributes: attrs)}
assert %{a: 1, value: :async} == attributes(attrs)
end
test "async/3" do
Tracer.with_span "parent span", %{attributes: %{a: 1}} do
Task.async(__MODULE__, :ctx_test_function, [:async])
|> Task.await()
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)}
assert %{a: 1} == attributes(attrs)
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "already traced fun", attributes: attrs)}
assert %{value: :async} == attributes(attrs)
end
test "async_with_span/3" do
Tracer.with_span "parent span" do
Task.async_with_span("task span", %{attributes: %{a: 1}}, fn ->
Tracer.set_attribute(:value, :async_with_span)
:ok
end)
|> Task.await()
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}
assert %{a: 1, value: :async_with_span} == attributes(attrs)
end
test "async_with_span/5" do
Tracer.with_span "parent span" do
result =
Task.async_with_span("task span", %{attributes: %{a: 1}}, __MODULE__, :test_function, [:async_with_span])
|> Task.await()
assert :async_with_span == result
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}
assert %{a: 1, value: :async_with_span} == attributes(attrs)
end
test "async_with_linked_span/3" do
Tracer.with_span "parent span" do
Task.async_with_linked_span("linked task span", %{attributes: %{a: 1}}, fn ->
Tracer.set_attribute(:value, :async_with_linked_span)
:ok
end)
|> Task.await()
end
assert_receive {:span, span(name: "parent span")}
assert_receive {:span,
span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")}
assert %{a: 1, value: :async_with_linked_span} == attributes(attrs)
assert [] != links
end
test "async_with_linked_span/6" do
Tracer.with_span "parent span" do
result =
Task.async_with_linked_span("linked task span", %{attributes: %{a: 1}}, __MODULE__, :test_function, [
:async_with_linked_span
])
|> Task.await()
assert :async_with_linked_span == result
end
assert_receive {:span, span(name: "parent span")}
assert_receive {:span,
span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")}
assert %{a: 1, value: :async_with_linked_span} == attributes(attrs)
assert [] != links
end
end
describe "start" do
test "start/3" do
Tracer.with_span "parent span", %{attributes: %{a: 1}} do
Task.start(fn ->
ctx_test_function(:start)
end)
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)}
assert %{a: 1} == attributes(attrs)
assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)}
assert %{value: :start} == attributes(attrs)
end
test "start/5" do
Tracer.with_span "parent span", %{attributes: %{a: 1}} do
Task.start(__MODULE__, :ctx_test_function, [:start])
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)}
assert %{a: 1} == attributes(attrs)
assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)}
assert %{value: :start} == attributes(attrs)
end
test "start_with_span/3" do
Tracer.with_span "parent span" do
Task.start_with_span("task span", %{attributes: %{a: 1}}, fn ->
Tracer.set_attribute(:value, :start_with_span)
:ok
end)
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}
assert_receive {:span, span(parent_span_id: ^root_span_id, attributes: attrs, name: "task span")}
assert %{a: 1, value: :start_with_span} == attributes(attrs)
end
test "start_with_span/5" do
Tracer.with_span "parent span" do
Task.start_with_span("task span", %{attributes: %{a: 1}}, __MODULE__, :test_function, [:start_with_span])
end
assert_receive {:span, span(span_id: root_span_id, name: "parent span")}
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}
assert %{a: 1, value: :start_with_span} == attributes(attrs)
end
test "start_with_linked_span" do
Tracer.with_span "parent span" do
Task.start_with_linked_span("linked task span", %{attributes: %{a: 1}}, fn ->
Tracer.set_attribute(:value, :start_with_linked_span)
:ok
end)
end
assert_receive {:span, span(name: "parent span")}
assert_receive {:span,
span(parent_span_id: :undefined, attributes: attrs, links: links, name: "linked task span")}
assert %{a: 1, value: :start_with_linked_span} == attributes(attrs)
assert [] != links
end
test "start_with_linked_span_mfa" do
Tracer.with_span "parent span" do
Task.start_with_linked_span("linked task span", %{attributes: %{a: 1}}, __MODULE__, :test_function, [
:start_with_linked_span
])
end
assert_receive {:span, span(name: "parent span")}
assert_receive {:span,
span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")}
assert %{a: 1, value: :start_with_linked_span} == attributes(attrs)
assert [] != links
end
end
test "start_link" do
Tracer.with_span "parent span", %{attributes: %{a: 1}} do
Task.start_link(fn ->
ctx_test_function(:start_link)
:ok
end)
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)}
assert %{a: 1} == attributes(attrs)
assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)}
assert %{value: :start_link} == attributes(attrs)
end
test "start_link_mfa" do
Tracer.with_span "parent span", %{attributes: %{a: 1}} do
Task.start_link(__MODULE__, :ctx_test_function, [:start_link])
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)}
assert %{a: 1} == attributes(attrs)
assert_receive {:span, span(parent_span_id: ^root_span_id, attributes: attrs, name: "already traced fun")}
assert %{value: :start_link} == attributes(attrs)
end
test "start_link_with_span" do
Tracer.with_span "parent span" do
Task.start_link_with_span("task span", %{attributes: %{a: 1}}, fn ->
Tracer.set_attribute(:value, :start_link_with_span)
:ok
end)
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id)}
assert_receive {:span, span(parent_span_id: ^root_span_id, attributes: attrs, name: "task span")}
assert %{a: 1, value: :start_link_with_span} == attributes(attrs)
end
test "start_link_with_span_mfa" do
Tracer.with_span "parent span" do
Task.start_link_with_span("task span", %{attributes: %{a: 1}}, __MODULE__, :test_function, [:start_link_with_span])
end
assert_receive {:span, span(name: "parent span", span_id: root_span_id)}
assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}
assert %{a: 1, value: :start_link_with_span} == attributes(attrs)
end
test "start_link_with_linked_span" do
Tracer.with_span "parent span" do
Task.start_link_with_linked_span("linked task span", %{attributes: %{a: 1}}, fn ->
Tracer.set_attribute(:value, :start_link_with_linked_span)
:ok
end)
end
assert_receive {:span, span(name: "parent span")}
assert_receive {:span, span(parent_span_id: :undefined, attributes: attrs, links: links, name: "linked task span")}
assert %{a: 1, value: :start_link_with_linked_span} == attributes(attrs)
assert [] != links
end
test "start_link_with_linked_span_mfa" do
Tracer.with_span "parent span" do
Task.start_link_with_linked_span("linked task span", %{attributes: %{a: 1}}, __MODULE__, :test_function, [
:start_link_with_linked_span
])
end
assert_receive {:span, span(name: "parent span")}
assert_receive {:span, span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")}
assert %{a: 1, value: :start_link_with_linked_span} == attributes(attrs)
assert [] != links
end
def test_function(value) do
Tracer.set_attributes(%{value: value})
value
end
def ctx_test_function(value) do
OtherAppModule.fun_with_span(value)
end
def link_test_function(value) do
OtherAppModule.fun_with_span(value)
end
def stream_ctx_test_function(value, _args) do
OtherAppModule.fun_with_span(value)
end
def stream_test_function(value, _args) do
Tracer.set_attributes(%{value: value})
value
end
defp attributes({_, _, _, _, attributes}), do: attributes
end