//
// experimental/impl/parallel_group.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
#define BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>
#include <atomic>
#include <memory>
#include <new>
#include <tuple>
#include <boost/asio/associated_cancellation_slot.hpp>
#include <boost/asio/detail/recycling_allocator.hpp>
#include <boost/asio/detail/type_traits.hpp>
#include <boost/asio/dispatch.hpp>

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace experimental {
namespace detail {

// Stores the result from an individual asynchronous operation.
//
// The value is kept in manually managed storage so that a result slot can be
// created before its operation has produced anything. has_value_ records
// whether the storage currently holds a constructed T.
template <typename T, typename = void>
struct parallel_group_op_result
{
public:
  // Constructs an empty slot that holds no value.
  parallel_group_op_result()
    : has_value_(false)
  {
  }

  // Move-constructs from other. If other holds a value, that value is
  // move-constructed into this slot; other keeps its (moved-from) value and
  // still destroys it in its own destructor.
  parallel_group_op_result(parallel_group_op_result&& other)
    : has_value_(other.has_value_)
  {
    if (has_value_)
      new (&u_.value_) T(std::move(other.get()));
  }

  // Destroys the held value, if there is one.
  ~parallel_group_op_result()
  {
    if (has_value_)
      u_.value_.~T();
  }

  // Returns a reference to the held value. No check is made that a value is
  // present, so this must only be called after emplace() has stored one.
  T& get() noexcept
  {
    return u_.value_;
  }

  // Constructs the held value in place from args. Any value stored by an
  // earlier call is destroyed first, so that repeated calls do not construct a
  // new object over a live one and leak its resources. If T's constructor
  // throws, the slot is left empty.
  template <typename... Args>
  void emplace(Args&&... args)
  {
    if (has_value_)
    {
      u_.value_.~T();
      has_value_ = false;
    }

    new (&u_.value_) T(std::forward<Args>(args)...);
    has_value_ = true;
  }

private:
  // Raw storage for the value. The empty constructor and destructor leave the
  // lifetime of value_ entirely under the control of the enclosing class.
  union u
  {
    u() {}
    ~u() {}
    char c_;
    T value_;
  } u_;
  bool has_value_;
};

// Proxy completion handler for the group of parallel operatations. Unpacks and
// concatenates the individual operations' results, and invokes the user's
// completion handler.
template <typename Handler, typename... Ops>
struct parallel_group_completion_handler
{
  typedef typename decay<
      typename prefer_result<
        typename associated_executor<Handler>::type,
        execution::outstanding_work_t::tracked_t
      >::type
    >::type executor_type;

  parallel_group_completion_handler(Handler&& h)
    : handler_(std::move(h)),
      executor_(
          boost::asio::prefer(
            boost::asio::get_associated_executor(handler_),
            execution::outstanding_work.tracked))
  {
  }

  executor_type get_executor() const noexcept
  {
    return executor_;
  }

  void operator()()
  {
    this->invoke(boost::asio::detail::make_index_sequence<sizeof...(Ops)>());
  }

  template <std::size_t... I>
  void invoke(boost::asio::detail::index_sequence<I...>)
  {
    this->invoke(std::tuple_cat(std::move(std::get<I>(args_).get())...));
  }

  template <typename... Args>
  void invoke(std::tuple<Args...>&& args)
  {
    this->invoke(std::move(args),
        boost::asio::detail::index_sequence_for<Args...>());
  }

  template <typename... Args, std::size_t... I>
  void invoke(std::tuple<Args...>&& args,
      boost::asio::detail::index_sequence<I...>)
  {
    std::move(handler_)(completion_order_, std::move(std::get<I>(args))...);
  }

  Handler handler_;
  executor_type executor_;
  std::array<std::size_t, sizeof...(Ops)> completion_order_{};
  std::tuple<
      parallel_group_op_result<
        typename parallel_op_signature_as_tuple<
          typename completion_signature_of<Ops>::type
        >::type
      >...
    > args_{};
};

// Shared state for the parallel group. The handlers of the individual
// operations each hold a std::shared_ptr to a single instance of this type.
template <typename Condition, typename Handler, typename... Ops>
struct parallel_group_state
{
  // Takes ownership of the cancellation condition and the user's completion
  // handler, and sets every counter to its starting value.
  parallel_group_state(Condition&& c, Handler&& h)
    : completed_(0),
      cancel_type_(cancellation_type::none),
      cancellations_requested_(sizeof...(Ops)),
      outstanding_(sizeof...(Ops)),
      cancellation_condition_(std::move(c)),
      handler_(std::move(h))
  {
  }

  // How many operations have completed so far. Each completing operation
  // takes the next value as its position in the completion order.
  std::atomic<unsigned int> completed_;

  // The non-none cancellation type produced by a cancellation condition.
  // Kept here so that the group's initiating function can use it.
  std::atomic<cancellation_type_t> cancel_type_;

  // How many cancellations have been requested, whether by an operation in
  // the group completing or through the cancellation slot of the group
  // operation. Starts at the number of operations, which stops cancellation
  // signals from being emitted until every operation's initiating function
  // has returned.
  std::atomic<unsigned int> cancellations_requested_;

  // How many operations have not yet completed. When this reaches zero it is
  // safe to invoke the user's completion handler.
  std::atomic<unsigned int> outstanding_;

  // One cancellation signal for each operation in the group.
  boost::asio::cancellation_signal cancellation_signals_[sizeof...(Ops)];

  // Decides whether the results of an individual operation call for
  // cancellation of the whole group.
  Condition cancellation_condition_;

  // The proxy handler invoked after all operations in the group complete.
  parallel_group_completion_handler<Handler, Ops...> handler_;
};

// Handler for an individual operation within the parallel group.
//
// I is the index of the operation within the group. It selects the
// operation's cancellation signal and result slot in the shared state, and is
// the value recorded in the completion order.
template <std::size_t I, typename Condition, typename Handler, typename... Ops>
struct parallel_group_op_handler
{
  typedef boost::asio::cancellation_slot cancellation_slot_type;

  // Takes a share in the ownership of the group's state.
  parallel_group_op_handler(
    std::shared_ptr<parallel_group_state<Condition, Handler, Ops...> > state)
    : state_(std::move(state))
  {
  }

  // Returns the slot of the cancellation signal belonging to operation I.
  cancellation_slot_type get_cancellation_slot() const noexcept
  {
    return state_->cancellation_signals_[I].slot();
  }

  // Called with the completion arguments of operation I. Records the
  // completion order and the results, requests cancellation of the other
  // operations if the cancellation condition asks for it, and dispatches the
  // group's completion handler if this was the last outstanding operation.
  template <typename... Args>
  void operator()(Args... args)
  {
    // Capture this operation into the completion order. The atomic
    // post-increment yields the position taken by this operation.
    state_->handler_.completion_order_[state_->completed_++] = I;

    // Determine whether the results of this operation require cancellation of
    // the whole group. The arguments are passed as lvalues here because they
    // are moved from only in the next step.
    cancellation_type_t cancel_type = state_->cancellation_condition_(args...);

    // Capture the result of the operation into the proxy completion handler.
    std::get<I>(state_->handler_.args_).emplace(std::move(args)...);

    if (cancel_type != cancellation_type::none)
    {
      // Save the type for potential use by the group's initiating function.
      // This is written before the counter below is incremented.
      state_->cancel_type_ = cancel_type;

      // If we are the first operation to request cancellation, emit a signal
      // for each operation in the group. The counter starts at the number of
      // operations, so the test cannot succeed until the initiating function
      // has subtracted that starting value. This operation's own signal is
      // skipped.
      if (state_->cancellations_requested_++ == 0)
        for (std::size_t i = 0; i < sizeof...(Ops); ++i)
          if (i != I)
            state_->cancellation_signals_[i].emit(cancel_type);
    }

    // If this is the last outstanding operation, invoke the user's handler.
    // The proxy handler is moved out of the shared state and dispatched.
    if (--state_->outstanding_ == 0)
      boost::asio::dispatch(std::move(state_->handler_));
  }

  // The state shared with the other operations in the group.
  std::shared_ptr<parallel_group_state<Condition, Handler, Ops...> > state_;
};

// Handler for an individual operation within the parallel group that has an
// explicitly specified executor. Cancellation requests aimed at the operation
// are relayed through a cancel_proxy, which re-emits them by dispatching to
// that executor.
template <typename Executor, std::size_t I,
    typename Condition, typename Handler, typename... Ops>
struct parallel_group_op_handler_with_executor :
  parallel_group_op_handler<I, Condition, Handler, Ops...>
{
  using base_type = parallel_group_op_handler<I, Condition, Handler, Ops...>;
  using cancellation_slot_type = boost::asio::cancellation_slot;
  using executor_type = Executor;

  // Proxy handler that forwards the emitted signal to the correct executor.
  struct cancel_proxy
  {
    cancel_proxy(
        std::shared_ptr<parallel_group_state<
          Condition, Handler, Ops...> > state,
        executor_type ex)
      : state_(std::move(state)),
        executor_(std::move(ex))
    {
    }

    // Re-emits the cancellation request on signal_ from a function dispatched
    // to executor_. Nothing happens if the group's state no longer exists.
    void operator()(cancellation_type_t type)
    {
      auto keep_alive = state_.lock();
      if (!keep_alive)
        return;

      // The shared pointer is captured only to hold the state (and presumably
      // this proxy, which is stored in one of its signals) alive until the
      // dispatched function has run.
      boost::asio::cancellation_signal* target = &signal_;
      boost::asio::dispatch(executor_,
          [keep_alive, target, type]{ target->emit(type); });
    }

    std::weak_ptr<parallel_group_state<Condition, Handler, Ops...> > state_;
    boost::asio::cancellation_signal signal_;
    executor_type executor_;
  };

  // Installs a cancel_proxy in the slot of the group's signal for operation I
  // and remembers where it lives.
  parallel_group_op_handler_with_executor(
      std::shared_ptr<parallel_group_state<Condition, Handler, Ops...> > state,
      executor_type ex)
    : base_type(std::move(state)),
      cancel_proxy_(
          &this->state_->cancellation_signals_[I].slot().template
            emplace<cancel_proxy>(this->state_, std::move(ex)))
  {
  }

  // Returns the slot of the proxy's own signal rather than the group's.
  cancellation_slot_type get_cancellation_slot() const noexcept
  {
    return cancel_proxy_->signal_.slot();
  }

  // Returns the executor that was specified for the operation.
  executor_type get_executor() const noexcept
  {
    return cancel_proxy_->executor_;
  }

  // Non-owning pointer to the proxy installed by the constructor.
  cancel_proxy* cancel_proxy_;
};

// Helper to launch an operation using the correct executor, if any.
template <std::size_t I, typename Op, typename = void>
struct parallel_group_op_launcher
{
  template <typename Condition, typename Handler, typename... Ops>
  static void launch(Op& op,
    const std::shared_ptr<parallel_group_state<
      Condition, Handler, Ops...> >& state)
  {
    typedef typename associated_executor<Op>::type ex_type;
    ex_type ex = boost::asio::get_associated_executor(op);
    std::move(op)(
        parallel_group_op_handler_with_executor<ex_type, I,
          Condition, Handler, Ops...>(state, std::move(ex)));
  }
};

// Specialised launcher for operations that specify no executor.
template <std::size_t I, typename Op>
struct parallel_group_op_launcher<I, Op,
    typename enable_if<
      is_same<
        typename associated_executor<
          Op>::asio_associated_executor_is_unspecialised,
        void
      >::value
    >::type>
{
  template <typename Condition, typename Handler, typename... Ops>
  static void launch(Op& op,
    const std::shared_ptr<parallel_group_state<
      Condition, Handler, Ops...> >& state)
  {
    std::move(op)(
        parallel_group_op_handler<I, Condition, Handler, Ops...>(state));
  }
};

template <typename Condition, typename Handler, typename... Ops>
struct parallel_group_cancellation_handler
{
  parallel_group_cancellation_handler(
    std::shared_ptr<parallel_group_state<Condition, Handler, Ops...> > state)
    : state_(std::move(state))
  {
  }

  void operator()(cancellation_type_t cancel_type)
  {
    // If we are the first place to request cancellation, i.e. no operation has
    // yet completed and requested cancellation, emit a signal for each
    // operation in the group.
    if (cancel_type != cancellation_type::none)
      if (auto state = state_.lock())
        if (state->cancellations_requested_++ == 0)
          for (std::size_t i = 0; i < sizeof...(Ops); ++i)
            state->cancellation_signals_[i].emit(cancel_type);
  }

  std::weak_ptr<parallel_group_state<Condition, Handler, Ops...> > state_;
};

// Launches every operation in the group.
//
// cancellation_condition and handler are moved into a newly allocated shared
// state. Each element of ops is then started, in index order, with a handler
// that refers to that state. The index_sequence argument supplies one index
// per operation.
template <typename Condition, typename Handler,
    typename... Ops, std::size_t... I>
void parallel_group_launch(Condition cancellation_condition, Handler handler,
    std::tuple<Ops...>& ops, boost::asio::detail::index_sequence<I...>)
{
  // Get the user's completion handler's cancellation slot, so that we can allow
  // cancellation of the entire group. This must be done here, before the
  // handler is moved into the shared state below.
  typename associated_cancellation_slot<Handler>::type slot
    = boost::asio::get_associated_cancellation_slot(handler);

  // Create the shared state for the operation, allocating it with the
  // recycling allocator under the parallel_group tag.
  typedef parallel_group_state<Condition, Handler, Ops...> state_type;
  std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
      boost::asio::detail::recycling_allocator<state_type,
        boost::asio::detail::thread_info_base::parallel_group_tag>(),
      std::move(cancellation_condition), std::move(handler));

  // Initiate each individual operation in the group. Expanding the pack
  // inside a braced initialiser list evaluates the launches from left to
  // right; the leading 0 keeps the array from being empty.
  int fold[] = { 0,
    ( parallel_group_op_launcher<I, Ops>::launch(std::get<I>(ops), state),
      0 )...
  };
  (void)fold;

  // Check if any of the operations has already requested cancellation, and if
  // so, emit a signal for each operation in the group. Subtracting the number
  // of operations removes the counter's starting value, so whatever is left
  // is the number of requests made while the operations were being launched.
  if ((state->cancellations_requested_ -= sizeof...(Ops)) > 0)
    for (auto& signal : state->cancellation_signals_)
      signal.emit(state->cancel_type_);

  // Register a handler with the user's completion handler's cancellation slot.
  // That handler keeps only a weak reference to the shared state.
  if (slot.is_connected())
    slot.template emplace<
      parallel_group_cancellation_handler<
        Condition, Handler, Ops...> >(state);
}

} // namespace detail
} // namespace experimental

// Associator specialisation for the group's proxy completion handler. Every
// lookup is answered by the associator of the user's handler that the proxy
// wraps.
template <template <typename, typename> class Associator,
    typename Handler, typename... Ops, typename DefaultCandidate>
struct associator<Associator,
    experimental::detail::parallel_group_completion_handler<Handler, Ops...>,
    DefaultCandidate>
  : Associator<Handler, DefaultCandidate>
{
  // Returns whatever is associated with the wrapped handler, falling back on
  // candidate in the way the wrapped handler's associator defines.
  static typename Associator<Handler, DefaultCandidate>::type get(
      const experimental::detail::parallel_group_completion_handler<
        Handler, Ops...>& group_handler,
      const DefaultCandidate& candidate
        = DefaultCandidate()) BOOST_ASIO_NOEXCEPT
  {
    typedef Associator<Handler, DefaultCandidate> wrapped_associator;
    return wrapped_associator::get(group_handler.handler_, candidate);
  }
};

} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP