Commit 281653a5 authored by Martin Karsten's avatar Martin Karsten

- refactored libfibre PollerScheduler hierarchy (fixed bug w/o master poller)

parent baedff11
......@@ -57,7 +57,7 @@ class EventEngine {
// file operations are not considered blocking in terms of select/poll/epoll
// therefore, all file operations are executed on dedicated Scheduler/SP(s)
Scheduler ioScheduler;
FibreScheduler ioScheduler;
SystemProcessor ioSP;
int timerFD;
......@@ -72,7 +72,7 @@ public:
PollerThread masterPoller; // runs on ioScheduler, but with idleThreshold = 0
#endif
EventEngine() : fdSyncVector(0), ioSP(ioScheduler), timerTerminate(false) {
EventEngine() : fdSyncVector(0), ioScheduler(0), ioSP(ioScheduler), timerTerminate(false) {
struct rlimit rl;
SYSCALL(getrlimit(RLIMIT_NOFILE, &rl)); // get hard limit for file descriptor count
unsigned long fdcount = rl.rlim_max;
......
......@@ -77,7 +77,7 @@ public:
#endif
}
Fibre(FibreScheduler& sched, size_t sz = defaultStackSize, bool bg = true) : StackContext(sched, bg), stackSize(sz), stackPool(nullptr), done(SyncPoint<SystemLock>::Detached) {
Fibre(PollerScheduler& sched, size_t sz = defaultStackSize, bool bg = true) : StackContext(sched, bg), stackSize(sz), stackPool(nullptr), done(SyncPoint<SystemLock>::Detached) {
stackBottom = stackAlloc(stackSize);
StackContext::initStackPointer((vaddr)stackBottom + stackSize);
#if TESTING_ENABLE_DEBUGGING
......
......@@ -25,7 +25,7 @@
#endif
class Fibre;
class FibreScheduler;
class PollerScheduler;
class BasePoller {
protected:
......@@ -131,17 +131,17 @@ class Poller : public BasePoller {
inline void pollLoop();
static void pollLoopSetup(void*);
public:
Poller(FibreScheduler&);
Poller(PollerScheduler&);
~Poller() { if (!pollTerminate) stop(); }
void stop();
bool stopped() { return pollTerminate; }
};
#else
class Poller : public PollerThread {
FibreScheduler& sched;
PollerScheduler& sched;
static void* pollLoopSetup(void*);
public:
Poller(FibreScheduler& sched) : PollerThread(pollLoopSetup), sched(sched) {}
Poller(PollerScheduler& sched) : PollerThread(pollLoopSetup), sched(sched) {}
inline bool wait(_friend<PollerThread>);
};
#endif
......
......@@ -58,37 +58,52 @@ public:
}
};
#if TESTING_POLLER_FIBRES
class PollerThreadScheduler : public Scheduler {
public:
PollerThreadScheduler(size_t) {}
};
static inline SystemProcessor& CurrProcessor() {
SystemProcessor* proc = Context::self();
GENASSERT(proc);
return *proc;
}
#else
class FibreScheduler : public Scheduler {
volatile bool paused;
SystemSemaphore pauseWaitSem;
SystemSemaphore pauseContSem;
class PollerThreadScheduler : public Scheduler {
#if !TESTING_POLLER_FIBRES
size_t pollThreshold;
SystemCondition pollCond;
#endif
public:
PollerThreadScheduler(size_t t) : pollThreshold(t) {}
FibreScheduler(size_t t) : paused(false), pauseContSem(1)
#if !TESTING_POLLER_FIBRES
, pollThreshold(t)
#endif
{}
~PollerThreadScheduler() {
#if !TESTING_POLLER_IDLETIMEDWAIT
#if !TESTING_POLLER_FIBRES && !TESTING_POLLER_IDLETIMEDWAIT
~FibreScheduler() {
ScopedLock<SystemLock> al(idleLock);
pollThreshold = 0;
pollCond.signal();
#endif
}
#endif
#if !TESTING_POLLER_FIBRES
size_t addIdleProcessor(BaseProcessor& proc) {
if (paused) {
pauseWaitSem.V();
pauseContSem.P();
pauseContSem.V();
}
size_t c = Scheduler::addIdleProcessor(proc);
#if TESTING_POLLER_IDLEWAIT
if (c == pollThreshold) pollCond.signal();
#endif
return c;
}
#endif // !TESTING_POLLER_FIBRES
#if !TESTING_POLLER_FIBRES
bool waitForPoll() {
#if TESTING_POLLER_IDLEWAIT
idleLock.acquire();
......@@ -107,43 +122,7 @@ public:
#endif // TESTING_POLLER_IDLEWAIT
return false;
}
};
#endif // TESTING_POLLER_FIBRES
static inline FibreScheduler& CurrScheduler(); // forward declaration for pause() below
class FibreScheduler : public PollerThreadScheduler {
Poller poller;
volatile bool paused;
SystemSemaphore pauseWaitSem;
SystemSemaphore pauseContSem;
public:
FibreScheduler(size_t t = 1) : PollerThreadScheduler(t), poller(*this), paused(false), pauseContSem(1) {}
~FibreScheduler() {
#if TESTING_POLLER_FIBRES
GENASSERT(poller.stopped() || procCount() > 0);
#endif
}
Poller& getPoller() { return poller; }
void stopPoller() {
#if TESTING_POLLER_FIBRES
GENASSERT(!poller.stopped() && procCount() > 0);
#endif
poller.stop();
}
size_t addIdleProcessor(BaseProcessor& proc) {
if (paused) {
pauseWaitSem.V();
pauseContSem.P();
pauseContSem.V();
}
return PollerThreadScheduler::addIdleProcessor(proc);
}
#endif // !TESTING_POLLER_FIBRES
void resume() {
paused = false;
......@@ -160,19 +139,32 @@ public:
else break;
}
idleLock.release();
size_t start = (&CurrScheduler() == this) ? 1 : 0;
size_t start = (&CurrProcessor().getScheduler() == this) ? 1 : 0;
for (size_t i = start; i < procCount(); i += 1) pauseWaitSem.P();
}
};
static inline SystemProcessor& CurrProcessor() {
SystemProcessor* proc = Context::self();
GENASSERT(proc);
return *proc;
}
class PollerScheduler : public FibreScheduler {
Poller poller;
public:
PollerScheduler(size_t t = 1) : FibreScheduler(t), poller(*this) {}
~PollerScheduler() {
#if TESTING_POLLER_FIBRES
GENASSERT(poller.stopped() || procCount() > 0);
#endif
}
Poller& getPoller() { return poller; }
void stopPoller() {
#if TESTING_POLLER_FIBRES
GENASSERT(!poller.stopped() && procCount() > 0);
#endif
poller.stop();
}
};
static inline FibreScheduler& CurrScheduler() {
return reinterpret_cast<FibreScheduler&>(CurrProcessor().getScheduler());
static inline PollerScheduler& CurrScheduler() {
return reinterpret_cast<PollerScheduler&>(CurrProcessor().getScheduler());
}
static inline Poller& CurrPoller() {
......
......@@ -24,9 +24,9 @@ struct _cfibre_mutex_t : public fibre_mutex_t {};
struct _cfibre_cond_t : public fibre_cond_t {};
struct _cfibre_rwlock_t : public fibre_rwlock_t {};
struct _cfibre_barrier_t : public fibre_barrier_t {};
struct _cfibre_sched_t : public FibreScheduler {};
struct _cfibre_sched_t : public PollerScheduler {};
struct _cfibre_sproc_t : public SystemProcessor {
_cfibre_sproc_t(FibreScheduler& fs, funcvoid1_t func, ptr_t arg) : SystemProcessor(fs, func, arg) {}
_cfibre_sproc_t(PollerScheduler& fs, funcvoid1_t func, ptr_t arg) : SystemProcessor(fs, func, arg) {}
};
struct _cfibre_attr_t : public fibre_attr_t {};
......@@ -73,7 +73,7 @@ extern "C" int cfibre_sproc_prepare_sched(cfibre_sproc_t* sproc, cfibre_sched_t
}
extern "C" int cfibre_sproc_create(cfibre_sproc_t* sproc, void (*func)(void *), void *arg) {
*sproc = new _cfibre_sproc_t(*(FibreScheduler*)(*sproc), func, arg);
*sproc = new _cfibre_sproc_t(*(PollerScheduler*)(*sproc), func, arg);
return 0;
}
......@@ -124,7 +124,7 @@ extern "C" int cfibre_attr_setscheduler(cfibre_attr_t *attr, cfibre_sched_t sche
}
extern "C" int cfibre_attr_getscheduler(const cfibre_attr_t *attr, cfibre_sched_t *scheduler) {
return fibre_attr_getscheduler(*attr, (FibreScheduler**)scheduler);
return fibre_attr_getscheduler(*attr, (PollerScheduler**)scheduler);
}
extern "C" int cfibre_create(cfibre_t *thread, const cfibre_attr_t *attr, void *(*start_routine) (void *), void *arg) {
......
......@@ -48,8 +48,8 @@
-----------------------------------------------------------------------------*/
static const int numaccept = 2;
FibreScheduler* sched1 = nullptr;
FibreScheduler* sched2 = nullptr;
PollerScheduler* sched1 = nullptr;
PollerScheduler* sched2 = nullptr;
static bool server = false;
static int numconn = numaccept;
......@@ -189,7 +189,7 @@ void servmain(sockaddr_in& addr) {
Fibre a1(CurrScheduler());
a1.run(servaccept, (void*)servFD);
if (numaccept == 2) {
sched2 = new FibreScheduler;
sched2 = new PollerScheduler;
SystemProcessor sp1(*sched2);
SystemProcessor sp2(*sched2);
Fibre a2(*sched2);
......
......@@ -45,7 +45,7 @@ struct fibre_attr_t {
size_t stackSize;
bool detached;
bool background;
FibreScheduler* scheduler;
PollerScheduler* scheduler;
void init() {
stackSize = defaultStackSize;
detached = false;
......@@ -104,12 +104,12 @@ static inline int fibre_attr_getbackground(const fibre_attr_t *attr, int *backgr
return 0;
}
static inline int fibre_attr_setscheduler(fibre_attr_t *attr, FibreScheduler* scheduler) {
static inline int fibre_attr_setscheduler(fibre_attr_t *attr, PollerScheduler* scheduler) {
attr->scheduler = scheduler;
return 0;
}
static inline int fibre_attr_getscheduler(const fibre_attr_t *attr, FibreScheduler* *scheduler) {
static inline int fibre_attr_getscheduler(const fibre_attr_t *attr, PollerScheduler* *scheduler) {
*scheduler = attr->scheduler;
return 0;
}
......
......@@ -41,7 +41,7 @@ GlobalStackList* _globalStackList = nullptr; // StackContext.h
// make these pointers global static to enable gdb access
static Fibre* _lfMainFibre = nullptr;
static SystemProcessor* _lfMainProcessor = nullptr;
static FibreScheduler* _lfMainScheduler = nullptr;
static PollerScheduler* _lfMainScheduler = nullptr;
#if TESTING_ENABLE_STATISTICS
std::list<StatsObject*>* StatsObject::lst = nullptr;
......@@ -115,7 +115,7 @@ void* PollerThread::pollLoopSetup(void* This) {
}
#if TESTING_POLLER_FIBRES
Poller::Poller(FibreScheduler& sched) {
Poller::Poller(PollerScheduler& sched) {
pollFibre = new Fibre(sched, defaultStackSize, true);
pollFibre->run(pollLoopSetup, this);
}
......@@ -204,7 +204,7 @@ _Bootstrapper::_Bootstrapper() {
// start event demultiplexing
_lfEventEngine = new EventEngine;
// create default scheduler -> includes poller
_lfMainScheduler = new FibreScheduler;
_lfMainScheduler = new PollerScheduler;
// create main SP
_lfMainProcessor = new SystemProcessor(*_lfMainScheduler, _friend<_Bootstrapper>());
// create main fibre and main SP's idle fibre using dedicated interface
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment