defmodule OpentelemetryProcessPropagator.Task do @moduledoc """ `OpentelemetryProcessPropagator.Task` provides a set of extensions to the `Task` module to reduce some boilerplate in propagating OpenTelemetry contexts across process boundaries. Since these are extensions rather than a replacement of Elixir's module, this library can be aliased into a file without concern for creating spans where you do not want them. Each `Task` function is replicated with two variants: `*_with_span` and `*_with_linked_span`. Each of these variations has a specific use case. The original implementation for each function automatically propagates the current context. * `*` - propagates the current context * `*_with_span` - propagates the current context and starts a new child span. * `*_with_linked_span` - propagates the current context and starts a new linked span. > #### Module Redefinement {: .info} > > This module does not redefine the `Task` module, instead providing a wrapper of the module, > so this functionality will not globally modify the default behavior of the `Task` module. ## Usage ``` defmodule MyApp do require OpenTelemetry.Tracer alias OpentelemetryProcessPropagator.Task def untraced_task do Task.async(fn -> :ok end) |> Task.await() end def traced_task do Task.async_with_span(:span_name, %{attributes: %{a: "b"}}, fn -> Tracer.set_attribute(:c, "d") :ok end) |> Task.await() end end ``` """ alias OpentelemetryProcessPropagator.Task.Wrapper require OpenTelemetry.Tracer @doc """ Returns a stream that runs the given function `fun` concurrently on each element in `enumerable` with the current `t:OpenTelemetry.Ctx.t/0` attached. See `Task.async_stream/3` for more information. """ @spec async_stream(Enumerable.t(), (term() -> term()), keyword()) :: Enumerable.t() def async_stream(enumerable, fun, opts \\ []) do ctx = OpenTelemetry.Ctx.get_current() Task.async_stream( enumerable, fn arg -> OpenTelemetry.Ctx.attach(ctx) fun.(arg) end, opts ) end @doc """ Returns a stream where the given function (`module` and `function`) is mapped concurrently on each element in `enumerable` with the current `t:OpenTelemetry.Ctx.t/0` attached. See `Task.async_stream/5` for more information. """ @spec async_stream( Enumerable.t(), module(), atom(), [term()], keyword() ) :: Enumerable.t() def async_stream(enumerable, module, function_name, args, opts \\ []) do ctx = OpenTelemetry.Ctx.get_current() Task.async_stream(enumerable, Wrapper, :with_ctx, [ctx, {module, function_name, args}], opts) end @doc """ Returns a stream that runs the given function `fun` concurrently on each element in `enumerable` with a new child span. See `Task.async_stream/3` for more information. """ @spec async_stream_with_span( Enumerable.t(), OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), (term() -> term()), keyword() ) :: Enumerable.t() def async_stream_with_span(enumerable, name, start_opts, fun, opts \\ []) do ctx = OpenTelemetry.Ctx.get_current() Task.async_stream( enumerable, fn arg -> OpenTelemetry.Ctx.attach(ctx) OpenTelemetry.Tracer.with_span name, start_opts do fun.(arg) end end, opts ) end @doc """ Returns a stream where the given function (`module` and `function`) is mapped concurrently on each element in `enumerable` with a new child span. See `Task.async_stream/5` for more information. """ @spec async_stream_with_span( Enumerable.t(), OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), module(), atom(), [term()], keyword() ) :: Enumerable.t() def async_stream_with_span(enumerable, name, start_opts, module, function_name, args, opts \\ []) do ctx = OpenTelemetry.Ctx.get_current() Task.async_stream(enumerable, Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}], opts) end @doc """ Returns a stream that runs the given function `fun` concurrently on each element in `enumerable` with a new linked span. See `Task.async_stream/3` for more information. """ @spec async_stream_with_linked_span( Enumerable.t(), OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), (term() -> term()), keyword() ) :: Enumerable.t() def async_stream_with_linked_span(enumerable, name, start_opts, fun, opts \\ []) do parent = OpenTelemetry.Tracer.current_span_ctx() Task.async_stream( enumerable, fn arg -> link = OpenTelemetry.link(parent) OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do fun.(arg) end end, opts ) end @doc """ Returns a stream where the given function (`module` and `function`) is mapped concurrently on each element in `enumerable` with a new linked span. See `Task.async_stream/5` for more information. """ @spec async_stream_with_linked_span( Enumerable.t(), OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), module(), atom(), [term()], keyword() ) :: Enumerable.t() def async_stream_with_linked_span(enumerable, name, start_opts, module, function_name, args, opts \\ []) do parent = OpenTelemetry.Tracer.current_span_ctx() Task.async_stream( enumerable, Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}], opts ) end @doc """ Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on. See `Task.async/1` for more information. """ @spec async((() -> any())) :: Task.t() def async(fun) do ctx = OpenTelemetry.Ctx.get_current() Task.async(fn -> OpenTelemetry.Ctx.attach(ctx) fun.() end) end @doc """ Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on. See `Task.async/3` for more information. """ @spec async(module(), atom(), [term()]) :: Task.t() def async(module, function_name, args) do ctx = OpenTelemetry.Ctx.get_current() Task.async(Wrapper, :with_ctx, [ctx, {module, function_name, args}]) end @doc """ Starts a task with a new child span that can be awaited on. See `Task.async/1` for more information. """ @spec async_with_span( OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), (() -> any()) ) :: Task.t() def async_with_span(name, start_opts, fun) do ctx = OpenTelemetry.Ctx.get_current() Task.async(fn -> OpenTelemetry.Ctx.attach(ctx) OpenTelemetry.Tracer.with_span name, start_opts do fun.() end end) end @doc """ Starts a task with a new child span that can be awaited on. See `Task.async/3` for more information. """ @spec async_with_span( OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), module(), atom(), [term()] ) :: Task.t() def async_with_span(name, start_opts, module, function_name, args) do ctx = OpenTelemetry.Ctx.get_current() Task.async(Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}]) end @doc """ Starts a task with a new linked span that can be awaited on. See `Task.async/1` for more information. """ @spec async_with_linked_span( OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), (() -> any()) ) :: Task.t() def async_with_linked_span(name, start_opts, fun) do parent = OpenTelemetry.Tracer.current_span_ctx() Task.async(fn -> link = OpenTelemetry.link(parent) OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do fun.() end end) end @doc """ Starts a task with a new linked span that can be awaited on. See `Task.async/3` for more information. """ @spec async_with_linked_span( OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), module(), atom(), [term()] ) :: Task.t() def async_with_linked_span(name, start_opts, module, function_name, args) do parent = OpenTelemetry.Tracer.current_span_ctx() Task.async(Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}]) end @doc """ Starts a task with the current `t:OpenTelemetry.Ctx.t/0`. See `Task.start/1` for more information. """ @spec start((() -> any())) :: {:ok, pid()} def start(fun) do ctx = OpenTelemetry.Ctx.get_current() Task.start(fn -> OpenTelemetry.Ctx.attach(ctx) fun.() end) end @doc """ Starts a task with the current `t:OpenTelemetry.Ctx.t/0`. See `Task.start/3` for more information. """ @spec start(module(), atom(), [term()]) :: {:ok, pid()} def start(module, function_name, args) do ctx = OpenTelemetry.Ctx.get_current() Task.start(Wrapper, :with_ctx, [ctx, {module, function_name, args}]) end @doc """ Starts a task with a new child span. See `Task.start/1` for more information. """ @spec start_with_span( OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), (() -> any()) ) :: {:ok, pid()} def start_with_span(name, start_opts, fun) do ctx = OpenTelemetry.Ctx.get_current() Task.start(fn -> OpenTelemetry.Ctx.attach(ctx) OpenTelemetry.Tracer.with_span name, start_opts do fun.() end end) end @doc """ Starts a task with a new child span. See `Task.start/3` for more information. """ @spec start_with_span( OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), module(), atom(), [term()] ) :: {:ok, pid()} def start_with_span(name, start_opts, module, function_name, args) do ctx = OpenTelemetry.Ctx.get_current() Task.start(Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}]) end @doc """ Starts a task with a new linked span. See `Task.start/1` for more information. """ @spec start_with_linked_span( OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), (() -> any()) ) :: {:ok, pid()} def start_with_linked_span(name, start_opts, fun) do parent = OpenTelemetry.Tracer.current_span_ctx() Task.start(fn -> link = OpenTelemetry.link(parent) OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do fun.() end end) end @doc """ Starts a task with a new linked span. See `Task.start/3` for more information. """ @spec start_with_linked_span( OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), module(), atom(), [term()] ) :: {:ok, pid()} def start_with_linked_span(name, start_opts, module, function_name, args) do parent = OpenTelemetry.Tracer.current_span_ctx() Task.start(Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}]) end @doc """ Starts a task as part of a supervision tree with the given `fun` with the current `t:OpenTelemetry.Ctx.t/0` attached. See `Task.start_link/1` for more information. """ @spec start_link((() -> any())) :: {:ok, pid()} def start_link(fun) do ctx = OpenTelemetry.Ctx.get_current() Task.start_link(fn -> OpenTelemetry.Ctx.attach(ctx) fun.() end) end @doc """ Starts a task as part of a supervision tree with the given `module`, `function`, and `args` with the current `t:OpenTelemetry.Ctx.t/0` attached. See `Task.start_link/3` for more information. """ @spec start_link(module(), atom(), [term()]) :: {:ok, pid()} def start_link(module, function_name, args) do ctx = OpenTelemetry.Ctx.get_current() Task.start_link(Wrapper, :with_ctx, [ctx, {module, function_name, args}]) end @doc """ Starts a task as part of a supervision tree with the given `fun` in a new child span. See `Task.start_link/1` for more information. """ @spec start_link_with_span( OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), (() -> any()) ) :: {:ok, pid()} def start_link_with_span(name, start_opts, fun) do ctx = OpenTelemetry.Ctx.get_current() Task.start_link(fn -> OpenTelemetry.Ctx.attach(ctx) OpenTelemetry.Tracer.with_span name, start_opts do fun.() end end) end @doc """ Starts a task as part of a supervision tree with the given `module`, `function`, and `args` in a new child span. See `Task.start_link/3` for more information. """ @spec start_link_with_span( OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), module(), atom(), [term()] ) :: {:ok, pid()} def start_link_with_span(name, start_opts, module, function_name, args) do ctx = OpenTelemetry.Ctx.get_current() Task.start_link(Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}]) end @doc """ Starts a task as part of a supervision tree with the given `fun` in a new linked span. See `Task.start_link/1` for more information. """ @spec start_link_with_linked_span( OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), (() -> any()) ) :: {:ok, pid()} def start_link_with_linked_span(name, start_opts, fun) do parent = OpenTelemetry.Tracer.current_span_ctx() Task.start_link(fn -> link = OpenTelemetry.link(parent) OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do fun.() end end) end @doc """ Starts a task as part of a supervision tree with the given `module`, `function`, and `args` in a new linked span. See `Task.start_link/3` for more information. """ @spec start_link_with_linked_span( OpenTelemetry.span_name(), OpenTelemetry.Span.start_opts(), module(), atom(), [term()] ) :: {:ok, pid()} def start_link_with_linked_span(name, start_opts, module, function_name, args) do parent = OpenTelemetry.Tracer.current_span_ctx() Task.start_link(Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}]) end defdelegate await(task), to: Task defdelegate await(task, timeout), to: Task defdelegate await_many(tasks), to: Task defdelegate await_many(tasks, timeout), to: Task defdelegate child_spec(arg), to: Task if Kernel.function_exported?(Task, :completed, 1) do defdelegate completed(result), to: Task end if Kernel.function_exported?(Task, :ignore, 1) do defdelegate ignore(task), to: Task end defdelegate shutdown(task), to: Task defdelegate shutdown(task, timeout), to: Task defdelegate yield_many(tasks), to: Task defdelegate yield_many(tasks, timeout), to: Task defdelegate yield(tasks), to: Task defdelegate yield(tasks, timeout), to: Task end