opentelemetry-erlang-contrib/propagators/opentelemetry_process_propa.../lib/task.ex

562 lines
15 KiB
Elixir

defmodule OpentelemetryProcessPropagator.Task do
@moduledoc """
`OpentelemetryProcessPropagator.Task` provides a set of extensions
to the `Task` module to reduce some boilerplate in propagating OpenTelemetry
contexts across process boundaries. Since these are extensions rather
than a replacement of Elixir's module, this library can be aliased
into a file without concern for creating spans where you do not want them.
Each `Task` function is replicated with two variants: `*_with_span`
and `*_with_linked_span`. Each of these variations has a specific use case.
The original implementation for each function automatically propagates the
current context.
* `*` - propagates the current context
* `*_with_span` - propagates the current context and starts a new child span.
* `*_with_linked_span` - propagates the current context and starts a new linked span.
> #### Module Redefinement {: .info}
>
> This module does not redefine the `Task` module, instead providing a wrapper of the module,
> so this functionality will not globally modify the default behavior of the `Task` module.
## Usage
```
defmodule MyApp do
require OpenTelemetry.Tracer
alias OpentelemetryProcessPropagator.Task
def traced_task_with_existing_span do
Task.async(fn ->
:ok
end)
|> Task.await()
end
def traced_task_with_new_span do
Task.async_with_span(:span_name, %{attributes: %{a: "b"}}, fn ->
Tracer.set_attribute(:c, "d")
:ok
end)
|> Task.await()
end
def traced_task_with_new_linked_span do
Task.async_with_linked_span(:span_name, %{attributes: %{a: "b"}}, fn ->
Tracer.set_attribute(:c, "d")
:ok
end)
|> Task.await()
end
end
```
"""
alias OpentelemetryProcessPropagator.Task.Wrapper
require OpenTelemetry.Tracer
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with the current `t:OpenTelemetry.Ctx.t/0`
attached.
See `Task.async_stream/3` for more information.
"""
@spec async_stream(Enumerable.t(), (term() -> term()), keyword()) :: Enumerable.t()
def async_stream(enumerable, fun, opts \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async_stream(
enumerable,
fn arg ->
OpenTelemetry.Ctx.attach(ctx)
fun.(arg)
end,
opts
)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with the
current `t:OpenTelemetry.Ctx.t/0` attached.
See `Task.async_stream/5` for more information.
"""
@spec async_stream(
Enumerable.t(),
module(),
atom(),
[term()],
keyword()
) :: Enumerable.t()
def async_stream(enumerable, module, function_name, args, opts \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async_stream(enumerable, Wrapper, :with_ctx, [ctx, {module, function_name, args}], opts)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with a new child span.
See `Task.async_stream/3` for more information.
"""
@spec async_stream_with_span(
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(term() -> term()),
keyword()
) :: Enumerable.t()
def async_stream_with_span(enumerable, name, start_opts, fun, opts \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async_stream(
enumerable,
fn arg ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.(arg)
end
end,
opts
)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with a new child span.
See `Task.async_stream/5` for more information.
"""
@spec async_stream_with_span(
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()],
keyword()
) :: Enumerable.t()
def async_stream_with_span(enumerable, name, start_opts, module, function_name, args, opts \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async_stream(enumerable, Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}], opts)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with a new linked span.
See `Task.async_stream/3` for more information.
"""
@spec async_stream_with_linked_span(
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(term() -> term()),
keyword()
) :: Enumerable.t()
def async_stream_with_linked_span(enumerable, name, start_opts, fun, opts \\ []) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.async_stream(
enumerable,
fn arg ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.(arg)
end
end,
opts
)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with a new linked span.
See `Task.async_stream/5` for more information.
"""
@spec async_stream_with_linked_span(
Enumerable.t(),
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()],
keyword()
) :: Enumerable.t()
def async_stream_with_linked_span(enumerable, name, start_opts, module, function_name, args, opts \\ []) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.async_stream(
enumerable,
Wrapper,
:with_linked_span,
[name, start_opts, parent, {module, function_name, args}],
opts
)
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on.
See `Task.async/1` for more information.
"""
@spec async((() -> any())) :: Task.t()
def async(fun) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async(fn ->
OpenTelemetry.Ctx.attach(ctx)
fun.()
end)
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on.
See `Task.async/3` for more information.
"""
@spec async(module(), atom(), [term()]) :: Task.t()
def async(module, function_name, args) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async(Wrapper, :with_ctx, [ctx, {module, function_name, args}])
end
@doc """
Starts a task with a new child span that can be awaited on.
See `Task.async/1` for more information.
"""
@spec async_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(() -> any())
) :: Task.t()
def async_with_span(name, start_opts, fun) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async(fn ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.()
end
end)
end
@doc """
Starts a task with a new child span that can be awaited on.
See `Task.async/3` for more information.
"""
@spec async_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()]
) :: Task.t()
def async_with_span(name, start_opts, module, function_name, args) do
ctx = OpenTelemetry.Ctx.get_current()
Task.async(Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}])
end
@doc """
Starts a task with a new linked span that can be awaited on.
See `Task.async/1` for more information.
"""
@spec async_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(() -> any())
) :: Task.t()
def async_with_linked_span(name, start_opts, fun) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.async(fn ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.()
end
end)
end
@doc """
Starts a task with a new linked span that can be awaited on.
See `Task.async/3` for more information.
"""
@spec async_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()]
) :: Task.t()
def async_with_linked_span(name, start_opts, module, function_name, args) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.async(Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}])
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0`.
See `Task.start/1` for more information.
"""
@spec start((() -> any())) :: {:ok, pid()}
def start(fun) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start(fn ->
OpenTelemetry.Ctx.attach(ctx)
fun.()
end)
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0`.
See `Task.start/3` for more information.
"""
@spec start(module(), atom(), [term()]) :: {:ok, pid()}
def start(module, function_name, args) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start(Wrapper, :with_ctx, [ctx, {module, function_name, args}])
end
@doc """
Starts a task with a new child span.
See `Task.start/1` for more information.
"""
@spec start_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(() -> any())
) :: {:ok, pid()}
def start_with_span(name, start_opts, fun) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start(fn ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.()
end
end)
end
@doc """
Starts a task with a new child span.
See `Task.start/3` for more information.
"""
@spec start_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()]
) :: {:ok, pid()}
def start_with_span(name, start_opts, module, function_name, args) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start(Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}])
end
@doc """
Starts a task with a new linked span.
See `Task.start/1` for more information.
"""
@spec start_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(() -> any())
) :: {:ok, pid()}
def start_with_linked_span(name, start_opts, fun) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.start(fn ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.()
end
end)
end
@doc """
Starts a task with a new linked span.
See `Task.start/3` for more information.
"""
@spec start_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()]
) :: {:ok, pid()}
def start_with_linked_span(name, start_opts, module, function_name, args) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.start(Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}])
end
@doc """
Starts a task as part of a supervision tree with the given `fun`
with the current `t:OpenTelemetry.Ctx.t/0` attached.
See `Task.start_link/1` for more information.
"""
@spec start_link((() -> any())) :: {:ok, pid()}
def start_link(fun) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start_link(fn ->
OpenTelemetry.Ctx.attach(ctx)
fun.()
end)
end
@doc """
Starts a task as part of a supervision tree with the given
`module`, `function`, and `args` with the current `t:OpenTelemetry.Ctx.t/0`
attached.
See `Task.start_link/3` for more information.
"""
@spec start_link(module(), atom(), [term()]) :: {:ok, pid()}
def start_link(module, function_name, args) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start_link(Wrapper, :with_ctx, [ctx, {module, function_name, args}])
end
@doc """
Starts a task as part of a supervision tree with the given `fun`
in a new child span.
See `Task.start_link/1` for more information.
"""
@spec start_link_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(() -> any())
) :: {:ok, pid()}
def start_link_with_span(name, start_opts, fun) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start_link(fn ->
OpenTelemetry.Ctx.attach(ctx)
OpenTelemetry.Tracer.with_span name, start_opts do
fun.()
end
end)
end
@doc """
Starts a task as part of a supervision tree with the given
`module`, `function`, and `args` in a new child span.
See `Task.start_link/3` for more information.
"""
@spec start_link_with_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()]
) :: {:ok, pid()}
def start_link_with_span(name, start_opts, module, function_name, args) do
ctx = OpenTelemetry.Ctx.get_current()
Task.start_link(Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}])
end
@doc """
Starts a task as part of a supervision tree with the given `fun`
in a new linked span.
See `Task.start_link/1` for more information.
"""
@spec start_link_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
(() -> any())
) :: {:ok, pid()}
def start_link_with_linked_span(name, start_opts, fun) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.start_link(fn ->
link = OpenTelemetry.link(parent)
OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do
fun.()
end
end)
end
@doc """
Starts a task as part of a supervision tree with the given
`module`, `function`, and `args` in a new linked span.
See `Task.start_link/3` for more information.
"""
@spec start_link_with_linked_span(
OpenTelemetry.span_name(),
OpenTelemetry.Span.start_opts(),
module(),
atom(),
[term()]
) :: {:ok, pid()}
def start_link_with_linked_span(name, start_opts, module, function_name, args) do
parent = OpenTelemetry.Tracer.current_span_ctx()
Task.start_link(Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}])
end
defdelegate await(task), to: Task
defdelegate await(task, timeout), to: Task
defdelegate await_many(tasks), to: Task
defdelegate await_many(tasks, timeout), to: Task
defdelegate child_spec(arg), to: Task
if Kernel.function_exported?(Task, :completed, 1) do
defdelegate completed(result), to: Task
end
if Kernel.function_exported?(Task, :ignore, 1) do
defdelegate ignore(task), to: Task
end
defdelegate shutdown(task), to: Task
defdelegate shutdown(task, timeout), to: Task
defdelegate yield_many(tasks), to: Task
defdelegate yield_many(tasks, timeout), to: Task
defdelegate yield(tasks), to: Task
defdelegate yield(tasks, timeout), to: Task
end