From fb6c3d2985816fa99cf373877c3dfa84f2798d64 Mon Sep 17 00:00:00 2001 From: Bryan Naegele Date: Fri, 26 Aug 2022 14:10:57 -0600 Subject: [PATCH] Conveniences for Tasks (#80) * Conveniences for Tasks * Finish Task module functions * Complete Task and add Task.Supervisor * Check if new funs added * Remove with_ctx in favor of wrapping default * Fix docs --- .../lib/task.ex | 553 ++++++++++++ .../lib/task/supervisor.ex | 836 ++++++++++++++++++ .../lib/task/wrapper.ex | 89 ++ .../opentelemetry_process_propagator/mix.exs | 2 +- .../opentelemetry_process_propagator/mix.lock | 18 +- .../test/task_supervisor_test.exs | 746 ++++++++++++++++ .../test/task_test.exs | 472 ++++++++++ 7 files changed, 2706 insertions(+), 10 deletions(-) create mode 100644 propagators/opentelemetry_process_propagator/lib/task.ex create mode 100644 propagators/opentelemetry_process_propagator/lib/task/supervisor.ex create mode 100644 propagators/opentelemetry_process_propagator/lib/task/wrapper.ex create mode 100644 propagators/opentelemetry_process_propagator/test/task_supervisor_test.exs create mode 100644 propagators/opentelemetry_process_propagator/test/task_test.exs diff --git a/propagators/opentelemetry_process_propagator/lib/task.ex b/propagators/opentelemetry_process_propagator/lib/task.ex new file mode 100644 index 0000000..4b05eb6 --- /dev/null +++ b/propagators/opentelemetry_process_propagator/lib/task.ex @@ -0,0 +1,553 @@ +defmodule OpentelemetryProcessPropagator.Task do + @moduledoc """ + `OpentelemetryProcessPropagator.Task` provides a set of extensions + to the `Task` module to reduce some boilerplate in propagating OpenTelemetry + contexts across process boundaries. Since these are extensions rather + than a replacement of Elixir's module, this library can be aliased + into a file without concern for creating spans where you do not want them. + + Each `Task` function is replicated with two variants: `*_with_span` + and `*_with_linked_span`. Each of these variations has a specific use case. + The original implementation for each function automatically propagates the + current context. + + * `*` - propagates the current context + * `*_with_span` - propagates the current context and starts a new child span. + * `*_with_linked_span` - propagates the current context and starts a new linked span. + + > #### Module Redefinement {: .info} + > + > This module does not redefine the `Task` module, instead providing a wrapper of the module, + > so this functionality will not globally modify the default behavior of the `Task` module. + + ## Usage + + ``` + defmodule MyApp do + require OpenTelemetry.Tracer + alias OpentelemetryProcessPropagator.Task + + def untraced_task do + Task.async(fn -> + :ok + end) + |> Task.await() + end + + def traced_task do + Task.async_with_span(:span_name, %{attributes: %{a: "b"}}, fn -> + Tracer.set_attribute(:c, "d") + :ok + end) + |> Task.await() + end + end + ``` + """ + alias OpentelemetryProcessPropagator.Task.Wrapper + require OpenTelemetry.Tracer + + @doc """ + Returns a stream that runs the given function `fun` concurrently + on each element in `enumerable` with the current `t:OpenTelemetry.Ctx.t/0` + attached. + + See `Task.async_stream/3` for more information. + """ + @spec async_stream(Enumerable.t(), (term() -> term()), keyword()) :: Enumerable.t() + def async_stream(enumerable, fun, opts \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.async_stream( + enumerable, + fn arg -> + OpenTelemetry.Ctx.attach(ctx) + + fun.(arg) + end, + opts + ) + end + + @doc """ + Returns a stream where the given function (`module` and `function`) + is mapped concurrently on each element in `enumerable` with the + current `t:OpenTelemetry.Ctx.t/0` attached. + + See `Task.async_stream/5` for more information. + """ + @spec async_stream( + Enumerable.t(), + module(), + atom(), + [term()], + keyword() + ) :: Enumerable.t() + def async_stream(enumerable, module, function_name, args, opts \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.async_stream(enumerable, Wrapper, :with_ctx, [ctx, {module, function_name, args}], opts) + end + + @doc """ + Returns a stream that runs the given function `fun` concurrently + on each element in `enumerable` with a new child span. + + See `Task.async_stream/3` for more information. + """ + @spec async_stream_with_span( + Enumerable.t(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + (term() -> term()), + keyword() + ) :: Enumerable.t() + def async_stream_with_span(enumerable, name, start_opts, fun, opts \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.async_stream( + enumerable, + fn arg -> + OpenTelemetry.Ctx.attach(ctx) + + OpenTelemetry.Tracer.with_span name, start_opts do + fun.(arg) + end + end, + opts + ) + end + + @doc """ + Returns a stream where the given function (`module` and `function`) + is mapped concurrently on each element in `enumerable` with a new child span. + + See `Task.async_stream/5` for more information. + """ + @spec async_stream_with_span( + Enumerable.t(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + module(), + atom(), + [term()], + keyword() + ) :: Enumerable.t() + def async_stream_with_span(enumerable, name, start_opts, module, function_name, args, opts \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.async_stream(enumerable, Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}], opts) + end + + @doc """ + Returns a stream that runs the given function `fun` concurrently + on each element in `enumerable` with a new linked span. + + See `Task.async_stream/3` for more information. + """ + @spec async_stream_with_linked_span( + Enumerable.t(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + (term() -> term()), + keyword() + ) :: Enumerable.t() + def async_stream_with_linked_span(enumerable, name, start_opts, fun, opts \\ []) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.async_stream( + enumerable, + fn arg -> + link = OpenTelemetry.link(parent) + + OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do + fun.(arg) + end + end, + opts + ) + end + + @doc """ + Returns a stream where the given function (`module` and `function`) + is mapped concurrently on each element in `enumerable` with a new linked span. + + See `Task.async_stream/5` for more information. + """ + @spec async_stream_with_linked_span( + Enumerable.t(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + module(), + atom(), + [term()], + keyword() + ) :: Enumerable.t() + def async_stream_with_linked_span(enumerable, name, start_opts, module, function_name, args, opts \\ []) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.async_stream( + enumerable, + Wrapper, + :with_linked_span, + [name, start_opts, parent, {module, function_name, args}], + opts + ) + end + + @doc """ + Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on. + + See `Task.async/1` for more information. + """ + @spec async((() -> any())) :: Task.t() + def async(fun) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.async(fn -> + OpenTelemetry.Ctx.attach(ctx) + + fun.() + end) + end + + @doc """ + Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on. + + See `Task.async/3` for more information. + """ + @spec async(module(), atom(), [term()]) :: Task.t() + def async(module, function_name, args) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.async(Wrapper, :with_ctx, [ctx, {module, function_name, args}]) + end + + @doc """ + Starts a task with a new child span that can be awaited on. + + See `Task.async/1` for more information. + """ + @spec async_with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + (() -> any()) + ) :: Task.t() + def async_with_span(name, start_opts, fun) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.async(fn -> + OpenTelemetry.Ctx.attach(ctx) + + OpenTelemetry.Tracer.with_span name, start_opts do + fun.() + end + end) + end + + @doc """ + Starts a task with a new child span that can be awaited on. + + See `Task.async/3` for more information. + """ + @spec async_with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + module(), + atom(), + [term()] + ) :: Task.t() + def async_with_span(name, start_opts, module, function_name, args) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.async(Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}]) + end + + @doc """ + Starts a task with a new linked span that can be awaited on. + + See `Task.async/1` for more information. + """ + @spec async_with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + (() -> any()) + ) :: Task.t() + def async_with_linked_span(name, start_opts, fun) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.async(fn -> + link = OpenTelemetry.link(parent) + + OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do + fun.() + end + end) + end + + @doc """ + Starts a task with a new linked span that can be awaited on. + + See `Task.async/3` for more information. + """ + @spec async_with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + module(), + atom(), + [term()] + ) :: Task.t() + def async_with_linked_span(name, start_opts, module, function_name, args) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.async(Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}]) + end + + @doc """ + Starts a task with the current `t:OpenTelemetry.Ctx.t/0`. + + See `Task.start/1` for more information. + """ + @spec start((() -> any())) :: {:ok, pid()} + def start(fun) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.start(fn -> + OpenTelemetry.Ctx.attach(ctx) + + fun.() + end) + end + + @doc """ + Starts a task with the current `t:OpenTelemetry.Ctx.t/0`. + + See `Task.start/3` for more information. + """ + @spec start(module(), atom(), [term()]) :: {:ok, pid()} + def start(module, function_name, args) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.start(Wrapper, :with_ctx, [ctx, {module, function_name, args}]) + end + + @doc """ + Starts a task with a new child span. + + See `Task.start/1` for more information. + """ + @spec start_with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + (() -> any()) + ) :: {:ok, pid()} + def start_with_span(name, start_opts, fun) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.start(fn -> + OpenTelemetry.Ctx.attach(ctx) + + OpenTelemetry.Tracer.with_span name, start_opts do + fun.() + end + end) + end + + @doc """ + Starts a task with a new child span. + + See `Task.start/3` for more information. + """ + @spec start_with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + module(), + atom(), + [term()] + ) :: {:ok, pid()} + def start_with_span(name, start_opts, module, function_name, args) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.start(Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}]) + end + + @doc """ + Starts a task with a new linked span. + + See `Task.start/1` for more information. + """ + @spec start_with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + (() -> any()) + ) :: {:ok, pid()} + def start_with_linked_span(name, start_opts, fun) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.start(fn -> + link = OpenTelemetry.link(parent) + + OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do + fun.() + end + end) + end + + @doc """ + Starts a task with a new linked span. + + See `Task.start/3` for more information. + """ + @spec start_with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + module(), + atom(), + [term()] + ) :: {:ok, pid()} + def start_with_linked_span(name, start_opts, module, function_name, args) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.start(Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}]) + end + + @doc """ + Starts a task as part of a supervision tree with the given `fun` + with the current `t:OpenTelemetry.Ctx.t/0` attached. + + See `Task.start_link/1` for more information. + """ + @spec start_link((() -> any())) :: {:ok, pid()} + def start_link(fun) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.start_link(fn -> + OpenTelemetry.Ctx.attach(ctx) + + fun.() + end) + end + + @doc """ + Starts a task as part of a supervision tree with the given + `module`, `function`, and `args` with the current `t:OpenTelemetry.Ctx.t/0` + attached. + + See `Task.start_link/3` for more information. + """ + @spec start_link(module(), atom(), [term()]) :: {:ok, pid()} + def start_link(module, function_name, args) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.start_link(Wrapper, :with_ctx, [ctx, {module, function_name, args}]) + end + + @doc """ + Starts a task as part of a supervision tree with the given `fun` + in a new child span. + + See `Task.start_link/1` for more information. + """ + @spec start_link_with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + (() -> any()) + ) :: {:ok, pid()} + def start_link_with_span(name, start_opts, fun) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.start_link(fn -> + OpenTelemetry.Ctx.attach(ctx) + + OpenTelemetry.Tracer.with_span name, start_opts do + fun.() + end + end) + end + + @doc """ + Starts a task as part of a supervision tree with the given + `module`, `function`, and `args` in a new child span. + + See `Task.start_link/3` for more information. + """ + @spec start_link_with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + module(), + atom(), + [term()] + ) :: {:ok, pid()} + def start_link_with_span(name, start_opts, module, function_name, args) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.start_link(Wrapper, :with_span, [name, start_opts, ctx, {module, function_name, args}]) + end + + @doc """ + Starts a task as part of a supervision tree with the given `fun` + in a new linked span. + + See `Task.start_link/1` for more information. + """ + @spec start_link_with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + (() -> any()) + ) :: {:ok, pid()} + def start_link_with_linked_span(name, start_opts, fun) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.start_link(fn -> + link = OpenTelemetry.link(parent) + + OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do + fun.() + end + end) + end + + @doc """ + Starts a task as part of a supervision tree with the given + `module`, `function`, and `args` in a new linked span. + + See `Task.start_link/3` for more information. + """ + @spec start_link_with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + module(), + atom(), + [term()] + ) :: {:ok, pid()} + def start_link_with_linked_span(name, start_opts, module, function_name, args) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.start_link(Wrapper, :with_linked_span, [name, start_opts, parent, {module, function_name, args}]) + end + + defdelegate await(task), to: Task + defdelegate await(task, timeout), to: Task + + defdelegate await_many(tasks), to: Task + defdelegate await_many(tasks, timeout), to: Task + + defdelegate child_spec(arg), to: Task + + if Kernel.function_exported?(Task, :completed, 1) do + defdelegate completed(result), to: Task + end + + if Kernel.function_exported?(Task, :ignore, 1) do + defdelegate ignore(task), to: Task + end + + defdelegate shutdown(task), to: Task + defdelegate shutdown(task, timeout), to: Task + + defdelegate yield_many(tasks), to: Task + defdelegate yield_many(tasks, timeout), to: Task + + defdelegate yield(tasks), to: Task + defdelegate yield(tasks, timeout), to: Task +end diff --git a/propagators/opentelemetry_process_propagator/lib/task/supervisor.ex b/propagators/opentelemetry_process_propagator/lib/task/supervisor.ex new file mode 100644 index 0000000..45d6e66 --- /dev/null +++ b/propagators/opentelemetry_process_propagator/lib/task/supervisor.ex @@ -0,0 +1,836 @@ +defmodule OpentelemetryProcessPropagator.Task.Supervisor do + @moduledoc """ + `OpentelemetryProcessPropagator.Task.Supervisor` provides a set of extensions + to the `Task.Supervisor` module to reduce some boilerplate in propagating OpenTelemetry + contexts across process boundaries. Since these are extensions rather + than a replacement of Elixir's module, this library can be aliased + into a file without concern for creating spans where you do not want them. + + Each `Task.Supervisor` function is replicated with two variants: `*_with_span` + and `*_with_linked_span`. Each of these variations has a specific use case. + The original implementation for each function automatically propagates the + current context. + + * `*` - propagates the current context + * `*_with_span` - propagates the current context and starts a new child span. + * `*_with_linked_span` - propagates the current context and starts a new linked span. + + > #### Module Redefinement {: .info} + > + > This module does not redefine the `Task.Supervisor` module, instead providing a wrapper of the module, + > so this functionality will not globally modify the default behavior of the `Task` module. + """ + + alias OpentelemetryProcessPropagator.Task.Wrapper + require OpenTelemetry.Tracer + + @doc false + defdelegate child_spec(opts), to: Task.Supervisor + + @doc """ + Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on. + + See `Task.Supervisor.async/3` for more information. + """ + @spec async(Supervisor.supervisor(), (() -> any())) :: Task.t() + def async(supervisor, fun, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async( + supervisor, + fn -> + OpenTelemetry.Ctx.attach(ctx) + + fun.() + end, + options + ) + end + + @doc """ + Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on. + + See `Task.Supervisor.async/5` for more information. + """ + @spec async(Supervisor.supervisor(), module(), atom(), [term()]) :: Task.t() + def async(supervisor, module, function_name, args, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async(supervisor, Wrapper, :with_ctx, [ctx, {module, function_name, args}], options) + end + + @doc """ + Starts a task with a new child span that can be awaited on. + + See `Task.Supervisor.async/3` for more information. + """ + @spec async_with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + Supervisor.supervisor(), + (() -> any()) + ) :: Task.t() + def async_with_span(name, start_opts, supervisor, fun, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async( + supervisor, + fn -> + OpenTelemetry.Ctx.attach(ctx) + + OpenTelemetry.Tracer.with_span name, start_opts do + fun.() + end + end, + options + ) + end + + @doc """ + Starts a task with a new child span that can be awaited on. + + See `Task.Supervisor.async/5` for more information. + """ + @spec async_with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + Supervisor.supervisor(), + module(), + atom(), + [term()] + ) :: Task.t() + def async_with_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async( + supervisor, + Wrapper, + :with_span, + [name, start_opts, ctx, {module, function_name, args}], + options + ) + end + + @doc """ + Starts a task with a new linked span that can be awaited on. + + See `Task.Supervisor.async/3` for more information. + """ + @spec async_with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + Supervisor.supervisor(), + (() -> any()) + ) :: Task.t() + def async_with_linked_span(name, start_opts, supervisor, fun, options \\ []) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.Supervisor.async( + supervisor, + fn -> + link = OpenTelemetry.link(parent) + + OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do + fun.() + end + end, + options + ) + end + + @doc """ + Starts a task with a new linked span that can be awaited on. + + See `Task.Supervisor.async/5` for more information. + """ + @spec async_with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + Supervisor.supervisor(), + module(), + atom(), + [term()] + ) :: Task.t() + def async_with_linked_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.Supervisor.async( + supervisor, + Wrapper, + :with_linked_span, + [name, start_opts, parent, {module, function_name, args}], + options + ) + end + + @doc """ + Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on. + + See `Task.Supervisor.async_nolink/3` for more information. + """ + @spec async_nolink(Supervisor.supervisor(), (() -> any())) :: Task.t() + def async_nolink(supervisor, fun, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async_nolink( + supervisor, + fn -> + OpenTelemetry.Ctx.attach(ctx) + + fun.() + end, + options + ) + end + + @doc """ + Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on. + + See `Task.Supervisor.async_nolink/5` for more information. + """ + @spec async_nolink(Supervisor.supervisor(), module(), atom(), [term()]) :: Task.t() + def async_nolink(supervisor, module, function_name, args, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async_nolink(supervisor, Wrapper, :with_ctx, [ctx, {module, function_name, args}], options) + end + + @doc """ + Starts a task with a new child span that can be awaited on. + + See `Task.Supervisor.async_nolink/3` for more information. + """ + @spec async_nolink_with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + Supervisor.supervisor(), + (() -> any()) + ) :: Task.t() + def async_nolink_with_span(name, start_opts, supervisor, fun, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async_nolink( + supervisor, + fn -> + OpenTelemetry.Ctx.attach(ctx) + + OpenTelemetry.Tracer.with_span name, start_opts do + fun.() + end + end, + options + ) + end + + @doc """ + Starts a task with a new child span that can be awaited on. + + See `Task.Supervisor.async_nolink/5` for more information. + """ + @spec async_nolink_with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + Supervisor.supervisor(), + module(), + atom(), + [term()] + ) :: Task.t() + def async_nolink_with_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async_nolink( + supervisor, + Wrapper, + :with_span, + [name, start_opts, ctx, {module, function_name, args}], + options + ) + end + + @doc """ + Starts a task with a new linked span that can be awaited on. + + See `Task.Supervisor.async_nolink/3` for more information. + """ + @spec async_nolink_with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + Supervisor.supervisor(), + (() -> any()) + ) :: Task.t() + def async_nolink_with_linked_span(name, start_opts, supervisor, fun, options \\ []) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.Supervisor.async_nolink( + supervisor, + fn -> + link = OpenTelemetry.link(parent) + + OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do + fun.() + end + end, + options + ) + end + + @doc """ + Starts a task with a new linked span that can be awaited on. + + See `Task.Supervisor.async_nolink/5` for more information. + """ + @spec async_nolink_with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + Supervisor.supervisor(), + module(), + atom(), + [term()] + ) :: Task.t() + def async_nolink_with_linked_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.Supervisor.async_nolink( + supervisor, + Wrapper, + :with_linked_span, + [name, start_opts, parent, {module, function_name, args}], + options + ) + end + + @doc """ + Returns a stream that runs the given function `fun` concurrently + on each element in `enumerable` with the current `t:OpenTelemetry.Ctx.t/0` + attached. + + See `Task.Supervisor.async_stream/4` for more information. + """ + @spec async_stream( + Supervisor.supervisor(), + Enumerable.t(), + (term() -> term()), + keyword() + ) :: Enumerable.t() + def async_stream(supervisor, enumerable, fun, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async_stream( + supervisor, + enumerable, + fn arg -> + OpenTelemetry.Ctx.attach(ctx) + + fun.(arg) + end, + options + ) + end + + @doc """ + Returns a stream where the given function (`module` and `function`) + is mapped concurrently on each element in `enumerable` with the + current `t:OpenTelemetry.Ctx.t/0` attached. + + See `Task.Supervisor.async_stream/6` for more information. + """ + @spec async_stream( + Supervisor.supervisor(), + Enumerable.t(), + module(), + atom(), + [term()], + keyword() + ) :: Enumerable.t() + def async_stream(supervisor, enumerable, module, function_name, args, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async_stream( + supervisor, + enumerable, + Wrapper, + :with_ctx, + [ctx, {module, function_name, args}], + options + ) + end + + @doc """ + Returns a stream that runs the given function `fun` concurrently + on each element in `enumerable` with a new child span. + + See `Task.Supervisor.async_stream/4` for more information. + """ + @spec async_stream_with_span( + Supervisor.supervisor(), + Enumerable.t(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + (term() -> term()), + keyword() + ) :: Enumerable.t() + def async_stream_with_span(supervisor, enumerable, name, start_opts, fun, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async_stream( + supervisor, + enumerable, + fn arg -> + OpenTelemetry.Ctx.attach(ctx) + + OpenTelemetry.Tracer.with_span name, start_opts do + fun.(arg) + end + end, + options + ) + end + + @doc """ + Returns a stream where the given function (`module` and `function`) + is mapped concurrently on each element in `enumerable` with a new child span. + + See `Task.Supervisor.async_stream/6` for more information. + """ + @spec async_stream_with_span( + Supervisor.supervisor(), + Enumerable.t(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + module(), + atom(), + [term()], + keyword() + ) :: Enumerable.t() + def async_stream_with_span(supervisor, enumerable, name, start_opts, module, function_name, args, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async_stream( + supervisor, + enumerable, + Wrapper, + :with_span, + [name, start_opts, ctx, {module, function_name, args}], + options + ) + end + + @doc """ + Returns a stream that runs the given function `fun` concurrently + on each element in `enumerable` with a new linked span. + + See `Task.Supervisor.async_stream/4` for more information. + """ + @spec async_stream_with_linked_span( + Supervisor.supervisor(), + Enumerable.t(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + (term() -> term()), + keyword() + ) :: Enumerable.t() + def async_stream_with_linked_span(supervisor, enumerable, name, start_opts, fun, options \\ []) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.Supervisor.async_stream( + supervisor, + enumerable, + fn arg -> + link = OpenTelemetry.link(parent) + + OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do + fun.(arg) + end + end, + options + ) + end + + @doc """ + Returns a stream where the given function (`module` and `function`) + is mapped concurrently on each element in `enumerable` with a new linked span. + + See `Task.Supervisor.async_stream/6` for more information. + """ + @spec async_stream_with_linked_span( + Supervisor.supervisor(), + Enumerable.t(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + module(), + atom(), + [term()], + keyword() + ) :: Enumerable.t() + def async_stream_with_linked_span( + supervisor, + enumerable, + name, + start_opts, + module, + function_name, + args, + options \\ [] + ) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.Supervisor.async_stream( + supervisor, + enumerable, + Wrapper, + :with_linked_span, + [name, start_opts, parent, {module, function_name, args}], + options + ) + end + + @doc """ + Returns a stream that runs the given function `fun` concurrently + on each element in `enumerable` with the current `t:OpenTelemetry.Ctx.t/0` + attached. + + See `Task.Supervisor.async_stream_nolink/4` for more information. + """ + @spec async_stream_nolink( + Supervisor.supervisor(), + Enumerable.t(), + (term() -> term()), + keyword() + ) :: Enumerable.t() + def async_stream_nolink(supervisor, enumerable, fun, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async_stream_nolink( + supervisor, + enumerable, + fn arg -> + OpenTelemetry.Ctx.attach(ctx) + + fun.(arg) + end, + options + ) + end + + @doc """ + Returns a stream where the given function (`module` and `function`) + is mapped concurrently on each element in `enumerable` with the + current `t:OpenTelemetry.Ctx.t/0` attached. + + See `Task.Supervisor.async_stream_nolink/6` for more information. + """ + @spec async_stream_nolink( + Supervisor.supervisor(), + Enumerable.t(), + module(), + atom(), + [term()], + keyword() + ) :: Enumerable.t() + def async_stream_nolink(supervisor, enumerable, module, function_name, args, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async_stream_nolink( + supervisor, + enumerable, + Wrapper, + :with_ctx, + [ctx, {module, function_name, args}], + options + ) + end + + @doc """ + Returns a stream that runs the given function `fun` concurrently + on each element in `enumerable` with a new child span. + + See `Task.Supervisor.async_stream_nolink/4` for more information. + """ + @spec async_stream_nolink_with_span( + Supervisor.supervisor(), + Enumerable.t(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + (term() -> term()), + keyword() + ) :: Enumerable.t() + def async_stream_nolink_with_span(supervisor, enumerable, name, start_opts, fun, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async_stream_nolink( + supervisor, + enumerable, + fn arg -> + OpenTelemetry.Ctx.attach(ctx) + + OpenTelemetry.Tracer.with_span name, start_opts do + fun.(arg) + end + end, + options + ) + end + + @doc """ + Returns a stream where the given function (`module` and `function`) + is mapped concurrently on each element in `enumerable` with a new child span. + + See `Task.Supervisor.async_stream_nolink/6` for more information. + """ + @spec async_stream_nolink_with_span( + Supervisor.supervisor(), + Enumerable.t(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + module(), + atom(), + [term()], + keyword() + ) :: Enumerable.t() + def async_stream_nolink_with_span( + supervisor, + enumerable, + name, + start_opts, + module, + function_name, + args, + options \\ [] + ) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.async_stream_nolink( + supervisor, + enumerable, + Wrapper, + :with_span, + [name, start_opts, ctx, {module, function_name, args}], + options + ) + end + + @doc """ + Returns a stream that runs the given function `fun` concurrently + on each element in `enumerable` with a new linked span. + + See `Task.Supervisor.async_stream_nolink/4` for more information. + """ + @spec async_stream_nolink_with_linked_span( + Supervisor.supervisor(), + Enumerable.t(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + (term() -> term()), + keyword() + ) :: Enumerable.t() + def async_stream_nolink_with_linked_span(supervisor, enumerable, name, start_opts, fun, options \\ []) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.Supervisor.async_stream_nolink( + supervisor, + enumerable, + fn arg -> + link = OpenTelemetry.link(parent) + + OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do + fun.(arg) + end + end, + options + ) + end + + @doc """ + Returns a stream where the given function (`module` and `function`) + is mapped concurrently on each element in `enumerable` with a new linked span. + + See `Task.Supervisor.async_stream_nolink/6` for more information. + """ + @spec async_stream_nolink_with_linked_span( + Supervisor.supervisor(), + Enumerable.t(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + module(), + atom(), + [term()], + keyword() + ) :: Enumerable.t() + def async_stream_nolink_with_linked_span( + supervisor, + enumerable, + name, + start_opts, + module, + function_name, + args, + options \\ [] + ) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.Supervisor.async_stream_nolink( + supervisor, + enumerable, + Wrapper, + :with_linked_span, + [name, start_opts, parent, {module, function_name, args}], + options + ) + end + + @doc """ + Starts a task as a child of the given `supervisor` with the + current `t:OpenTelemetry.Ctx.t/0`. + + See `Task.Supervisor.start_child/3` for more information. + """ + @spec start_child( + Supervisor.supervisor(), + (() -> any()), + keyword() + ) :: DynamicSupervisor.on_start_child() + def start_child(supervisor, fun, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.start_child( + supervisor, + fn -> + OpenTelemetry.Ctx.attach(ctx) + + fun.() + end, + options + ) + end + + @doc """ + Starts a task as a child of the given `supervisor` with the + current `t:OpenTelemetry.Ctx.t/0`. + + See `Task.Supervisor.start_child/5` for more information. + """ + @spec start_child( + Supervisor.supervisor(), + module(), + atom(), + [term()], + keyword() + ) :: DynamicSupervisor.on_start_child() + def start_child(supervisor, module, function_name, args, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.start_child(supervisor, Wrapper, :with_ctx, [ctx, {module, function_name, args}], options) + end + + @doc """ + Starts a task as a child of the given `supervisor` in a new child span. + + See `Task.Supervisor.start_child/3` for more information. + """ + @spec start_child_with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + Supervisor.supervisor(), + (() -> any()), + keyword() + ) :: DynamicSupervisor.on_start_child() + def start_child_with_span(name, start_opts, supervisor, fun, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.start_child( + supervisor, + fn -> + OpenTelemetry.Ctx.attach(ctx) + + OpenTelemetry.Tracer.with_span name, start_opts do + fun.() + end + end, + options + ) + end + + @doc """ + Starts a task as a child of the given `supervisor` in a new child span. + + See `Task.Supervisor.start_child/5` for more information. + """ + @spec start_child_with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + Supervisor.supervisor(), + module(), + atom(), + [term()], + keyword() + ) :: DynamicSupervisor.on_start_child() + def start_child_with_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do + ctx = OpenTelemetry.Ctx.get_current() + + Task.Supervisor.start_child( + supervisor, + Wrapper, + :with_span, + [name, start_opts, ctx, {module, function_name, args}], + options + ) + end + + @doc """ + Starts a task as a child of the given `supervisor` in a new linked span. + + See `Task.Supervisor.start_child/3` for more information. + """ + @spec start_child_with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + Supervisor.supervisor(), + (() -> any()), + keyword() + ) :: DynamicSupervisor.on_start_child() + def start_child_with_linked_span(name, start_opts, supervisor, fun, options \\ []) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.Supervisor.start_child( + supervisor, + fn -> + link = OpenTelemetry.link(parent) + + OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, [link]) do + fun.() + end + end, + options + ) + end + + @doc """ + Starts a task as a child of the given `supervisor` in a new linked span. + + See `Task.Supervisor.start_child/5` for more information. + """ + @spec start_child_with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + Supervisor.supervisor(), + module(), + atom(), + [term()], + keyword() + ) :: DynamicSupervisor.on_start_child() + def start_child_with_linked_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do + parent = OpenTelemetry.Tracer.current_span_ctx() + + Task.Supervisor.start_child( + supervisor, + Wrapper, + :with_linked_span, + [name, start_opts, parent, {module, function_name, args}], + options + ) + end + + defdelegate children(supervisor), to: Task.Supervisor + defdelegate start_link(), to: Task.Supervisor + defdelegate terminate_child(supervisor, pid), to: Task.Supervisor +end diff --git a/propagators/opentelemetry_process_propagator/lib/task/wrapper.ex b/propagators/opentelemetry_process_propagator/lib/task/wrapper.ex new file mode 100644 index 0000000..313cd75 --- /dev/null +++ b/propagators/opentelemetry_process_propagator/lib/task/wrapper.ex @@ -0,0 +1,89 @@ +defmodule OpentelemetryProcessPropagator.Task.Wrapper do + @moduledoc false + require OpenTelemetry.Tracer + + @spec with_ctx(OpenTelemetry.Ctx.t(), {module(), atom(), [term()]}) :: any() + def with_ctx(ctx, {m, f, a}) do + OpenTelemetry.Ctx.attach(ctx) + + apply(m, f, a) + end + + # for streams which prepend the value to the given arguments + @spec with_ctx(term(), OpenTelemetry.Ctx.t(), {module(), atom(), [term()]}) :: any() + def with_ctx(value, ctx, {m, f, a}) do + OpenTelemetry.Ctx.attach(ctx) + + apply(m, f, [value | a]) + end + + @spec with_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + OpenTelemetry.Ctx.t(), + {module(), atom(), [term()]} + ) :: any() + def with_span(name, start_opts, ctx, {m, f, a}) do + OpenTelemetry.Ctx.attach(ctx) + + OpenTelemetry.Tracer.with_span name, start_opts do + apply(m, f, a) + end + end + + # for streams which prepend the value to the given arguments + @spec with_span( + term(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + OpenTelemetry.Ctx.t(), + {module(), atom(), [term()]} + ) :: any() + def with_span(value, name, start_opts, ctx, {m, f, a}) do + OpenTelemetry.Ctx.attach(ctx) + + OpenTelemetry.Tracer.with_span name, start_opts do + apply(m, f, [value | a]) + end + end + + @spec with_linked_span( + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + OpenTelemetry.Ctx.t(), + {module(), atom(), [term()]} + ) :: any() + def with_linked_span(name, start_opts, parent, {m, f, a}) do + links = + if parent == :undefined do + [] + else + [OpenTelemetry.link(parent)] + end + + OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, links) do + apply(m, f, a) + end + end + + # for streams which prepend the value to the given arguments + @spec with_linked_span( + term(), + OpenTelemetry.span_name(), + OpenTelemetry.Span.start_opts(), + OpenTelemetry.Ctx.t(), + {module(), atom(), [term()]} + ) :: any() + def with_linked_span(value, name, start_opts, parent, {m, f, a}) do + links = + if parent == :undefined do + [] + else + [OpenTelemetry.link(parent)] + end + + OpenTelemetry.Tracer.with_span name, Map.put(start_opts, :links, links) do + apply(m, f, [value | a]) + end + end +end diff --git a/propagators/opentelemetry_process_propagator/mix.exs b/propagators/opentelemetry_process_propagator/mix.exs index c350bba..079113e 100644 --- a/propagators/opentelemetry_process_propagator/mix.exs +++ b/propagators/opentelemetry_process_propagator/mix.exs @@ -9,7 +9,7 @@ defmodule OpentelemetryProcessPropagator.MixProject do app: app, version: to_string(Keyword.fetch!(desc, :vsn)), description: to_string(Keyword.fetch!(desc, :description)), - elixir: "~> 1.10", + elixir: "~> 1.11", start_permanent: Mix.env() == :prod, deps: deps(Keyword.fetch!(config, :deps)), name: "Opentelemetry Process Propagator", diff --git a/propagators/opentelemetry_process_propagator/mix.lock b/propagators/opentelemetry_process_propagator/mix.lock index 1f0e901..c39777c 100644 --- a/propagators/opentelemetry_process_propagator/mix.lock +++ b/propagators/opentelemetry_process_propagator/mix.lock @@ -1,21 +1,21 @@ %{ "acceptor_pool": {:hex, :acceptor_pool, "1.0.0", "43c20d2acae35f0c2bcd64f9d2bde267e459f0f3fd23dab26485bf518c281b21", [:rebar3], [], "hexpm", "0cbcd83fdc8b9ad2eee2067ef8b91a14858a5883cb7cd800e6fcd5803e158788"}, - "chatterbox": {:hex, :ts_chatterbox, "0.11.0", "b8f372c706023eb0de5bf2976764edb27c70fe67052c88c1f6a66b3a5626847f", [:rebar3], [{:hpack, "~>0.2.3", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "722fe2bad52913ab7e87d849fc6370375f0c961ffb2f0b5e6d647c9170c382a6"}, + "chatterbox": {:hex, :ts_chatterbox, "0.12.0", "4e54f199e15c0320b85372a24e35554a2ccfc4342e0b7cd8daed9a04f9b8ef4a", [:rebar3], [{:hpack, "~>0.2.3", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "6478c161bc60244f41cd5847cc3accd26d997883e9f7facd36ff24533b2fa579"}, "ctx": {:hex, :ctx, "0.6.0", "8ff88b70e6400c4df90142e7f130625b82086077a45364a78d208ed3ed53c7fe", [:rebar3], [], "hexpm", "a14ed2d1b67723dbebbe423b28d7615eb0bdcba6ff28f2d1f1b0a7e1d4aa5fc2"}, - "dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.24", "344f8d2a558691d3fcdef3f9400157d7c4b3b8e58ee5063297e9ae593e8326d9", [:mix], [], "hexpm", "1f6451b0116dd270449c8f5b30289940ee9c0a39154c783283a08e55af82ea34"}, + "dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.26", "f4291134583f373c7d8755566122908eb9662df4c4b63caa66a0eabe06569b0a", [:mix], [], "hexpm", "48d460899f8a0c52c5470676611c01f64f3337bad0b26ddab43648428d94aabc"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.28.3", "6eea2f69995f5fba94cd6dd398df369fe4e777a47cd887714a0976930615c9e6", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "05387a6a2655b5f9820f3f627450ed20b4325c25977b2ee69bed90af6688e718"}, + "ex_doc": {:hex, :ex_doc, "0.28.5", "3e52a6d2130ce74d096859e477b97080c156d0926701c13870a4e1f752363279", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d2c4b07133113e9aa3e9ba27efb9088ba900e9e51caa383919676afdf09ab181"}, "gproc": {:hex, :gproc, "0.8.0", "cea02c578589c61e5341fce149ea36ccef236cc2ecac8691fba408e7ea77ec2f", [:rebar3], [], "hexpm", "580adafa56463b75263ef5a5df4c86af321f68694e7786cb057fd805d1e2a7de"}, - "grpcbox": {:hex, :grpcbox, "0.14.0", "3eb321bcd2275baf8b54cf381feb7b0559a50c02544de28fda039c7f2f9d1a7a", [:rebar3], [{:acceptor_pool, "~>1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~>0.11.0", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~>0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~>0.8.0", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "e24159b7b6d3f9869bbe528845c0125fed2259366ba908fd04a1f45fe81d0660"}, + "grpcbox": {:hex, :grpcbox, "0.15.0", "97c7126296a091602d372ebf5860a04f7bc795b45b33a984cad2b8e362774fd8", [:rebar3], [{:acceptor_pool, "~>1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~>0.12.0", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~>0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~>0.8.0", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "161abe9e17e7d1982efa6488adeaa13c3e847a07984a6e6b224e553368918647"}, "hpack": {:hex, :hpack_erl, "0.2.3", "17670f83ff984ae6cd74b1c456edde906d27ff013740ee4d9efaa4f1bf999633", [:rebar3], [], "hexpm", "06f580167c4b8b8a6429040df36cc93bba6d571faeaec1b28816523379cbb23a"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, - "opentelemetry": {:hex, :opentelemetry, "1.0.2", "9ffa9ddcbec9356154681bc9d0a54bb20f0de0e8c6696ccc298b49633308782b", [:rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "e774506333c8afd9133fee4ba69d06e6413ccad6ee76f10e59c3ec624009af23"}, - "opentelemetry_api": {:hex, :opentelemetry_api, "1.0.2", "91353ee40583b1d4f07d7b13ed62642abfec6aaa0d8a2114f07edafb2df781c5", [:mix, :rebar3], [], "hexpm", "2a8247f85c44216b883900067478d59955d11e58e5cfca7c884cd4f203ace3ac"}, - "opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.0.2", "19a102d1f04776399a915be27121852468318c27146e553faf28008e3e474972", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.0", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.11", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "43de904dd7f482009bf1a40d591ab5ed25f603f1072a04a162e87f7f8c08dbb5"}, + "opentelemetry": {:hex, :opentelemetry, "1.0.5", "f0cd36ac8b30b68e8d70cec5bb88801ed7f3fe79aac67597054ed5490542e810", [:rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "3b17f8933a58e1246f42a0c215840fd8218aebbcabdb0aac62b0c766fe85542e"}, + "opentelemetry_api": {:hex, :opentelemetry_api, "1.0.3", "77f9644c42340cd8b18c728cde4822ed55ae136f0d07761b78e8c54da46af93a", [:mix, :rebar3], [], "hexpm", "4293e06bd369bc004e6fad5edbb56456d891f14bd3f9f1772b18f1923e0678ea"}, + "opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.0.4", "60a64c75633a82b6c36a20043be355ac72a7b9b21633edd47407924c5596dde0", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.0", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.11", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "61da65290fbb6cac3459b84b8cd630795bf608df93a2b2cc49251cae78200e5e"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, - "tls_certificate_check": {:hex, :tls_certificate_check, "1.13.0", "c407200ca837df4e6cf533874e9c4c8bcde2e71520ab215856bdaec9c7fb9252", [:rebar3], [{:ssl_verify_fun, "1.1.6", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "b45a3117931b808fdaea580f6e7ed1a7889c171dae792e632a166d6dda6e88a2"}, + "tls_certificate_check": {:hex, :tls_certificate_check, "1.15.0", "1c0377617a1111000bca3f4cd530b62690c9bd2dc9b868b4459203cd4d7f16ab", [:rebar3], [{:ssl_verify_fun, "1.1.6", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "87fd2e865078fdf8913a8c27bd8fe2be986383e31011f21d7f92cc5f7bc90731"}, } diff --git a/propagators/opentelemetry_process_propagator/test/task_supervisor_test.exs b/propagators/opentelemetry_process_propagator/test/task_supervisor_test.exs new file mode 100644 index 0000000..12872b6 --- /dev/null +++ b/propagators/opentelemetry_process_propagator/test/task_supervisor_test.exs @@ -0,0 +1,746 @@ +defmodule OpentelemetryProcessPropagator.TaskSupervisorTest do + use ExUnit.Case + + alias OpentelemetryProcessPropagator.Task + + require OpenTelemetry.Tracer, as: Tracer + require OpenTelemetry.Span + require Record + + for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do + Record.defrecord(name, spec) + end + + for {name, spec} <- Record.extract_all(from_lib: "opentelemetry_api/include/opentelemetry.hrl") do + Record.defrecord(name, spec) + end + + setup do + :application.stop(:opentelemetry) + :application.set_env(:opentelemetry, :tracer, :otel_tracer_default) + + :application.set_env(:opentelemetry, :processors, [ + {:otel_batch_processor, %{scheduled_delay_ms: 1}} + ]) + + :application.start(:opentelemetry) + + :otel_batch_processor.set_exporter(:otel_exporter_pid, self()) + + :ok + end + + defmodule OtherAppModule do + def fun_with_span(value) do + Tracer.with_span "already traced fun", %{attributes: %{value: value}} do + value + end + end + end + + describe "async_stream" do + test "async_stream" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_stream(Test.TaskSupervisor, ["a", "b"], fn value -> + OtherAppModule.fun_with_span(value) + end) + |> Stream.run() + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id)} + + assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs1)} + assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs2)} + + assert [%{value: "a"}, %{value: "b"}] == Enum.sort([attributes(attrs1), attributes(attrs2)]) + end + + test "async_stream/5" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_stream(Test.TaskSupervisor, ["a", "b"], __MODULE__, :stream_ctx_test_function, [ + :async_stream + ]) + |> Stream.run() + end + + assert_receive {:span, span(name: "parent span", attributes: attrs)} + assert %{} == attributes(attrs) + + assert_receive {:span, span(name: "already traced fun", attributes: attrs1)} + assert_receive {:span, span(name: "already traced fun", attributes: attrs2)} + + assert [%{value: "a"}, %{value: "b"}] == Enum.sort([attributes(attrs1), attributes(attrs2)]) + end + + test "async_stream_with_span/6" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_stream_with_span( + Test.TaskSupervisor, + ["a", "b"], + "task span", + %{attributes: %{a: 1}}, + fn value -> + Tracer.set_attribute(:value, value) + + value + end + ) + |> Stream.run() + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")}, 500 + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs1)}, 500 + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs2)}, 500 + + assert [%{value: "a", a: 1}, %{value: "b", a: 1}] == Enum.sort([attributes(attrs1), attributes(attrs2)]) + end + + test "async_stream_with_span/8" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_stream_with_span( + Test.TaskSupervisor, + ["a", "b"], + "task span", + %{attributes: %{a: 1}}, + __MODULE__, + :stream_test_function, + [:async_stream_with_span] + ) + |> Stream.run() + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")}, 500 + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500 + assert %{value: val1, a: 1} = attributes(attrs) + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500 + assert %{value: val2, a: 1} = attributes(attrs) + + assert ["a", "b"] == Enum.sort([val1, val2]) + end + + test "async_stream_with_linked_span/6" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_stream_with_linked_span( + Test.TaskSupervisor, + ["a", "b"], + "task span", + %{attributes: %{a: 1}}, + fn value -> + Tracer.set_attribute(:value, value) + + :ok + end + ) + |> Stream.run() + end + + assert_receive {:span, span(name: "parent span")}, 500 + + assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500 + assert %{value: val1, a: 1} = attributes(attrs) + assert [] != links + + assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500 + assert %{value: val2, a: 1} = attributes(attrs) + assert [] != links + + assert ["a", "b"] == Enum.sort([val1, val2]) + end + + test "async_stream_with_linked_span/8" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_stream_with_linked_span( + Test.TaskSupervisor, + ["a", "b"], + "task span", + %{attributes: %{a: 1}}, + __MODULE__, + :stream_test_function, + [:async_stream_with_linked_span] + ) + |> Stream.run() + end + + assert_receive {:span, span(name: "parent span")}, 500 + + assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500 + assert %{a: 1, value: val1} = attributes(attrs) + assert [] != links + + assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500 + assert %{a: 1, value: val2} = attributes(attrs) + assert [] != links + + assert ["a", "b"] == Enum.sort([val1, val2]) + end + end + + describe "async_stream_nolink" do + test "async_stream_nolink" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_stream_nolink(Test.TaskSupervisor, ["a", "b"], fn value -> + OtherAppModule.fun_with_span(value) + end) + |> Stream.run() + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id)} + + assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs1)} + assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs2)} + + assert [%{value: "a"}, %{value: "b"}] == Enum.sort([attributes(attrs1), attributes(attrs2)]) + end + + test "async_stream_nolink/5" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_stream_nolink( + Test.TaskSupervisor, + ["a", "b"], + __MODULE__, + :stream_ctx_test_function, + [ + :async_stream_nolink + ] + ) + |> Stream.run() + end + + assert_receive {:span, span(name: "parent span", attributes: attrs)} + assert %{} == attributes(attrs) + + assert_receive {:span, span(name: "already traced fun", attributes: attrs1)} + assert_receive {:span, span(name: "already traced fun", attributes: attrs2)} + + assert [%{value: "a"}, %{value: "b"}] == Enum.sort([attributes(attrs1), attributes(attrs2)]) + end + + test "async_stream_nolink_with_span/6" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_stream_nolink_with_span( + Test.TaskSupervisor, + ["a", "b"], + "task span", + %{attributes: %{a: 1}}, + fn value -> + Tracer.set_attribute(:value, value) + + value + end + ) + |> Stream.run() + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")}, 500 + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs1)}, 500 + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs2)}, 500 + + assert [%{value: "a", a: 1}, %{value: "b", a: 1}] == Enum.sort([attributes(attrs1), attributes(attrs2)]) + end + + test "async_stream_nolink_with_span/8" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_stream_nolink_with_span( + Test.TaskSupervisor, + ["a", "b"], + "task span", + %{attributes: %{a: 1}}, + __MODULE__, + :stream_test_function, + [:async_stream_nolink_with_span] + ) + |> Stream.run() + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")}, 500 + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500 + assert %{value: val1, a: 1} = attributes(attrs) + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500 + assert %{value: val2, a: 1} = attributes(attrs) + + assert ["a", "b"] == Enum.sort([val1, val2]) + end + + test "async_stream_nolink_with_linked_span/6" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_stream_nolink_with_linked_span( + Test.TaskSupervisor, + ["a", "b"], + "task span", + %{attributes: %{a: 1}}, + fn value -> + Tracer.set_attribute(:value, value) + + :ok + end + ) + |> Stream.run() + end + + assert_receive {:span, span(name: "parent span")}, 500 + + assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500 + assert %{value: val1, a: 1} = attributes(attrs) + assert [] != links + + assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500 + assert %{value: val2, a: 1} = attributes(attrs) + assert [] != links + + assert ["a", "b"] == Enum.sort([val1, val2]) + end + + test "async_stream_nolink_with_linked_span/8" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_stream_nolink_with_linked_span( + Test.TaskSupervisor, + ["a", "b"], + "task span", + %{attributes: %{a: 1}}, + __MODULE__, + :stream_test_function, + [:async_stream_nolink_with_linked_span] + ) + |> Stream.run() + end + + assert_receive {:span, span(name: "parent span")}, 500 + + assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500 + assert %{a: 1, value: val1} = attributes(attrs) + assert [] != links + + assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500 + assert %{a: 1, value: val2} = attributes(attrs) + assert [] != links + + assert ["a", "b"] == Enum.sort([val1, val2]) + end + end + + describe "async" do + test "async" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span", %{attributes: %{a: 1}} do + Task.Supervisor.async(Test.TaskSupervisor, fn -> + Tracer.set_attribute(:value, :async) + + :ok + end) + |> Task.await() + end + + assert_receive {:span, span(name: "parent span", attributes: attrs)} + assert %{a: 1, value: :async} == attributes(attrs) + end + + test "async_mfa" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span", %{attributes: %{a: 1}} do + Task.Supervisor.async(Test.TaskSupervisor, __MODULE__, :test_function, [:async]) + |> Task.await() + end + + assert_receive {:span, span(name: "parent span", attributes: attrs)} + assert %{a: 1, value: :async} == attributes(attrs) + end + + test "async_with_span" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_with_span("task span", %{attributes: %{a: 1}}, Test.TaskSupervisor, fn -> + Tracer.set_attribute(:value, :async_with_span) + + :ok + end) + |> Task.await() + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")} + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)} + + assert %{a: 1, value: :async_with_span} == attributes(attrs) + end + + test "async_with_span_mfa" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + result = + Task.Supervisor.async_with_span( + "task span", + %{attributes: %{a: 1}}, + Test.TaskSupervisor, + __MODULE__, + :test_function, + [:async_with_span] + ) + |> Task.await() + + assert :async_with_span == result + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")} + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)} + + assert %{a: 1, value: :async_with_span} == attributes(attrs) + end + + test "async_with_linked_span" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_with_linked_span("linked task span", %{attributes: %{a: 1}}, Test.TaskSupervisor, fn -> + Tracer.set_attribute(:value, :async_with_linked_span) + + :ok + end) + |> Task.await() + end + + assert_receive {:span, span(name: "parent span")} + + assert_receive {:span, + span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")} + + assert %{a: 1, value: :async_with_linked_span} == attributes(attrs) + assert [] != links + end + + test "async_with_linked_span_mfa" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + result = + Task.Supervisor.async_with_linked_span( + "linked task span", + %{attributes: %{a: 1}}, + Test.TaskSupervisor, + __MODULE__, + :test_function, + [ + :async_with_linked_span + ] + ) + |> Task.await() + + assert :async_with_linked_span == result + end + + assert_receive {:span, span(name: "parent span")} + + assert_receive {:span, + span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")} + + assert %{a: 1, value: :async_with_linked_span} == attributes(attrs) + assert [] != links + end + end + + describe "async_nolink" do + test "async_nolink" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span", %{attributes: %{a: 1}} do + Task.Supervisor.async_nolink(Test.TaskSupervisor, fn -> + Tracer.set_attribute(:value, :async_nolink) + + :ok + end) + |> Task.await() + end + + assert_receive {:span, span(name: "parent span", attributes: attrs)} + assert %{a: 1, value: :async_nolink} == attributes(attrs) + end + + test "async_nolink_mfa" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span", %{attributes: %{a: 1}} do + Task.Supervisor.async_nolink(Test.TaskSupervisor, __MODULE__, :ctx_test_function, [ + :async_nolink + ]) + |> Task.await() + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)} + assert %{a: 1} == attributes(attrs) + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "already traced fun", attributes: attrs)} + assert %{value: :async_nolink} == attributes(attrs) + end + + test "async_nolink_with_span" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_nolink_with_span("task span", %{attributes: %{a: 1}}, Test.TaskSupervisor, fn -> + Tracer.set_attribute(:value, :async_nolink_with_span) + + :ok + end) + |> Task.await() + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")} + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)} + + assert %{a: 1, value: :async_nolink_with_span} == attributes(attrs) + end + + test "async_nolink_with_span_mfa" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + result = + Task.Supervisor.async_nolink_with_span( + "task span", + %{attributes: %{a: 1}}, + Test.TaskSupervisor, + __MODULE__, + :test_function, + [:async_nolink_with_span] + ) + |> Task.await() + + assert :async_nolink_with_span == result + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")} + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)} + + assert %{a: 1, value: :async_nolink_with_span} == attributes(attrs) + end + + test "async_nolink_with_linked_span" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.async_nolink_with_linked_span( + "linked task span", + %{attributes: %{a: 1}}, + Test.TaskSupervisor, + fn -> + Tracer.set_attribute(:value, :async_nolink_with_linked_span) + + :ok + end + ) + |> Task.await() + end + + assert_receive {:span, span(name: "parent span")} + + assert_receive {:span, + span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")} + + assert %{a: 1, value: :async_nolink_with_linked_span} == attributes(attrs) + assert [] != links + end + + test "async_nolink_with_linked_span_mfa" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + result = + Task.Supervisor.async_nolink_with_linked_span( + "linked task span", + %{attributes: %{a: 1}}, + Test.TaskSupervisor, + __MODULE__, + :test_function, + [ + :async_nolink_with_linked_span + ] + ) + |> Task.await() + + assert :async_nolink_with_linked_span == result + end + + assert_receive {:span, span(name: "parent span")} + + assert_receive {:span, + span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")} + + assert %{a: 1, value: :async_nolink_with_linked_span} == attributes(attrs) + assert [] != links + end + end + + describe "start_child" do + test "start_child" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span", %{attributes: %{a: 1}} do + Task.Supervisor.start_child(Test.TaskSupervisor, fn -> + ctx_test_function(:start_child) + end) + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)} + assert %{a: 1} == attributes(attrs) + + assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)} + assert %{value: :start_child} == attributes(attrs) + end + + test "start_child_mfa" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span", %{attributes: %{a: 1}} do + Task.Supervisor.start_child(Test.TaskSupervisor, __MODULE__, :ctx_test_function, [ + :start_child + ]) + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)} + assert %{a: 1} == attributes(attrs) + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "already traced fun", attributes: attrs)} + assert %{value: :start_child} == attributes(attrs) + end + + test "start_child_with_span" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.start_child_with_span("task span", %{attributes: %{a: 1}}, Test.TaskSupervisor, fn -> + Tracer.set_attribute(:value, :start_child_with_span) + + :ok + end) + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")} + assert_receive {:span, span(parent_span_id: ^root_span_id, attributes: attrs, name: "task span")} + assert %{a: 1, value: :start_child_with_span} == attributes(attrs) + end + + test "start_child_with_span_mfa" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.start_child_with_span( + "task span", + %{attributes: %{a: 1}}, + Test.TaskSupervisor, + __MODULE__, + :test_function, + [:start_child_with_span] + ) + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")} + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)} + + assert %{a: 1, value: :start_child_with_span} == attributes(attrs) + end + + test "start_child_with_linked_span" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.start_child_with_linked_span( + "linked task span", + %{attributes: %{a: 1}}, + Test.TaskSupervisor, + fn -> + Tracer.set_attribute(:value, :start_child_with_linked_span) + + :ok + end + ) + end + + assert_receive {:span, span(name: "parent span")} + + assert_receive {:span, + span(parent_span_id: :undefined, attributes: attrs, links: links, name: "linked task span")} + + assert %{a: 1, value: :start_child_with_linked_span} == attributes(attrs) + assert [] != links + end + + test "start_child_with_linked_span_mfa" do + start_supervised!({Task.Supervisor, name: Test.TaskSupervisor}) + + Tracer.with_span "parent span" do + Task.Supervisor.start_child_with_linked_span( + "linked task span", + %{attributes: %{a: 1}}, + Test.TaskSupervisor, + __MODULE__, + :test_function, + [ + :start_child_with_linked_span + ] + ) + end + + assert_receive {:span, span(name: "parent span")} + + assert_receive {:span, + span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")} + + assert %{a: 1, value: :start_child_with_linked_span} == attributes(attrs) + assert [] != links + end + end + + def test_function(value) do + Tracer.set_attributes(%{value: value}) + + value + end + + def ctx_test_function(value) do + OtherAppModule.fun_with_span(value) + end + + def stream_ctx_test_function(value, _args) do + OtherAppModule.fun_with_span(value) + end + + def stream_test_function(value, _args) do + Tracer.set_attributes(%{value: value}) + + value + end + + defp attributes({_, _, _, _, attributes}), do: attributes +end diff --git a/propagators/opentelemetry_process_propagator/test/task_test.exs b/propagators/opentelemetry_process_propagator/test/task_test.exs new file mode 100644 index 0000000..0d847b3 --- /dev/null +++ b/propagators/opentelemetry_process_propagator/test/task_test.exs @@ -0,0 +1,472 @@ +defmodule OpentelemetryProcessPropagator.TaskTest do + use ExUnit.Case + + alias OpentelemetryProcessPropagator.Task + + require OpenTelemetry.Tracer, as: Tracer + require OpenTelemetry.Span + require Record + + for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do + Record.defrecord(name, spec) + end + + for {name, spec} <- Record.extract_all(from_lib: "opentelemetry_api/include/opentelemetry.hrl") do + Record.defrecord(name, spec) + end + + setup do + :application.stop(:opentelemetry) + :application.set_env(:opentelemetry, :tracer, :otel_tracer_default) + + :application.set_env(:opentelemetry, :processors, [ + {:otel_batch_processor, %{scheduled_delay_ms: 1}} + ]) + + :application.start(:opentelemetry) + + :otel_batch_processor.set_exporter(:otel_exporter_pid, self()) + + :ok + end + + defmodule OtherAppModule do + def fun_with_span(value) do + Tracer.with_span "already traced fun", %{attributes: %{value: value}} do + value + end + end + end + + describe "async_stream" do + test "async_stream/3" do + Tracer.with_span "parent span" do + Task.async_stream(["a", "b"], fn value -> + OtherAppModule.fun_with_span(value) + end) + |> Stream.run() + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)} + assert %{} = attributes(attrs) + + assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)} + %{value: val1} = attributes(attrs) + + assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)} + %{value: val2} = attributes(attrs) + + assert ["a", "b"] == Enum.sort([val1, val2]) + end + + test "async_stream/5" do + Tracer.with_span "parent span" do + Task.async_stream(["a", "b"], __MODULE__, :stream_ctx_test_function, [ + :async_stream + ]) + |> Stream.run() + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)} + assert %{} = attributes(attrs) + + assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)} + %{value: val1} = attributes(attrs) + + assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)} + %{value: val2} = attributes(attrs) + + assert ["a", "b"] == Enum.sort([val1, val2]) + end + + test "async_stream_with_span/5" do + Tracer.with_span "parent span" do + Task.async_stream_with_span(["a", "b"], "task span", %{attributes: %{a: 1}}, fn value -> + Tracer.set_attribute(:value, value) + + value + end) + |> Stream.run() + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")}, 500 + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500 + assert %{value: val1, a: 1} = attributes(attrs) + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500 + assert %{value: val2, a: 1} = attributes(attrs) + + assert ["a", "b"] == Enum.sort([val1, val2]) + end + + test "async_stream_with_span/7" do + Tracer.with_span "parent span" do + Task.async_stream_with_span( + ["a", "b"], + "task span", + %{attributes: %{a: 1}}, + __MODULE__, + :stream_test_function, + [:async_stream_with_span] + ) + |> Stream.run() + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")}, 500 + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500 + assert %{value: val1, a: 1} = attributes(attrs) + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)}, 500 + assert %{value: val2, a: 1} = attributes(attrs) + + assert ["a", "b"] == Enum.sort([val1, val2]) + end + + test "async_stream_with_linked_span/5" do + Tracer.with_span "parent span" do + Task.async_stream_with_linked_span(["a", "b"], "task span", %{attributes: %{a: 1}}, fn value -> + Tracer.set_attribute(:value, value) + + :ok + end) + |> Stream.run() + end + + assert_receive {:span, span(name: "parent span")}, 500 + + assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500 + assert %{value: val1, a: 1} = attributes(attrs) + assert [] != links + + assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500 + assert %{value: val2, a: 1} = attributes(attrs) + assert [] != links + + assert ["a", "b"] == Enum.sort([val1, val2]) + end + + test "async_stream_with_linked_span/7" do + Tracer.with_span "parent span" do + Task.async_stream_with_linked_span( + ["a", "b"], + "task span", + %{attributes: %{a: 1}}, + __MODULE__, + :stream_test_function, + [:async_stream_with_linked_span] + ) + |> Stream.run() + end + + assert_receive {:span, span(name: "parent span")}, 500 + + assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500 + assert %{a: 1, value: val1} = attributes(attrs) + assert [] != links + + assert_receive {:span, span(parent_span_id: :undefined, links: links, name: "task span", attributes: attrs)}, 500 + assert %{a: 1, value: val2} = attributes(attrs) + assert [] != links + + assert ["a", "b"] == Enum.sort([val1, val2]) + end + end + + describe "async" do + test "async/1" do + Tracer.with_span "parent span", %{attributes: %{a: 1}} do + Task.async(fn -> + Tracer.set_attribute(:value, :async) + + :ok + end) + |> Task.await() + end + + assert_receive {:span, span(name: "parent span", attributes: attrs)} + assert %{a: 1, value: :async} == attributes(attrs) + end + + test "async/3" do + Tracer.with_span "parent span", %{attributes: %{a: 1}} do + Task.async(__MODULE__, :ctx_test_function, [:async]) + |> Task.await() + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)} + assert %{a: 1} == attributes(attrs) + + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "already traced fun", attributes: attrs)} + assert %{value: :async} == attributes(attrs) + end + + test "async_with_span/3" do + Tracer.with_span "parent span" do + Task.async_with_span("task span", %{attributes: %{a: 1}}, fn -> + Tracer.set_attribute(:value, :async_with_span) + + :ok + end) + |> Task.await() + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")} + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)} + + assert %{a: 1, value: :async_with_span} == attributes(attrs) + end + + test "async_with_span/5" do + Tracer.with_span "parent span" do + result = + Task.async_with_span("task span", %{attributes: %{a: 1}}, __MODULE__, :test_function, [:async_with_span]) + |> Task.await() + + assert :async_with_span == result + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")} + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)} + + assert %{a: 1, value: :async_with_span} == attributes(attrs) + end + + test "async_with_linked_span/3" do + Tracer.with_span "parent span" do + Task.async_with_linked_span("linked task span", %{attributes: %{a: 1}}, fn -> + Tracer.set_attribute(:value, :async_with_linked_span) + + :ok + end) + |> Task.await() + end + + assert_receive {:span, span(name: "parent span")} + + assert_receive {:span, + span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")} + + assert %{a: 1, value: :async_with_linked_span} == attributes(attrs) + assert [] != links + end + + test "async_with_linked_span/6" do + Tracer.with_span "parent span" do + result = + Task.async_with_linked_span("linked task span", %{attributes: %{a: 1}}, __MODULE__, :test_function, [ + :async_with_linked_span + ]) + |> Task.await() + + assert :async_with_linked_span == result + end + + assert_receive {:span, span(name: "parent span")} + + assert_receive {:span, + span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")} + + assert %{a: 1, value: :async_with_linked_span} == attributes(attrs) + assert [] != links + end + end + + describe "start" do + test "start/3" do + Tracer.with_span "parent span", %{attributes: %{a: 1}} do + Task.start(fn -> + ctx_test_function(:start) + end) + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)} + assert %{a: 1} == attributes(attrs) + + assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)} + assert %{value: :start} == attributes(attrs) + end + + test "start/5" do + Tracer.with_span "parent span", %{attributes: %{a: 1}} do + Task.start(__MODULE__, :ctx_test_function, [:start]) + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)} + assert %{a: 1} == attributes(attrs) + + assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)} + assert %{value: :start} == attributes(attrs) + end + + test "start_with_span/3" do + Tracer.with_span "parent span" do + Task.start_with_span("task span", %{attributes: %{a: 1}}, fn -> + Tracer.set_attribute(:value, :start_with_span) + + :ok + end) + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")} + assert_receive {:span, span(parent_span_id: ^root_span_id, attributes: attrs, name: "task span")} + assert %{a: 1, value: :start_with_span} == attributes(attrs) + end + + test "start_with_span/5" do + Tracer.with_span "parent span" do + Task.start_with_span("task span", %{attributes: %{a: 1}}, __MODULE__, :test_function, [:start_with_span]) + end + + assert_receive {:span, span(span_id: root_span_id, name: "parent span")} + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)} + + assert %{a: 1, value: :start_with_span} == attributes(attrs) + end + + test "start_with_linked_span" do + Tracer.with_span "parent span" do + Task.start_with_linked_span("linked task span", %{attributes: %{a: 1}}, fn -> + Tracer.set_attribute(:value, :start_with_linked_span) + + :ok + end) + end + + assert_receive {:span, span(name: "parent span")} + + assert_receive {:span, + span(parent_span_id: :undefined, attributes: attrs, links: links, name: "linked task span")} + + assert %{a: 1, value: :start_with_linked_span} == attributes(attrs) + assert [] != links + end + + test "start_with_linked_span_mfa" do + Tracer.with_span "parent span" do + Task.start_with_linked_span("linked task span", %{attributes: %{a: 1}}, __MODULE__, :test_function, [ + :start_with_linked_span + ]) + end + + assert_receive {:span, span(name: "parent span")} + + assert_receive {:span, + span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")} + + assert %{a: 1, value: :start_with_linked_span} == attributes(attrs) + assert [] != links + end + end + + test "start_link" do + Tracer.with_span "parent span", %{attributes: %{a: 1}} do + Task.start_link(fn -> + ctx_test_function(:start_link) + + :ok + end) + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)} + assert %{a: 1} == attributes(attrs) + + assert_receive {:span, span(name: "already traced fun", parent_span_id: ^root_span_id, attributes: attrs)} + assert %{value: :start_link} == attributes(attrs) + end + + test "start_link_mfa" do + Tracer.with_span "parent span", %{attributes: %{a: 1}} do + Task.start_link(__MODULE__, :ctx_test_function, [:start_link]) + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id, attributes: attrs)} + assert %{a: 1} == attributes(attrs) + + assert_receive {:span, span(parent_span_id: ^root_span_id, attributes: attrs, name: "already traced fun")} + assert %{value: :start_link} == attributes(attrs) + end + + test "start_link_with_span" do + Tracer.with_span "parent span" do + Task.start_link_with_span("task span", %{attributes: %{a: 1}}, fn -> + Tracer.set_attribute(:value, :start_link_with_span) + + :ok + end) + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id)} + assert_receive {:span, span(parent_span_id: ^root_span_id, attributes: attrs, name: "task span")} + + assert %{a: 1, value: :start_link_with_span} == attributes(attrs) + end + + test "start_link_with_span_mfa" do + Tracer.with_span "parent span" do + Task.start_link_with_span("task span", %{attributes: %{a: 1}}, __MODULE__, :test_function, [:start_link_with_span]) + end + + assert_receive {:span, span(name: "parent span", span_id: root_span_id)} + assert_receive {:span, span(parent_span_id: ^root_span_id, name: "task span", attributes: attrs)} + + assert %{a: 1, value: :start_link_with_span} == attributes(attrs) + end + + test "start_link_with_linked_span" do + Tracer.with_span "parent span" do + Task.start_link_with_linked_span("linked task span", %{attributes: %{a: 1}}, fn -> + Tracer.set_attribute(:value, :start_link_with_linked_span) + + :ok + end) + end + + assert_receive {:span, span(name: "parent span")} + assert_receive {:span, span(parent_span_id: :undefined, attributes: attrs, links: links, name: "linked task span")} + + assert %{a: 1, value: :start_link_with_linked_span} == attributes(attrs) + assert [] != links + end + + test "start_link_with_linked_span_mfa" do + Tracer.with_span "parent span" do + Task.start_link_with_linked_span("linked task span", %{attributes: %{a: 1}}, __MODULE__, :test_function, [ + :start_link_with_linked_span + ]) + end + + assert_receive {:span, span(name: "parent span")} + assert_receive {:span, span(parent_span_id: :undefined, links: links, attributes: attrs, name: "linked task span")} + + assert %{a: 1, value: :start_link_with_linked_span} == attributes(attrs) + assert [] != links + end + + def test_function(value) do + Tracer.set_attributes(%{value: value}) + + value + end + + def ctx_test_function(value) do + OtherAppModule.fun_with_span(value) + end + + def link_test_function(value) do + OtherAppModule.fun_with_span(value) + end + + def stream_ctx_test_function(value, _args) do + OtherAppModule.fun_with_span(value) + end + + def stream_test_function(value, _args) do + Tracer.set_attributes(%{value: value}) + + value + end + + defp attributes({_, _, _, _, attributes}), do: attributes +end