From c56e9d4efa88c70fda55127ca54b9bff72c3633e Mon Sep 17 00:00:00 2001 From: Martin Karsten <mkarsten@uwaterloo.ca> Date: Wed, 20 Jun 2018 18:20:09 -0400 Subject: [PATCH] - merge FibreCluster and PollerCluster --- src/libfibre/EventEngine.h | 4 +- src/libfibre/Fibre.h | 4 +- src/libfibre/FibreCluster.h | 110 ++++++++++++++++++++++++++++ src/libfibre/Poller.cc | 2 +- src/libfibre/Poller.h | 57 +++++++++++++-- src/libfibre/SystemProcessor.cc | 1 + src/libfibre/SystemProcessor.h | 122 ++------------------------------ src/libfibre/cfibre.cc | 12 ++-- src/libfibre/echotest.cpp | 6 +- src/libfibre/fibre.h | 12 ++-- src/libfibre/include/u++.h | 2 +- src/libfibre/lfcore.cc | 5 +- src/libfibre/webserver.cpp | 4 +- 13 files changed, 194 insertions(+), 147 deletions(-) create mode 100644 src/libfibre/FibreCluster.h diff --git a/src/libfibre/EventEngine.h b/src/libfibre/EventEngine.h index 7a274e8..6614922 100644 --- a/src/libfibre/EventEngine.h +++ b/src/libfibre/EventEngine.h @@ -49,8 +49,8 @@ class EventEngine { // file operations are not considered blocking in terms of select/poll/epoll // therefore, all file operations are executed on dedicated Cluster/SP(s) - FibreCluster diskCluster; - SystemProcessor diskProcessor; + FibreClusterGeneric<NoPoller> diskCluster; + SystemProcessor diskProcessor; MasterPoller masterPoller; // runs without cluster diff --git a/src/libfibre/Fibre.h b/src/libfibre/Fibre.h index abfa33c..3484825 100644 --- a/src/libfibre/Fibre.h +++ b/src/libfibre/Fibre.h @@ -18,7 +18,7 @@ #define _Fibre_h_ 1 #include "runtime/BlockingSync.h" -#include "libfibre/SystemProcessor.h" +#include "libfibre/FibreCluster.h" #include <sys/mman.h> @@ -79,7 +79,7 @@ class Fibre : public StackContext { public: // general constructor - Fibre(PollerCluster& cluster = CurrCluster(), size_t sz = defaultStackSize, bool bg = false) + Fibre(FibreCluster& cluster = CurrCluster(), size_t sz = defaultStackSize, bool bg = false) : StackContext(cluster, bg), stackSize(stackAlloc(sz)) { initDebug(); } // helper constructor to start fibre right away diff --git a/src/libfibre/FibreCluster.h b/src/libfibre/FibreCluster.h new file mode 100644 index 0000000..d3558b8 --- /dev/null +++ b/src/libfibre/FibreCluster.h @@ -0,0 +1,110 @@ +/****************************************************************************** + Copyright © 2017 Martin Karsten + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +******************************************************************************/ +#ifndef _FibreCluster_h_ +#define _FibreCluster_h_ 1 + +#include "runtime/Cluster.h" +#include "libfibre/Poller.h" +#include "libfibre/SystemProcessor.h" + +#include <semaphore.h> + +template<typename P = Poller> +class FibreClusterGeneric : public Cluster { + volatile bool paused; + SystemSemaphore<true> pauseWaitSem; + SystemSemaphore<true> pauseContSem; + + P poller; + +public: + FibreClusterGeneric(size_t t = 1) : paused(false), pauseContSem(1), poller(*this, t) {} + + ~FibreClusterGeneric() { +#if TESTING_POLLER_FIBRES + GENASSERT(poller.stopped() || procCount() > 0); +#elif TESTING_POLLER_IDLEWAIT + poller.finish(); +#endif + } + + P& getPoller() { return poller; } + + void stopPoller() { +#if TESTING_POLLER_FIBRES + GENASSERT(!poller.stopped() && procCount() > 0); +#endif + poller.stop(); + } + + void pause() { + poller.pause(); + pauseContSem.P(); + paused = true; + idleLock.acquire(); + BaseProcessor* proc = idleList.front(); + while (proc != idleList.edge()) { + proc->wakeUp(); + proc = ProcessorList::next(*proc); + } + idleLock.release(); + size_t start = (&CurrProcessor().getCluster() == this) ? 1 : 0; + for (size_t i = start; i < procCount(); i += 1) pauseWaitSem.P(); + } + + void resume() { + paused = false; + pauseContSem.V(); + poller.resume(); + } + + size_t setProcessorIdle(BaseProcessor& proc, bool terminate) { + if (paused) { + pauseWaitSem.V(); + pauseContSem.P(); + pauseContSem.V(); + } + size_t c = Cluster::setProcessorIdle(proc, terminate); +#if !TESTING_POLLER_FIBRES && TESTING_POLLER_IDLEWAIT + poller.signal(c); +#endif + return c; + } + +#if !TESTING_POLLER_FIBRES + bool waitForPoll() { +#if TESTING_POLLER_IDLEWAIT + ScopedLock<SystemLock> al(idleLock); + return poller.wait(idleLock, idleCount); +#else + return false; +#endif + } +#endif +}; + +typedef FibreClusterGeneric<Poller> FibreCluster; + +static inline FibreCluster& CurrCluster() { + return reinterpret_cast<FibreCluster&>(CurrProcessor().getCluster()); +} + +static inline Poller& CurrPoller() { + return CurrCluster().getPoller(); +} + +#endif /* _FibreCluster_h_ */ diff --git a/src/libfibre/Poller.cc b/src/libfibre/Poller.cc index 80a0855..556c5b9 100644 --- a/src/libfibre/Poller.cc +++ b/src/libfibre/Poller.cc @@ -100,7 +100,7 @@ void Poller::pollLoopSetup(void* This) { pollLoop(*reinterpret_cast<Poller*>(This), true); } -Poller::Poller(PollerCluster& cluster) : BasePoller("PollerFibre") { +Poller::Poller(FibreCluster& cluster) : BasePoller("PollerFibre") { pollFibre = new Fibre(cluster, defaultStackSize, true); pollFibre->setPriority(lowPriority); pollFibre->run(pollLoopSetup, this); diff --git a/src/libfibre/Poller.h b/src/libfibre/Poller.h index e26552f..13b70a1 100644 --- a/src/libfibre/Poller.h +++ b/src/libfibre/Poller.h @@ -26,7 +26,7 @@ #endif class Fibre; -class PollerCluster; +template<typename> class FibreClusterGeneric; class BasePoller { protected: @@ -181,7 +181,7 @@ class Poller : public BasePoller { FifoSemaphore<BinaryLock<>,true> pauseSem; static void pollLoopSetup(void*); public: - Poller(PollerCluster&); + Poller(FibreClusterGeneric<Poller>&, size_t t); ~Poller() { if (!pollTerminate) stop(); } bool stopped() { return pollTerminate; } void stop(); @@ -192,15 +192,64 @@ public: #else class Poller : public PollerThread { - PollerCluster& cluster; + FibreClusterGeneric<Poller>& cluster; + +#if TESTING_POLLER_IDLEWAIT + size_t pollThreshold; + SystemCondition pollCond; +#endif + static void* pollLoopSetup(void*); + public: - Poller(PollerCluster& c) : PollerThread("PollerThread"), cluster(c) { + Poller(FibreClusterGeneric<Poller>& c, size_t t) : PollerThread("PollerThread"), cluster(c) { +#if TESTING_POLLER_IDLEWAIT + pollThreshold = t; +#endif PollerThread::start(pollLoopSetup); } + inline int blockingPoll(); + +#if TESTING_POLLER_IDLEWAIT + bool wait(SystemLock& lock, size_t count) { + if (count >= pollThreshold) return false; +#if TESTING_POLLER_IDLETIMEDWAIT + Time t; + SYSCALL(clock_gettime(CLOCK_REALTIME, &t)); + pollCond.wait(lock, t + Time(0,10000000)); // 10 ms = 10,000,000 ns; +#else + pollCond.wait(lock); +#endif // TESTING_POLLER_IDLETIMEDWAIT + return true; + } + + void signal(size_t c) { + if (c == pollThreshold) pollCond.signal(); + } + + void finish() { +#if !TESTING_POLLER_IDLETIMEDWAIT + pollThreshold = 0; + pollCond.signal() +#endif // !TESTING_POLLER_IDLETIMEDWAIT + } +#endif // TESTING_POLLER_IDLEWAIT + }; #endif +class NoPoller { +public: + NoPoller(FibreClusterGeneric<NoPoller>&, size_t) {} + bool stopped() { return true; } + void stop() {} + void pause() {} + void resume() {} + bool wait(SystemLock&, size_t) { return false; } + void signal(size_t) {} + void finish() {} +}; + #endif /* _Poller_h_ */ diff --git a/src/libfibre/SystemProcessor.cc b/src/libfibre/SystemProcessor.cc index 2ba970e..3f6f031 100644 --- a/src/libfibre/SystemProcessor.cc +++ b/src/libfibre/SystemProcessor.cc @@ -16,6 +16,7 @@ ******************************************************************************/ #include "runtime/RuntimeImpl.h" #include "libfibre/SystemProcessor.h" +#include "libfibre/FibreCluster.h" #include <limits.h> // PTHREAD_STACK_MIN diff --git a/src/libfibre/SystemProcessor.h b/src/libfibre/SystemProcessor.h index 536199c..437bdce 100644 --- a/src/libfibre/SystemProcessor.h +++ b/src/libfibre/SystemProcessor.h @@ -17,15 +17,15 @@ #ifndef _SystemProcessor_h_ #define _SystemProcessor_h_ 1 -#include "runtime/Cluster.h" -#include "libfibre/Poller.h" - +#include <libfibre/lfbasics.h> #include <semaphore.h> +class BasePoller; +class Cluster; class Fibre; class SystemProcessor : public Context, public BaseProcessor { - pthread_t sysThread; + pthread_t sysThread; SystemSemaphore<true> idleSem; bool terminateAck; @@ -69,118 +69,4 @@ static inline SystemProcessor& CurrProcessor() { return *proc; } -class FibreCluster : public Cluster { - volatile bool paused; - SystemSemaphore<true> pauseWaitSem; - SystemSemaphore<true> pauseContSem; - -#if !TESTING_POLLER_FIBRES - size_t pollThreshold; - SystemCondition pollCond; -#endif - -public: - FibreCluster(size_t t) : paused(false), pauseContSem(1) -#if !TESTING_POLLER_FIBRES - , pollThreshold(t) -#endif - {} - -#if !TESTING_POLLER_FIBRES && !TESTING_POLLER_IDLETIMEDWAIT - ~FibreCluster() { - ScopedLock<SystemLock> al(idleLock); - pollThreshold = 0; - pollCond.signal(); - } -#endif - - size_t setProcessorIdle(BaseProcessor& proc, bool terminate) { - if (paused) { - pauseWaitSem.V(); - pauseContSem.P(); - pauseContSem.V(); - } - size_t c = Cluster::setProcessorIdle(proc, terminate); -#if !TESTING_POLLER_FIBRES && TESTING_POLLER_IDLEWAIT - if (c == pollThreshold) pollCond.signal(); -#endif - return c; - } - -#if !TESTING_POLLER_FIBRES - bool waitForPoll() { -#if TESTING_POLLER_IDLEWAIT - ScopedLock<SystemLock> al(idleLock); - if (idleCount < pollThreshold) { -#if TESTING_POLLER_IDLETIMEDWAIT - Time t; - SYSCALL(clock_gettime(CLOCK_REALTIME, &t)); - pollCond.wait(idleLock, t + Time(0,10000000)); // 10 ms = 10,000,000 ns; -#else - pollCond.wait(idleLock); -#endif // TESTING_POLLER_IDLETIMEDWAIT - return true; - } -#endif // TESTING_POLLER_IDLEWAIT - return false; - } -#endif // !TESTING_POLLER_FIBRES - - void resume() { - paused = false; - pauseContSem.V(); - } - - void pause() { - pauseContSem.P(); - paused = true; - idleLock.acquire(); - BaseProcessor* proc = idleList.front(); - while (proc != idleList.edge()) { - proc->wakeUp(); - proc = ProcessorList::next(*proc); - } - idleLock.release(); - size_t start = (&CurrProcessor().getCluster() == this) ? 1 : 0; - for (size_t i = start; i < procCount(); i += 1) pauseWaitSem.P(); - } -}; - -class PollerCluster : public FibreCluster { - Poller poller; -public: - PollerCluster(size_t t = 1) : FibreCluster(t), poller(*this) {} - ~PollerCluster() { -#if TESTING_POLLER_FIBRES - GENASSERT(poller.stopped() || procCount() > 0); -#endif - } - Poller& getPoller() { return poller; } - - void stopPoller() { -#if TESTING_POLLER_FIBRES - GENASSERT(!poller.stopped() && procCount() > 0); -#endif - poller.stop(); - } - - void pause() { - poller.pause(); - FibreCluster::pause(); - } - - void resume() { - FibreCluster::resume(); - poller.resume(); - } -}; - -static inline PollerCluster& CurrCluster() { - return reinterpret_cast<PollerCluster&>(CurrProcessor().getCluster()); -} - -static inline Poller& CurrPoller() { - return CurrCluster().getPoller(); -} - #endif /* _SystemProcessor_h_ */ diff --git a/src/libfibre/cfibre.cc b/src/libfibre/cfibre.cc index 1e031c4..737c9c4 100644 --- a/src/libfibre/cfibre.cc +++ b/src/libfibre/cfibre.cc @@ -24,10 +24,10 @@ struct _cfibre_mutex_t : public fibre_mutex_t {}; struct _cfibre_cond_t : public fibre_cond_t {}; struct _cfibre_rwlock_t : public fibre_rwlock_t {}; struct _cfibre_barrier_t : public fibre_barrier_t {}; -struct _cfibre_cluster_t : public PollerCluster {}; +struct _cfibre_cluster_t : public FibreCluster {}; struct _cfibre_sproc_t : public SystemProcessor { - _cfibre_sproc_t(PollerCluster& pc, funcvoid1_t func, ptr_t arg) : SystemProcessor(pc, func, arg) {} - _cfibre_sproc_t(PollerCluster& pc) : SystemProcessor(pc) {} + _cfibre_sproc_t(FibreCluster& c, funcvoid1_t func, ptr_t arg) : SystemProcessor(c, func, arg) {} + _cfibre_sproc_t(FibreCluster& c) : SystemProcessor(c) {} }; struct _cfibre_attr_t : public fibre_attr_t {}; @@ -82,12 +82,12 @@ extern "C" int cfibre_sproc_run(cfibre_sproc_t* sproc, void (*func)(void *), voi errno = EINVAL; return -1; } - *sproc = new _cfibre_sproc_t(*(PollerCluster*)(*sproc), func, arg); + *sproc = new _cfibre_sproc_t(*(FibreCluster*)(*sproc), func, arg); return 0; } extern "C" int cfibre_sproc_create(cfibre_sproc_t* sproc) { - *sproc = new _cfibre_sproc_t(*(PollerCluster*)(*sproc)); + *sproc = new _cfibre_sproc_t(*(FibreCluster*)(*sproc)); return 0; } @@ -138,7 +138,7 @@ extern "C" int cfibre_attr_setcluster(cfibre_attr_t *attr, cfibre_cluster_t clus } extern "C" int cfibre_attr_getcluster(const cfibre_attr_t *attr, cfibre_cluster_t *cluster) { - return fibre_attr_getcluster(*attr, (PollerCluster**)cluster); + return fibre_attr_getcluster(*attr, (FibreCluster**)cluster); } extern "C" int cfibre_create(cfibre_t *thread, const cfibre_attr_t *attr, void *(*start_routine) (void *), void *arg) { diff --git a/src/libfibre/echotest.cpp b/src/libfibre/echotest.cpp index e1d233a..5ad3abb 100644 --- a/src/libfibre/echotest.cpp +++ b/src/libfibre/echotest.cpp @@ -48,8 +48,8 @@ -----------------------------------------------------------------------------*/ static const int numaccept = 2; -PollerCluster* cluster1 = nullptr; -PollerCluster* cluster2 = nullptr; +FibreCluster* cluster1 = nullptr; +FibreCluster* cluster2 = nullptr; static bool server = false; static int numconn = numaccept; @@ -187,7 +187,7 @@ void servmain(sockaddr_in& addr) { Fibre a1(CurrCluster(), defaultStackSize, true); a1.run(servaccept, (void*)servFD); if (numaccept == 2) { - cluster2 = new PollerCluster; + cluster2 = new FibreCluster; SystemProcessor sp1(*cluster2); SystemProcessor sp2(*cluster2); Fibre a2(*cluster2, defaultStackSize, true); diff --git a/src/libfibre/fibre.h b/src/libfibre/fibre.h index 548c353..caa23d0 100644 --- a/src/libfibre/fibre.h +++ b/src/libfibre/fibre.h @@ -36,7 +36,7 @@ struct fibre_attr_t { size_t stackSize; bool detached; bool background; - PollerCluster* cluster; + FibreCluster* cluster; void init() { stackSize = defaultStackSize; detached = false; @@ -95,12 +95,12 @@ static inline int fibre_attr_getbackground(const fibre_attr_t *attr, int *backgr return 0; } -static inline int fibre_attr_setcluster(fibre_attr_t *attr, PollerCluster* cluster) { +static inline int fibre_attr_setcluster(fibre_attr_t *attr, FibreCluster* cluster) { attr->cluster = cluster; return 0; } -static inline int fibre_attr_getcluster(const fibre_attr_t *attr, PollerCluster* *cluster) { +static inline int fibre_attr_getcluster(const fibre_attr_t *attr, FibreCluster* *cluster) { *cluster = attr->cluster; return 0; } @@ -132,9 +132,9 @@ static inline int fibre_yield(void) { return 0; } -static inline int fibre_migrate(PollerCluster* pc) { - GENASSERT(pc); - Fibre::migrateSelf(*pc); +static inline int fibre_migrate(FibreCluster* c) { + GENASSERT(c); + Fibre::migrateSelf(*c); return 0; } diff --git a/src/libfibre/include/u++.h b/src/libfibre/include/u++.h index ddc7dbc..c5857b1 100644 --- a/src/libfibre/include/u++.h +++ b/src/libfibre/include/u++.h @@ -11,7 +11,7 @@ typedef uProcessor SystemProcessor; #define CurrProcessor uThisProcessor -typedef uCluster PollerCluster; +typedef uCluster FibreCluuster; #define CurrCluster uThisCluster _Task Fibre { diff --git a/src/libfibre/lfcore.cc b/src/libfibre/lfcore.cc index c10c0a3..7d1f2dd 100644 --- a/src/libfibre/lfcore.cc +++ b/src/libfibre/lfcore.cc @@ -16,6 +16,7 @@ ******************************************************************************/ #include "libfibre/Fibre.h" #include "libfibre/EventEngine.h" +#include "libfibre/FibreCluster.h" #include <execinfo.h> // see _lfAbort #include <cxxabi.h> // see _lfAbort @@ -33,7 +34,7 @@ GlobalStackList* _globalStackList = nullptr; // StackContext.h // make these pointers global static to enable gdb access static Fibre* _lfMainFibre = nullptr; static SystemProcessor* _lfMainProcessor = nullptr; -static PollerCluster* _lfMainCluster = nullptr; +static FibreCluster* _lfMainCluster = nullptr; #if TESTING_ENABLE_STATISTICS std::list<StatsObject*>* StatsObject::lst = nullptr; @@ -68,7 +69,7 @@ _Bootstrapper::_Bootstrapper() { _lfEventEngine = new EventEngine; _lfEventEngine->start(); // global pointer must be set at this point // create default cluster -> includes poller - _lfMainCluster = new PollerCluster; + _lfMainCluster = new FibreCluster; // create main SP _lfMainProcessor = new SystemProcessor(*_lfMainCluster, _friend<_Bootstrapper>()); // create main fibre and main SP's idle fibre using dedicated interface diff --git a/src/libfibre/webserver.cpp b/src/libfibre/webserver.cpp index 6bdd5ea..0b68542 100644 --- a/src/libfibre/webserver.cpp +++ b/src/libfibre/webserver.cpp @@ -357,10 +357,10 @@ int main(int argc, char** argv) { // set additional clusters and processors unsigned int clusterCount = (threadCount - 1) / clusterSize + 1; - PollerCluster** cluster = new PollerCluster*[clusterCount]; + FibreCluster** cluster = new FibreCluster*[clusterCount]; cluster[0] = &CurrCluster(); for (unsigned int c = 1; c < clusterCount; c += 1) { - cluster[c] = new PollerCluster; + cluster[c] = new FibreCluster; } SystemProcessor** sproc = new SystemProcessor*[threadCount]; sproc[0] = &CurrProcessor(); -- GitLab