HelenOS sources
This source file includes the following definitions.
- destroy
- mark_set
- is_set
- set_exception
- has_exception
- throw_stored_exception
- wait
- wait_for
- wait_until
- timed_wait_
- set_value
- set_value
- get
- set_value
- get
- destroy
- wait
- timed_wait_
- destroy
- wait
- invoke_
- timed_wait_
#ifndef LIBCPP_BITS_THREAD_SHARED_STATE
#define LIBCPP_BITS_THREAD_SHARED_STATE
#include <__bits/exception.hpp>
#include <__bits/functional/function.hpp>
#include <__bits/functional/invoke.hpp>
#include <__bits/refcount_obj.hpp>
#include <__bits/thread/future_common.hpp>
#include <__bits/thread/threading.hpp>
#include <cerrno>
#include <thread>
#include <tuple>
namespace std::aux
{
class shared_state_base: public aux::refcount_obj
{
public:
shared_state_base()
: mutex_{}, condvar_{}, value_set_{false},
exception_{}, has_exception_{false}
{
threading::mutex::init(mutex_);
threading::condvar::init(condvar_);
}
void destroy() override
{
}
void mark_set(bool set = true) noexcept
{
value_set_ = set;
}
bool is_set() const noexcept
{
return value_set_;
}
void set_exception(exception_ptr ptr, bool set = true)
{
exception_ = ptr;
has_exception_ = set;
}
bool has_exception() const noexcept
{
return has_exception_;
}
void throw_stored_exception() const
{
if (has_exception_)
rethrow_exception(exception_);
}
virtual void wait() const
{
aux::threading::mutex::lock(
const_cast<aux::mutex_t&>(mutex_)
);
while (!value_set_)
{
aux::threading::condvar::wait(
const_cast<aux::condvar_t&>(condvar_),
const_cast<aux::mutex_t&>(mutex_)
);
}
aux::threading::mutex::unlock(
const_cast<aux::mutex_t&>(mutex_)
);
}
template<class Rep, class Period>
future_status
wait_for(const chrono::duration<Rep, Period>& rel_time) const
{
if (value_set_)
return future_status::ready;
aux::threading::mutex::lock(
const_cast<aux::mutex_t&>(mutex_)
);
auto res = timed_wait_(
aux::threading::time::convert(rel_time)
);
aux::threading::mutex::unlock(
const_cast<aux::mutex_t&>(mutex_)
);
return res;
}
template<class Clock, class Duration>
future_status
wait_until(const chrono::time_point<Clock, Duration>& abs_time)
{
if (value_set_)
return future_status::ready;
aux::threading::mutex::lock(
const_cast<aux::mutex_t&>(mutex_)
);
auto res = timed_wait_(
aux::threading::time::convert(abs_time - Clock::now())
);
aux::threading::mutex::unlock(
const_cast<aux::mutex_t&>(mutex_)
);
return res;
}
~shared_state_base() override = default;
protected:
aux::mutex_t mutex_;
aux::condvar_t condvar_;
bool value_set_;
exception_ptr exception_;
bool has_exception_;
virtual future_status timed_wait_(aux::time_unit_t time) const
{
auto res = aux::threading::condvar::wait_for(
const_cast<aux::condvar_t&>(condvar_),
const_cast<aux::mutex_t&>(mutex_), time
);
return res == ETIMEOUT ? future_status::timeout
: future_status::ready;
}
};
/**
 * Shared state holding a result of type R.
 */
template<class R>
class shared_state: public shared_state_base
{
    public:
        shared_state()
            : shared_state_base{}
        { /* DUMMY BODY */ }

        /**
         * Copies val into the state. If set is true, the state is
         * flagged as ready and every thread blocked on the
         * condition variable is woken up.
         *
         * The default for set matches the rvalue overload.
         */
        void set_value(const R& val, bool set = true)
        {
            store_(val, set);
        }

        /**
         * Moves val into the state, otherwise identical to
         * the overload above.
         */
        void set_value(R&& val, bool set = true)
        {
            store_(std::move(val), set);
        }

        /**
         * Returns a reference to the stored value, does not
         * check whether a value has been set.
         */
        R& get()
        {
            return value_;
        }

    protected:
        R value_;

    private:
        /**
         * Common body of both set_value overloads: assigns the
         * value and the flag under the mutex and broadcasts after
         * the mutex has been released.
         */
        template<class U>
        void store_(U&& val, bool set)
        {
            aux::threading::mutex::lock(mutex_);
            try
            {
                value_ = std::forward<U>(val);
                value_set_ = set;
            }
            catch (...)
            {
                // A throwing assignment must not leave the mutex locked.
                aux::threading::mutex::unlock(mutex_);
                throw;
            }
            aux::threading::mutex::unlock(mutex_);

            if (set)
                aux::threading::condvar::broadcast(condvar_);
        }
};
template<>
class shared_state<void>: public shared_state_base
{
public:
shared_state()
: shared_state_base{}
{ }
void set_value()
{
value_set_ = true;
aux::threading::condvar::broadcast(condvar_);
}
void get()
{ }
};
/**
 * Shared state that runs its callable on a thread of its own,
 * started in the constructor.
 */
template<class R, class F, class... Args>
class async_shared_state: public shared_state<R>
{
    public:
        /**
         * Starts the worker thread. The callable and its arguments
         * are captured by copy, the result (or an exception derived
         * from std::exception) is stored into this state.
         */
        async_shared_state(F&& f, Args&&... args)
            : shared_state<R>{}, thread_{}
        {
            thread_ = thread{
                [=](){
                    try
                    {
                        if constexpr (!is_same_v<R, void>)
                            this->set_value(invoke(f, args...));
                        else
                        {
                            invoke(f, args...);

                            // set_value() also signals the condition
                            // variable, so timed waiters wake up early.
                            this->set_value();
                        }
                    }
                    catch(const exception& __exception)
                    {
                        this->set_exception(make_exception_ptr(__exception));
                    }
                }
            };
        }

        /**
         * Joins the worker whenever it is still joinable. The
         * worker uses this object, so it has to finish before the
         * state goes away even if the value has been stored
         * already. Calling this repeatedly is harmless.
         */
        void destroy() override
        {
            if (thread_.joinable())
                thread_.join();
        }

        /**
         * Waits for the worker to finish unless the value is
         * already there. The joinable() test avoids joining twice
         * when the worker ended with an exception and the
         * readiness flag therefore stayed false.
         */
        void wait() const override
        {
            auto& worker = const_cast<thread&>(thread_);

            if (!this->is_set() && worker.joinable())
                worker.join();
        }

        ~async_shared_state() override
        {
            destroy();
        }

    protected:
        /**
         * Called by wait_for/wait_until with mutex_ locked.
         *
         * The wait goes through the condition variable, which
         * releases the mutex while blocked. Sleeping here instead
         * would keep the mutex locked for the entire timeout and
         * stall the worker inside set_value().
         */
        future_status timed_wait_(aux::time_unit_t time) const override
        {
            static_cast<void>(aux::threading::condvar::wait_for(
                const_cast<aux::condvar_t&>(this->condvar_),
                const_cast<aux::mutex_t&>(this->mutex_), time
            ));

            if (this->value_set_)
                return future_status::ready;
            else
                return future_status::timeout;
        }

    private:
        thread thread_;
};
/**
 * Shared state that stores a callable together with its
 * arguments and runs it lazily on the thread that calls
 * wait() or destroy().
 */
template<class R, class F, class... Args>
class deferred_shared_state: public shared_state<R>
{
    public:
        /**
         * Stores the callable and the arguments for later use.
         * f is forwarded with its own deduced type G, so an
         * lvalue callable is copied and never moved from.
         */
        template<class G>
        deferred_shared_state(G&& f, Args&&... args)
            : shared_state<R>{}, func_{forward<G>(f)},
              args_{forward<Args>(args)...}
        { /* DUMMY BODY */ }

        /**
         * Runs the stored callable if that has not happened yet.
         *
         * NOTE(review): Running a deferred function nobody waited
         *               for is kept from the previous
         *               implementation - confirm it is intended.
         */
        void destroy() override
        {
            run_deferred_();
        }

        /**
         * Runs the stored callable on the calling thread unless
         * it has been run or the state is already set.
         */
        void wait() const override
        {
            const_cast<deferred_shared_state*>(this)->run_deferred_();
        }

        ~deferred_shared_state() override
        {
            destroy();
        }

    protected:
        function<R(decay_t<Args>...)> func_;
        tuple<decay_t<Args>...> args_;

        /**
         * Invokes the callable with the unpacked arguments and
         * stores its result, or the exception if one derived from
         * std::exception is thrown. Both func_ and args_ are moved
         * from, so this must run at most once.
         */
        template<size_t... Is>
        void invoke_(index_sequence<Is...>)
        {
            try
            {
                if constexpr (!is_same_v<R, void>)
                    this->set_value(invoke(move(func_), get<Is>(move(args_))...));
                else
                {
                    invoke(move(func_), get<Is>(move(args_))...);
                    this->mark_set(true);
                }
            }
            catch(const exception& __exception)
            {
                this->set_exception(make_exception_ptr(__exception));
            }
        }

        /** A deferred state never blocks in a timed wait. */
        future_status timed_wait_(aux::time_unit_t) const override
        {
            return future_status::deferred;
        }

    private:
        /**
         * True once the callable has been handed to invoke_. Kept
         * separately from the readiness flag because a throwing
         * callable leaves that flag false.
         */
        bool invoked_{false};

        /**
         * Decides under the mutex whether the callable still has
         * to run and then runs it with the mutex released, because
         * set_value() inside invoke_ locks the same mutex.
         *
         * NOTE(review): A second thread that arrives while the
         *               callable is running returns immediately
         *               and does not wait for the result.
         */
        void run_deferred_()
        {
            aux::threading::mutex::lock(this->mutex_);
            const bool run = !invoked_ && !this->is_set();
            if (run)
                invoked_ = true;
            aux::threading::mutex::unlock(this->mutex_);

            if (run)
                invoke_(make_index_sequence<sizeof...(Args)>{});
        }
};
template<class R>
void set_state_value_at_thread_exit(shared_state<R>* state)
{
__unimplemented();
}
template<class R>
void set_state_exception_at_thread_exit(shared_state<R>* state)
{
__unimplemented();
}
}
#endif
HelenOS homepage, sources at GitHub