71 #ifndef CGU_ASYNC_QUEUE_H
72 #define CGU_ASYNC_QUEUE_H
87 #ifdef CGU_USE_SCHED_YIELD
103 virtual const char*
what()
const throw() {
return "AsyncQueuePopError: popping from empty AsyncQueue object\n";}
147 template <
class T,
class Container = std::list<T> >
class AsyncQueue {
154 std::queue<T, Container> q;
167 #ifdef CGU_USE_SCHED_YIELD
192 void push(
const value_type& obj) {
217 q.push(std::move(obj));
243 template<
class... Args>
246 q.emplace(std::forward<Args>(args)...);
270 void pop(value_type& obj) {
315 obj = std::move(q.front());
440 if (
this != &other) {
441 lock2(mutex, other.mutex);
472 lock2(mutex, rhs.mutex);
475 std::queue<T, Container> temp{rhs.q};
506 q = std::move(rhs.q);
638 std::queue<T, Container> q;
651 #ifdef CGU_USE_SCHED_YIELD
676 void push(
const value_type& obj) {
702 q.push(std::move(obj));
729 template<
class... Args>
732 q.emplace(std::forward<Args>(args)...);
757 void pop(value_type& obj) {
802 obj = std::move(q.front());
895 while (q.empty()) cond.
wait(mutex);
946 while (q.empty()) cond.
wait(mutex);
948 obj = std::move(q.front());
1124 obj = std::move(q.front());
1270 if (
this != &other) {
1271 lock2(mutex, other.mutex);
1276 if (!other.q.empty()) other.cond.
broadcast();
1315 lock2(mutex, rhs.mutex);
1318 std::queue<T, Container> temp{rhs.q};
1357 q = std::move(rhs.q);
1443 q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1482 template <
class T,
class Container>
1511 template <
class T,
class Container>
1517 #if defined(CGU_USE_INHERITABLE_QUEUE) && !defined(DOXYGEN_PARSING)
1526 template <
class T,
class Allocator>
1527 class AsyncQueue<T,
std::list<T, Allocator> > {
1529 typedef std::list<T, Allocator> Container;
1530 typedef typename Container::value_type
value_type;
1531 typedef typename Container::size_type
size_type;
1534 mutable Thread::Mutex mutex;
1540 class Q:
public std::queue<T, Container> {
1542 void splice_end(Container&& lst) {
1543 this->c.splice(this->c.end(), std::move(lst));
1545 void unsplice_beginning(Container& lst) {
1546 lst.splice(lst.begin(), this->c, this->c.begin());
1551 void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1554 if (!m2.trylock()) {
1559 #ifdef CGU_USE_SCHED_YIELD
1568 void push(
const value_type& obj) {
1569 Container temp{obj};
1570 Thread::Mutex::Lock lock{mutex};
1572 q.splice_end(std::move(temp));
1575 void push(value_type&& obj) {
1582 temp.push_back(std::move(obj));
1583 Thread::Mutex::Lock lock{mutex};
1585 q.splice_end(std::move(temp));
1588 template<
class... Args>
1589 void emplace(Args&&... args) {
1591 temp.emplace_back(std::forward<Args>(args)...);
1592 Thread::Mutex::Lock lock{mutex};
1594 q.splice_end(std::move(temp));
1597 void pop(value_type& obj) {
1598 Thread::Mutex::Lock lock{mutex};
1599 if (q.empty())
throw AsyncQueuePopError();
1605 Thread::Mutex::Lock lock{mutex};
1606 if (q.empty())
throw AsyncQueuePopError();
1607 obj = std::move(q.front());
1617 Thread::Mutex::Lock lock{mutex};
1618 if (q.empty())
throw AsyncQueuePopError();
1620 q.unsplice_beginning(temp);
1622 obj = std::move(temp.front());
1626 Thread::Mutex::Lock lock{mutex};
1627 if (q.empty())
throw AsyncQueuePopError();
1631 bool empty()
const {
1632 Thread::Mutex::Lock lock{mutex};
1636 size_type
size()
const {
1637 Thread::Mutex::Lock lock{mutex};
1642 if (
this != &other) {
1643 lock2(mutex, other.mutex);
1652 lock2(mutex, rhs.mutex);
1662 Thread::Mutex::Lock lock{mutex};
1663 q = std::move(rhs.q);
1674 Thread::Mutex::Lock lock{mutex};
1687 template <
class T,
class Allocator>
1688 class AsyncQueueDispatch<T,
std::list<T, Allocator> > {
1690 typedef std::list<T, Allocator> Container;
1691 typedef typename Container::value_type
value_type;
1692 typedef typename Container::size_type
size_type;
1695 mutable Thread::Mutex mutex;
1702 class Q:
public std::queue<T, Container> {
1704 void splice_end(Container&& lst) {
1705 this->c.splice(this->c.end(), std::move(lst));
1707 void unsplice_beginning(Container& lst) {
1708 lst.splice(lst.begin(), this->c, this->c.begin());
1713 void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1716 if (!m2.trylock()) {
1721 #ifdef CGU_USE_SCHED_YIELD
1730 void push(
const value_type& obj) {
1731 Container temp{obj};
1732 Thread::Mutex::Lock lock{mutex};
1734 q.splice_end(std::move(temp));
1738 void push(value_type&& obj) {
1745 temp.push_back(std::move(obj));
1746 Thread::Mutex::Lock lock{mutex};
1748 q.splice_end(std::move(temp));
1752 template<
class... Args>
1753 void emplace(Args&&... args) {
1755 temp.emplace_back(std::forward<Args>(args)...);
1756 Thread::Mutex::Lock lock{mutex};
1758 q.splice_end(std::move(temp));
1762 void pop(value_type& obj) {
1763 Thread::Mutex::Lock lock{mutex};
1764 if (q.empty())
throw AsyncQueuePopError();
1770 Thread::Mutex::Lock lock{mutex};
1771 if (q.empty())
throw AsyncQueuePopError();
1772 obj = std::move(q.front());
1782 Thread::Mutex::Lock lock{mutex};
1783 if (q.empty())
throw AsyncQueuePopError();
1785 q.unsplice_beginning(temp);
1787 obj = std::move(temp.front());
1791 Thread::Mutex::Lock lock{mutex};
1792 while (q.empty()) cond.wait(mutex);
1793 Thread::CancelBlock b;
1799 Thread::Mutex::Lock lock{mutex};
1800 while (q.empty()) cond.wait(mutex);
1801 Thread::CancelBlock b;
1802 obj = std::move(q.front());
1809 bool cancelstate_restored =
false;
1811 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
1816 pthread_setcancelstate(old_state, &ignore);
1817 cancelstate_restored =
true;
1818 Thread::Mutex::TrackLock lock{mutex};
1819 while (q.empty()) cond.wait(mutex);
1820 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &ignore);
1821 cancelstate_restored =
false;
1823 q.unsplice_beginning(temp);
1825 obj = std::move(temp.front());
1826 pthread_setcancelstate(old_state, &ignore);
1836 if (!cancelstate_restored) {
1837 pthread_setcancelstate(old_state, &ignore);
1846 Thread::Mutex::Lock lock{mutex};
1848 if (cond.timed_wait(mutex, ts))
return true;
1850 Thread::CancelBlock b;
1859 Thread::Mutex::Lock lock{mutex};
1861 if (cond.timed_wait(mutex, ts))
return true;
1863 Thread::CancelBlock b;
1864 obj = std::move(q.front());
1874 bool cancelstate_restored =
false;
1876 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
1881 pthread_setcancelstate(old_state, &ignore);
1882 cancelstate_restored =
true;
1883 Thread::Mutex::TrackLock lock{mutex};
1885 if (cond.timed_wait(mutex, ts))
return true;
1887 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &ignore);
1888 cancelstate_restored =
false;
1890 q.unsplice_beginning(temp);
1892 obj = std::move(temp.front());
1893 pthread_setcancelstate(old_state, &ignore);
1904 if (!cancelstate_restored) {
1905 pthread_setcancelstate(old_state, &ignore);
1912 Thread::Mutex::Lock lock{mutex};
1913 if (q.empty())
throw AsyncQueuePopError();
1917 bool empty()
const {
1918 Thread::Mutex::Lock lock{mutex};
1922 size_type
size()
const {
1923 Thread::Mutex::Lock lock{mutex};
1928 if (
this != &other) {
1929 lock2(mutex, other.mutex);
1933 if (!q.empty()) cond.broadcast();
1934 if (!other.q.empty()) other.cond.broadcast();
1940 lock2(mutex, rhs.mutex);
1945 if (!q.empty()) cond.broadcast();
1951 Thread::Mutex::Lock lock{mutex};
1952 q = std::move(rhs.q);
1953 if (!q.empty()) cond.broadcast();
1962 q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1965 Thread::Mutex::Lock lock{mutex};
1971 #endif // CGU_USE_INHERITABLE_QUEUE
void push(value_type &&obj)
Definition: async_queue.h:215
void pop(value_type &obj)
Definition: async_queue.h:757
void pop()
Definition: async_queue.h:389
int lock() noexcept
Definition: mutex.h:147
size_type size() const
Definition: async_queue.h:1247
void swap(Cgu::AsyncQueue< T, Container > &q1, Cgu::AsyncQueue< T, Container > &q2)
Definition: async_queue.h:1483
int unlock() noexcept
Definition: mutex.h:170
~AsyncQueueDispatch()
Definition: async_queue.h:1452
Container::size_type size_type
Definition: async_queue.h:150
void emplace(Args &&...args)
Definition: async_queue.h:730
void pop_dispatch(value_type &obj)
Definition: async_queue.h:893
void move_pop(value_type &obj)
Definition: async_queue.h:312
A wrapper class for pthread condition variables.
Definition: mutex.h:449
AsyncQueue(const AsyncQueue &rhs)
Definition: async_queue.h:572
AsyncQueueDispatch()=default
An exception thrown if calling pop() on an AsyncQueue or AsyncQueueDispatch object fails because the q...
Definition: async_queue.h:102
void move_pop_basic(value_type &obj)
Definition: async_queue.h:860
A thread-safe asynchronous queue.
Definition: async_queue.h:147
A thread-safe asynchronous queue with a blocking pop() method.
Definition: async_queue.h:630
void pop()
Definition: async_queue.h:1214
static void get_abs_time(timespec &ts, unsigned int millisec)
void push(value_type &&obj)
Definition: async_queue.h:700
A class enabling the cancellation state of a thread to be controlled.
Definition: thread.h:723
bool move_pop_timed_dispatch(value_type &obj, unsigned int millisec)
Definition: async_queue.h:1116
A scoped locking class for exception safe Mutex locking.
Definition: mutex.h:207
bool move_pop_timed_dispatch_basic(value_type &obj, unsigned int millisec)
Definition: async_queue.h:1198
void move_pop_dispatch_basic(value_type &obj)
Definition: async_queue.h:1017
void move_pop_basic(value_type &obj)
Definition: async_queue.h:373
AsyncQueueDispatch & operator=(AsyncQueueDispatch &&rhs)
Definition: async_queue.h:1355
~AsyncQueue()
Definition: async_queue.h:579
AsyncQueueDispatch(AsyncQueueDispatch &&rhs)
Definition: async_queue.h:1412
virtual const char * what() const
Definition: async_queue.h:103
void push(const value_type &obj)
Definition: async_queue.h:192
int trylock() noexcept
Definition: mutex.h:157
int timed_wait(Mutex &mutex, const timespec &abs_time)
Definition: mutex.h:578
AsyncQueue(AsyncQueue &&rhs)
Definition: async_queue.h:547
A wrapper class for pthread mutexes.
Definition: mutex.h:117
void move_pop_dispatch(value_type &obj)
Definition: async_queue.h:944
void pop(value_type &obj)
Definition: async_queue.h:270
AsyncQueue & operator=(AsyncQueue &&rhs)
Definition: async_queue.h:504
Provides wrapper classes for pthread mutexes and condition variables, and scoped locking classes for ...
bool empty() const
Definition: async_queue.h:405
size_type size() const
Definition: async_queue.h:422
Definition: application.h:44
void push(const value_type &obj)
Definition: async_queue.h:676
Container::value_type value_type
Definition: async_queue.h:149
bool pop_timed_dispatch(value_type &obj, unsigned int millisec)
Definition: async_queue.h:1055
AsyncQueueDispatch(const AsyncQueueDispatch &rhs)
Definition: async_queue.h:1442
int signal() noexcept
Definition: mutex.h:472
bool empty() const
Definition: async_queue.h:1230
int broadcast() noexcept
Definition: mutex.h:483
void move_pop(value_type &obj)
Definition: async_queue.h:799
Container::value_type value_type
Definition: async_queue.h:632
void emplace(Args &&...args)
Definition: async_queue.h:244
AsyncQueueDispatch & operator=(const AsyncQueueDispatch &rhs)
Definition: async_queue.h:1313
void swap(AsyncQueue &other)
Definition: async_queue.h:439
Container container_type
Definition: async_queue.h:634
AsyncQueue & operator=(const AsyncQueue &rhs)
Definition: async_queue.h:470
Container container_type
Definition: async_queue.h:151
void swap(AsyncQueueDispatch &other)
Definition: async_queue.h:1269
#define CGU_GLIB_MEMORY_SLICES_FUNCS
Definition: cgu_config.h:84
Container::size_type size_type
Definition: async_queue.h:633
int wait(Mutex &mutex)
Definition: mutex.h:513