diff --git a/src/generic/IntrusiveContainers.h b/src/generic/IntrusiveContainers.h index def2c273568673c8f56dff8db64dbceb97345769..5e15b95d98f86b4a4b84b189f8de37963af0ccc1 100644 --- a/src/generic/IntrusiveContainers.h +++ b/src/generic/IntrusiveContainers.h @@ -362,20 +362,18 @@ private: public: IntrusiveQueueNemesis(): head(nullptr), tail(nullptr) {} - bool push(T& first, T& last) { + void push(T& first, T& last) { last.link[NUM].next = nullptr; // make sure previous write to 'next' and following write to 'head' are not reordered with this update T* prev = __atomic_exchange_n(&tail, &last, __ATOMIC_SEQ_CST); // swing tail to last of new element(s) if (prev) { prev->link[NUM].next = &first; - return false; } else { head = &first; - return true; } } - bool push(T& elem) { return push(elem, elem); } + void push(T& elem) { push(elem, elem); } T* peek() { return head; } @@ -400,30 +398,29 @@ public: return element; } - bool transferAllFrom(IntrusiveQueue<T,NUM,CNT,LT>& eq) { - if (eq.empty()) return false; + void transferAllFrom(IntrusiveQueue<T,NUM,CNT,LT>& eq) { + if (eq.empty()) return; T* first = eq.front(); T* last = eq.popAll(); - return push(*first, *last); + push(*first, *last); } }; -// NOTE WELL: This simple design using Link* for 'head' and 'tail' and downcasting -// only works, if Link is the first class that T inherits from. -// // http://doc.cat-v.org/inferno/concurrent_gc/concurrent_gc.pdf // https://www.cs.rice.edu/~johnmc/papers/cqueues-mellor-crummey-TR229-1987.pdf // http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue // https://github.com/samanbarghi/MPSCQ/blob/master/src/MPSCQueue.h +// +// NOTE WELL: Downcasting 'stub' only works, if Link is the first class that T inherits from. template<typename T, size_t NUM, size_t CNT, typename LT, bool Blocking> class IntrusiveQueueStub { static_assert(NUM < CNT, "NUM >= CNT"); public: typedef LT Link; private: - Link stub; - Link* head; - Link* tail; + Link stub; + T* head; + T* tail; // peek/pop operate in chunks of elements and re-append stub after each chunk // after re-appending stub, tail points to stub, if no further insertions -> empty! @@ -446,8 +443,9 @@ private: } public: - IntrusiveQueueStub() : head(&stub), tail(&stub) { - if (Blocking) tail = (Link*)(uintptr_t(tail) | 1); // mark queue empty + IntrusiveQueueStub() { + head = tail = stub.link[NUM].next = stub.link[NUM].prev = static_cast<T*>(&stub); + if (Blocking) tail = (T*)(uintptr_t(tail) | 1); // mark queue empty } bool push(Link& first, Link& last) { @@ -458,7 +456,7 @@ public: empty = uintptr_t(prev) & 1; // check empty marking prev = (Link*)(uintptr_t(prev) & ~uintptr_t(1)); // clear marking } - prev->link[NUM].next = &static_cast<T&>(first); // append segments to previous tail + prev->link[NUM].next = static_cast<T*>(&first); // append segments to previous tail return empty; } @@ -466,13 +464,13 @@ public: T* peek() { if (!checkStub()) return nullptr; - return static_cast<T*>(head); + return head; } template<bool Peeked = false> T* pop() { if (!Peeked && !checkStub()) return nullptr; - T* retval = static_cast<T*>(head); // head will be returned + T* retval = head; // head will be returned while (!(T* volatile)head->link[NUM].next) Pause(); // producer in push() head = head->link[NUM].next; // remove head #if TESTING_ENABLE_ASSERTIONS @@ -483,8 +481,8 @@ public: bool transferAllFrom(IntrusiveQueue<T,NUM,CNT,LT>& eq) { if (eq.empty()) return false; - Link* first = eq.front(); - Link* last = eq.popAll(); + T* first = eq.front(); + T* last = eq.popAll(); return push(*first, *last); } }; diff --git a/src/generic/SpinLocks.h b/src/generic/SpinLocks.h index ee6a3912827551a18a171993b3058b9668977f2c..9f29eea97f30182361256007a9ab53343d393e8b 100644 --- a/src/generic/SpinLocks.h +++ b/src/generic/SpinLocks.h @@ -33,7 +33,7 @@ protected: volatile bool locked; public: BinaryLock() : locked(false) {} - bool test() { return locked; } + bool test() const { return locked; } bool tryAcquire() { if (locked) return false; return !__atomic_test_and_set(&locked, __ATOMIC_SEQ_CST); @@ -61,7 +61,7 @@ class BinaryOwnerLock { size_t counter; public: BinaryOwnerLock() : owner(noOwner), counter(0) {} - bool test() { return owner != noOwner; } + bool test() const { return owner != noOwner; } size_t tryAcquire(T caller) { if (owner != caller) { if (owner != noOwner) return 0; @@ -99,7 +99,7 @@ class TicketLock { size_t ticket; public: TicketLock() : serving(0), ticket(0) {} - bool test() { return serving != ticket; } + bool test() const { return serving != ticket; } bool tryAcquire() { if (serving != ticket) return false; size_t tryticket = serving; @@ -128,7 +128,7 @@ private: Node* tail; public: MCSLock() : tail(nullptr) {} - bool test() { return tail != nullptr; } + bool test() const { return tail != nullptr; } void acquire(Node& n) { n.next = nullptr; Node* prev = __atomic_exchange_n(&tail, &n, __ATOMIC_SEQ_CST); diff --git a/src/generic/stats.cc b/src/generic/stats.cc index 6c84a88458e2d93303cacef6fcac41f1a55018ca..f72b06d3acd1eec24d2a10ba2f51886e5226b14d 100644 --- a/src/generic/stats.cc +++ b/src/generic/stats.cc @@ -48,7 +48,7 @@ bool TimerStats::print(ostream& os) { bool PollerStats::print(ostream& os) { if (events == 0) return false; StatsObject::print(os); - os << events << blocks; + os << events << polls << blocks; return true; } diff --git a/src/generic/stats.h b/src/generic/stats.h index 0f41aa1733017c0fc92984786f7bc6918b67e92c..94275bfa506ffbc7151dfb5fa8772eb505eb332f 100644 --- a/src/generic/stats.h +++ b/src/generic/stats.h @@ -136,6 +136,7 @@ struct TimerStats : public StatsObject { struct PollerStats : public StatsObject { Average events; + Counter polls; Counter blocks; PollerStats(void* o, const char* n = "Poller") : StatsObject(o, n) {} bool print(ostream& os); diff --git a/src/kernel/KernelLocks.h b/src/kernel/KernelLocks.h index abae780724f0bc52cc7725fc43ea8e88e6726f2f..a08ceffa99e7521b0631290e7dc60a3983588448 100644 --- a/src/kernel/KernelLocks.h +++ b/src/kernel/KernelLocks.h @@ -24,7 +24,7 @@ template<typename SpinLock> class KernelSpinLock : protected SpinLock { public: - bool test() { return SpinLock::test(); } + bool test() const { return SpinLock::test(); } bool tryAcquire() { LocalProcessor::lock(); if (SpinLock::tryAcquire()) return true; diff --git a/src/libfibre/Poller.cc b/src/libfibre/Poller.cc index 0e64a08cae6cc21c3c94193c0cf8513d15d88c10..4110f850ec842a1d4392e886c4f25e05597eb145 100644 --- a/src/libfibre/Poller.cc +++ b/src/libfibre/Poller.cc @@ -27,7 +27,6 @@ inline void BasePoller::pollLoop(T& This, bool pollerFibre) { #endif SystemProcessor::setupFakeContext((StackContext*)&This, _friend<BasePoller>()); while (!This.pollTerminate) { - This.stats->blocks.count(); int evcnt = This.blockingPoll(); for (;;) { // drain all events with non-blocking epoll_wait This.stats->events.add(evcnt); @@ -54,6 +53,7 @@ inline void BasePoller::pollLoop(T& This, bool pollerFibre) { p.first->bulkResume(p.second, _friend<BasePoller>()); } if (evcnt < maxPoll) break; + This.stats->polls.count(); #if __FreeBSD__ static const timespec ts = Time::zero(); evcnt = kevent(This.pollFD, nullptr, 0, This.events, maxPoll, &ts); @@ -68,6 +68,8 @@ inline void BasePoller::pollLoop(T& This, bool pollerFibre) { inline int PollerThread::blockingPoll() { int evcnt; for (;;) { + stats->blocks.count(); + stats->polls.count(); #if __FreeBSD__ evcnt = kevent(pollFD, nullptr, 0, events, maxPoll, nullptr); // blocking #else // __linux__ below @@ -122,6 +124,7 @@ void Poller::stop() { inline int Poller::blockingPoll() { int evcnt; for (;;) { + stats->polls.count(); #if __FreeBSD__ static const timespec ts = Time::zero(); evcnt = kevent(pollFD, nullptr, 0, events, maxPoll, &ts); @@ -130,6 +133,7 @@ inline int Poller::blockingPoll() { #endif if (evcnt > 0) break; if (evcnt < 0) { GENASSERT1(errno == EINTR, errno); } // gracefully handle EINTR + stats->blocks.count(); _lfEventEngine->blockPollFD(pollFD); } if (paused) pauseSem.P(); diff --git a/src/runtime/BaseProcessor.h b/src/runtime/BaseProcessor.h index 6509f2447d465a8226e8b4ca270e6466581b92c9..5b222eaa4a516b29abe52bb90efd33610239b16c 100644 --- a/src/runtime/BaseProcessor.h +++ b/src/runtime/BaseProcessor.h @@ -26,13 +26,9 @@ class ReadyQueue { typedef SystemLock ReadyLock; ReadyLock readyLock; #if TESTING_CONCURRENT_READYQUEUE -#if TESTING_BLOCKING_CONCURRENT - BlockingStackMPSC queue; + StackMPSC<> queue[numPriority]; #else - StackMPSC queue[numPriority]; -#endif -#else /* TESTING_CONCURRENT_READYQUEUE */ - StackQueue queue[numPriority]; + StackQueue<> queue[numPriority]; #endif volatile size_t count; @@ -44,18 +40,6 @@ public: template<bool AffinityCheck = false> StackContext* dequeue() { #if TESTING_CONCURRENT_READYQUEUE -#if TESTING_BLOCKING_CONCURRENT - StackContext* s = AffinityCheck ? queue.peek() : queue.pop(); - if fastpath(s) { - if (AffinityCheck) { - if (s->getAffinity()) return nullptr; - else queue.pop<true>(); - } - // just using '1' here seems to confuse clang without optimization - __atomic_sub_fetch(&count, size_t(1), __ATOMIC_RELAXED); - return s; - } -#else while (count > 0) { // spin until push completes for (size_t p = 0; p <= maxPriority; p += 1) { StackContext* s = AffinityCheck ? queue[p].peek() : queue[p].pop(); @@ -70,7 +54,6 @@ public: } } } -#endif /* TESTING_BLOCKING_CONCURRENT */ #else ScopedLock<ReadyLock> sl(readyLock); for (size_t p = 0; p <= maxPriority; p += 1) { @@ -94,13 +77,8 @@ public: bool singleEnqueue(StackContext& s) { #if TESTING_CONCURRENT_READYQUEUE -#if TESTING_BLOCKING_CONCURRENT - __atomic_add_fetch(&count, 1, __ATOMIC_RELAXED); - return queue.push(s); -#else queue[s.getPriority()].push(s); return __atomic_add_fetch(&count, 1, __ATOMIC_RELAXED) == 1; -#endif /* TESTING_BLOCKING_CONCURRENT */ #else ScopedLock<ReadyLock> sl(readyLock); queue[s.getPriority()].push(s); @@ -111,16 +89,11 @@ public: bool bulkEnqueue(ResumeQueue& rq) { #if TESTING_CONCURRENT_READYQUEUE -#if TESTING_BLOCKING_CONCURRENT - __atomic_add_fetch(&count, rq.count, __ATOMIC_RELAXED); - return queue.transferAllFrom(rq.queue); -#else bool retcode = (__atomic_fetch_add(&count, rq.count, __ATOMIC_RELAXED) == 0); for (size_t p = 0; p <= maxPriority; p += 1) { queue[p].transferAllFrom(rq.queue[p]); } return retcode; -#endif /* TESTING_BLOCKING_CONCURRENT */ #else ScopedLock<ReadyLock> sl(readyLock); for (size_t p = 0; p <= maxPriority; p += 1) { diff --git a/src/runtime/BlockingSync.h b/src/runtime/BlockingSync.h index 1ba9d62241e70e63f5aaf6f68cf93136d827f9e8..0100b23b146762e02f09124e3d67756abf5bca0d 100644 --- a/src/runtime/BlockingSync.h +++ b/src/runtime/BlockingSync.h @@ -111,7 +111,7 @@ protected: static_assert(withQueue == false, "wrong 'suspend' without queue"); GENASSERT(lock.test()); } - void prepare(StackContext& cs, StackList& queue) { + void prepare(StackContext& cs, StackList<>& queue) { static_assert(withQueue == true, "wrong 'suspend' with queue"); GENASSERT(lock.test()); queue.push_back(cs); @@ -123,7 +123,7 @@ public: unlockAndSuspend(cs, resumeRace, lock); return !timedOut; } - bool suspend(StackList& queue, StackContext& cs = *CurrStack()) { + bool suspend(StackList<>& queue, StackContext& cs = *CurrStack()) { prepare(cs, queue); unlockAndSuspend(cs, true, lock); return !timedOut; @@ -132,7 +132,7 @@ public: timedOut = true; if (!ot) return; ScopedLock<Lock> al(lock); - StackList::remove(*ot); + StackList<>::remove(*ot); } }; @@ -143,7 +143,7 @@ class TimeoutSynchronizationInfo : public TimeoutInfo, public SynchronizationInf using BaseSI::timedOut; public: TimeoutSynchronizationInfo(TimerQueue& tq, Lock& l) : TimeoutInfo(tq), BaseSI(l) {} - bool suspendAbsolute(StackList& queue, const Time& timeout, const Time& now, StackContext& cs = *CurrStack()) { + bool suspendAbsolute(StackList<>& queue, const Time& timeout, const Time& now, StackContext& cs = *CurrStack()) { BaseSI::prepare(cs, queue); TimeoutInfo::prepareAbsolute(cs, timeout, now); unlockAndSuspend(cs, true, lock, tQueue.lock); @@ -191,7 +191,7 @@ inline void TimerQueueGeneric<Lock>::checkExpiry(const Time& now) { template<typename Lock> class BlockingQueue { - StackList queue; + StackList<> queue; BlockingQueue(const BlockingQueue&) = delete; // no copy BlockingQueue& operator=(const BlockingQueue&) = delete; // no assignment @@ -206,11 +206,11 @@ public: StackContext* s = queue.front(); while (s != queue.edge()) { ResumeInfo* ri = s->raceResume(); - StackContext* ns = StackList::next(*s); + StackContext* ns = StackList<>::next(*s); if (ri) { ri->cancelTimeout(); ri->cancelSync(); - StackList::remove(*s); + StackList<>::remove(*s); } s = ns; } @@ -241,10 +241,10 @@ public: } StackContext* unblock(bool resumeNow = true) { // not concurrency-safe; better hold lock - for (StackContext* s = queue.front(); s != queue.edge(); s = StackList::next(*s)) { + for (StackContext* s = queue.front(); s != queue.edge(); s = StackList<>::next(*s)) { ResumeInfo* ri = s->raceResume(); if (ri) { - StackList::remove(*s); + StackList<>::remove(*s); ri->cancelTimeout(); if (resumeNow) s->resume<true>(); Runtime::debugB( "Stack ", FmtHex(&s), " resumed from ", FmtHex(&queue)); @@ -398,7 +398,7 @@ public: FifoMutex() : owner(nullptr) {} // baton passing requires serialization at destruction ~FifoMutex() { ScopedLock<Lock> sl(lock); } - bool test() { return owner != nullptr; } + bool test() const { return owner != nullptr; } bool tryAcquire() { return internalAcquire(false); } bool acquire(const Time& timeout = Time::zero()) { return internalAcquire(true, timeout); } @@ -432,7 +432,7 @@ protected: public: BargingMutex() : owner(nullptr) {} - bool test() { return owner != nullptr; } + bool test() const { return owner != nullptr; } bool tryAcquire() { return internalAcquire(false); } bool acquire(const Time& timeout = Time::zero()) { return internalAcquire(true, timeout); } @@ -478,7 +478,7 @@ protected: public: SpinMutex() : owner(nullptr), sem(1) {} - bool test() { return __atomic_load_n(&owner, __ATOMIC_RELAXED) != nullptr; } + bool test() const { return owner != nullptr; } bool tryAcquire() { return internalAcquire(false); } bool acquire(const Time& timeout = Time::zero()) { return internalAcquire(true, timeout); } diff --git a/src/runtime/StackContext.h b/src/runtime/StackContext.h index a364b2501359d14a5b3eb4e43fb58110ca536274..9f11c090c19a6fc57a471f09cc97d7ff321e9f88 100644 --- a/src/runtime/StackContext.h +++ b/src/runtime/StackContext.h @@ -38,24 +38,30 @@ class EventEngine; #if TESTING_ENABLE_DEBUGGING #define STACKLINKCOUNT 2 -typedef IntrusiveList<StackContext,1,STACKLINKCOUNT,DoubleLink<StackContext,STACKLINKCOUNT>> GlobalStackList; -extern SystemLock* _globalStackLock; -extern GlobalStackList* _globalStackList; #else #define STACKLINKCOUNT 1 #endif -typedef IntrusiveList<StackContext,0,STACKLINKCOUNT,DoubleLink<StackContext,STACKLINKCOUNT>> StackList; -typedef IntrusiveQueue<StackContext,0,STACKLINKCOUNT,DoubleLink<StackContext,STACKLINKCOUNT>> StackQueue; +template <size_t NUM = 0> class StackList : +public IntrusiveList<StackContext,NUM,STACKLINKCOUNT,DoubleLink<StackContext,STACKLINKCOUNT>> {}; + +template <size_t NUM = 0> class StackQueue : +public IntrusiveQueue<StackContext,NUM,STACKLINKCOUNT,DoubleLink<StackContext,STACKLINKCOUNT>> {}; + +template <size_t NUM = 0> class StackMPSC : #if TESTING_NEMESIS_READYQUEUE -typedef IntrusiveQueueNemesis<StackContext,0,STACKLINKCOUNT,DoubleLink<StackContext,STACKLINKCOUNT>> StackMPSC; -typedef IntrusiveQueueNemesis<StackContext,0,STACKLINKCOUNT,DoubleLink<StackContext,STACKLINKCOUNT>> BlockingStackMPSC; +public IntrusiveQueueNemesis<StackContext,NUM,STACKLINKCOUNT,DoubleLink<StackContext,STACKLINKCOUNT>> {}; #else -typedef IntrusiveQueueStub<StackContext,0,STACKLINKCOUNT,DoubleLink<StackContext,STACKLINKCOUNT>> StackMPSC; -typedef IntrusiveQueueStub<StackContext,0,STACKLINKCOUNT,DoubleLink<StackContext,STACKLINKCOUNT>,true> BlockingStackMPSC; +public IntrusiveQueueStub<StackContext,NUM,STACKLINKCOUNT,DoubleLink<StackContext,STACKLINKCOUNT>> {}; #endif -class StackContext : public StackList::Link { +#if TESTING_ENABLE_DEBUGGING +typedef StackList<1> GlobalStackList; +extern SystemLock* _globalStackLock; +extern GlobalStackList* _globalStackList; +#endif + +class StackContext : public StackList<>::Link { vaddr stackPointer; // holds stack pointer while stack inactive VirtualProcessor* resumeProcessor; // next resumption on this processor size_t priority; // scheduling priority @@ -174,13 +180,8 @@ public: struct ResumeQueue { size_t count; -#if TESTING_CONCURRENT_READYQUEUE && TESTING_BLOCKING_CONCURRENT - StackQueue queue; - void push(StackContext& sc) { queue.push(sc); count += 1; } -#else - StackQueue queue[numPriority]; + StackQueue<> queue[numPriority]; void push(StackContext& sc) { queue[sc.getPriority()].push(sc); count += 1; } -#endif ResumeQueue() : count(0) {} }; diff --git a/src/testoptions.h b/src/testoptions.h index 77debd74bb8bcaacdbfc678fb4b6a59c39d880ed..e1f98ba15131b7e03f0779f5b941a7a41a08e2a1 100644 --- a/src/testoptions.h +++ b/src/testoptions.h @@ -17,7 +17,6 @@ #define TESTING_CONCURRENT_READYQUEUE 1 // vs. traditional locking #define TESTING_NEMESIS_READYQUEUE 1 // vs. stub-based MPSC -//#define TESTING_BLOCKING_CONCURRENT 1 // using MPSC blocking semantics (no prio levels) //#define TESTING_IDLE_SPIN 128 // spin before idle/halt threshold //#define TESTING_MUTEX_FIFO 1 // use fifo/baton mutex //#define TESTING_MUTEX_BARGING 1 // use blocking/barging mutex