Commit 0da35b7f authored by Martin Karsten's avatar Martin Karsten
Browse files

- streamline EventScope creation

  -> simplify OsProcessor
parent 4dfdd705
......@@ -44,11 +44,10 @@ public:
// special constructor and start routine for bootstrapping event scope
Cluster(EventScope& es, _friend<EventScope>, size_t p = 1) : Cluster(es, _friend<Cluster>(), p) {}
void startPoller(_friend<EventScope>) { for (size_t p = 0; p < pollCount; p += 1) pollVec[p].start(); }
void startPolling(_friend<EventScope>) { for (size_t p = 0; p < pollCount; p += 1) pollVec[p].start(); }
EventScope& getEventScope() { return scope; }
ClusterPoller& getPoller(size_t hint) { return pollVec[hint % pollCount]; }
size_t getPollerCount() { return pollCount; }
void pause() {
ringLock.acquire();
......
......@@ -61,11 +61,18 @@ class EventScope {
void init() {
stats = new ConnectionStats(this);
struct rlimit rl;
SYSCALL(getrlimit(RLIMIT_NOFILE, &rl)); // get hard limit for file descriptor count
SYSCALL(getrlimit(RLIMIT_NOFILE, &rl)); // get hard limit for file descriptor count
fdCount = rl.rlim_max + MasterPoller::extraTimerFD;
fdSyncVector = new SyncFD[fdCount]; // create vector of R/W sync points
fdSyncVector = new SyncFD[fdCount]; // create vector of R/W sync points
masterPoller = new MasterPoller(*this, fdCount - 1, _friend<EventScope>()); // start master poller & timer handling
mainCluster->startPoller(_friend<EventScope>()); // start main cluster's poller
mainCluster->startPolling(_friend<EventScope>()); // start main cluster's poller
}
static void split(void* This) {
#if __linux__
SYSCALL(unshare(CLONE_FILES));
#endif
reinterpret_cast<EventScope*>(This)->init();
}
public:
......@@ -73,9 +80,8 @@ public:
EventScope(size_t p = 1, void* cd = nullptr) : diskCluster(nullptr), clientData(cd) {
mainCluster = new Cluster(*this, _friend<EventScope>(), p); // delayed master poller start
mainProcessor = new OsProcessor(*mainCluster, *this, _friend<EventScope>());
// OsProcessor calls split(), which calls init()
mainProcessor->waitUntilRunning(); // wait for new pthread running
mainProcessor = new OsProcessor(*mainCluster, split, this, _friend<EventScope>());
mainProcessor->waitUntilRunning(); // wait for new pthread to finish initialization
}
EventScope(_friend<_Bootstrapper> fb, size_t p = 1) : diskCluster(nullptr), clientData(nullptr) {
mainCluster = new Cluster(*this, _friend<EventScope>(), p); // delayed master poller start
......@@ -89,13 +95,6 @@ public:
delete[] fdSyncVector;
}
static void split(EventScope* This, _friend<OsProcessor>) {
#if __linux__
SYSCALL(unshare(CLONE_FILES));
#endif
This->init();
}
Cluster& addDiskCluster() {
RASSERT0(!diskCluster);
diskCluster = new Cluster;
......
......@@ -106,7 +106,7 @@ public:
Fibre(OsProcessor &sp, _friend<OsProcessor>)
: StackContext(sp), stackSize(0) { initDebug(); }
// explicit final notifcation for idle loop or main loop (bootstrap) on pthread stack
// explicit final notification for idle loop or main loop (bootstrap) on pthread stack
void endDirect(_friend<OsProcessor>) {
done.post();
}
......
......@@ -21,7 +21,8 @@
#include <limits.h> // PTHREAD_STACK_MIN
inline void OsProcessor::setupContext(Cluster& cl) {
inline void OsProcessor::setupContext() {
Cluster& cl = reinterpret_cast<Cluster&>(scheduler);
Context::currProc = this;
Context::currCluster = &cl;
Context::currScope = &cl.getEventScope();
......@@ -35,73 +36,50 @@ inline void OsProcessor::setupContext(Cluster& cl) {
#endif
}
void OsProcessor::setupFakeContext(StackContext* sc, EventScope* es, _friend<PollerThread>) {
Context::currStack = sc;
Context::currProc = nullptr;
Context::currCluster = nullptr;
Context::currScope = es;
void OsProcessor::idleLoopStartFibre(OsProcessor* This) {
This->idleLoop();
}
template<typename T>
inline void OsProcessor::idleLoopCreateFibre(void (*initFunc)(T*, _friend<OsProcessor>), T* arg) {
setupContext(reinterpret_cast<Cluster&>(scheduler));
// idle loop takes over pthread stack - create fibre without stack
Context::currStack = idleStack = new Fibre(*this, _friend<OsProcessor>());
if (initFunc) initFunc(arg, _friend<OsProcessor>());
if (initFibre) yieldDirect(*initFibre);
running.post();
idleLoop();
reinterpret_cast<Fibre*>(idleStack)->endDirect(_friend<OsProcessor>());
}
void OsProcessor::idleLoopStartFibre(OsProcessor* sp) {
sp->running.post();
sp->idleLoop();
}
void* OsProcessor::idleLoopStartPthread(void* sp) {
OsProcessor* This = reinterpret_cast<OsProcessor*>(sp);
This->idleLoopCreateFibre();
return nullptr;
}
void* OsProcessor::idleLoopStartEventScope(void* sp) {
OsProcessor* This = reinterpret_cast<OsProcessor*>(sp);
This->idleLoopCreateFibre(EventScope::split, reinterpret_cast<EventScope*>(This->idleStack));
return nullptr;
}
inline void OsProcessor::startPthreadHelper(funcptr1_t idleLoopStarter) {
inline void OsProcessor::idleLoopCreatePthread(funcvoid1_t initFunc, ptr_t arg) {
scheduler.addProcessor(*this); // potentially blocking caller fibre
if (initFunc) {
initFibre = new Fibre(*this);
initFibre->setup((ptr_t)initFunc, arg);
}
pthread_attr_t attr;
SYSCALL(pthread_attr_init(&attr));
SYSCALL(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED));
#if __linux__ // FreeBSD jemalloc segfaults when trying to use minimum stack
SYSCALL(pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN));
#endif
SYSCALL(pthread_create(&sysThread, &attr, idleLoopStarter, this));
SYSCALL(pthread_create(&sysThread, &attr, (funcptr1_t)idleLoopStartPthread, this));
SYSCALL(pthread_attr_destroy(&attr));
}
ptr_t OsProcessor::idleLoopStartPthread(OsProcessor* This) {
This->setupContext();
// idle loop takes over pthread stack - create fibre without stack
Context::currStack = This->idleStack = new Fibre(*This, _friend<OsProcessor>());
if (This->initFibre) This->yieldDirect(*This->initFibre);
This->idleLoop();
reinterpret_cast<Fibre*>(This->idleStack)->endDirect(_friend<OsProcessor>());
return nullptr;
}
OsProcessor::OsProcessor(funcvoid1_t initFunc, ptr_t arg) : OsProcessor(::CurrCluster(), initFunc, arg) {}
OsProcessor::OsProcessor(Cluster& cl, funcvoid1_t initFunc, ptr_t arg) : BaseProcessor(cl), initFibre(nullptr) {
RASSERT0(&::CurrEventScope() == &cl.getEventScope());
if (initFunc) {
initFibre = new Fibre(*this);
initFibre->setup((ptr_t)initFunc, (ptr_t)arg);
}
startPthreadHelper(idleLoopStartPthread); // create pthread running idleLoop
idleLoopCreatePthread(initFunc, arg); // create pthread running idleLoop
}
OsProcessor::OsProcessor(Cluster& cl, EventScope& scope, _friend<EventScope>) : BaseProcessor(cl), initFibre(nullptr) {
idleStack = (StackContext*)&scope; // HACK
startPthreadHelper(idleLoopStartEventScope); // create pthread running idleLoop
OsProcessor::OsProcessor(Cluster& cl, funcvoid1_t initFunc, ptr_t arg, _friend<EventScope>) : BaseProcessor(cl), initFibre(nullptr) {
idleLoopCreatePthread(initFunc, arg); // create pthread running idleLoop
}
OsProcessor::OsProcessor(Cluster& cl, _friend<_Bootstrapper>) : BaseProcessor(cl), initFibre(nullptr) {
sysThread = pthread_self();
setupContext(cl);
setupContext();
idleStack = new Fibre(*this);
idleStack->setup((ptr_t)idleLoopStartFibre, this);
// main fibre takes over pthread stack - create fibre without stack
......@@ -109,3 +87,9 @@ OsProcessor::OsProcessor(Cluster& cl, _friend<_Bootstrapper>) : BaseProcessor(cl
Context::currStack = mainFibre;
scheduler.addProcessor(*this); // first processor -> should not block, but need currStack set for ringLock
}
void OsProcessor::waitUntilRunning() {
RASSERT0(initFibre);
delete initFibre;
initFibre = nullptr;
}
......@@ -30,7 +30,6 @@ typedef Barrier<InternalLock> FibreBarrier;
class OsProcessor : public Context, public BaseProcessor {
pthread_t sysThread;
SyncPoint<InternalLock> running;
Fibre* initFibre;
Fibre* maintenanceFibre;
Benaphore<OsSemaphore> haltNotify; // benaphore better for spinning
......@@ -39,36 +38,28 @@ class OsProcessor : public Context, public BaseProcessor {
PollerFibre* pollFibre;
#endif
inline void setupContext(Cluster& fc);
template<typename T = void>
inline void idleLoopCreateFibre(void (*initFunc)(T*, _friend<OsProcessor>) = nullptr, T* arg = nullptr);
static void idleLoopStartFibre(OsProcessor*);
static void* idleLoopStartPthread(void*);
static void* idleLoopStartEventScope(void*);
inline void startPthreadHelper(funcptr1_t idleLoopStarter);
inline void setupContext();
static void idleLoopStartFibre(OsProcessor* This);
inline void idleLoopCreatePthread(funcvoid1_t initFunc = nullptr, ptr_t arg = nullptr);
static ptr_t idleLoopStartPthread(OsProcessor* This);
public:
// regular constructors: create pthread and use for idle loop
OsProcessor(funcvoid1_t initFunc = nullptr, ptr_t arg = nullptr);
OsProcessor(Cluster& cluster, funcvoid1_t initFunc = nullptr, ptr_t arg = nullptr);
// dedicated constructor for event scope: pthread executes EventScope::split before idle
OsProcessor(Cluster& cluster, EventScope& scope, _friend<EventScope>);
// dedicated constructor for event scope: pthread executes initFunc before idle
OsProcessor(Cluster& cluster, funcvoid1_t initFunc, ptr_t arg, _friend<EventScope>);
// dedicated constructor for bootstrap: pthread becomes mainFibre
OsProcessor(Cluster& cluster, _friend<_Bootstrapper>);
~OsProcessor() { RABORT("Cannot delete OsProcessor"); }
// dedicated support routine to set up dummy context for poller pthreads
static void setupFakeContext(StackContext* sc, EventScope* es, _friend<PollerThread>);
void waitUntilRunning();
#if TESTING_PROCESSOR_POLLER
BasePoller& getPoller() { RASSERT0(pollFibre); return *pollFibre; }
#endif
pthread_t getSysID() { return sysThread; }
void waitUntilRunning() { running.wait(); }
StackContext* suspend() {
#if TESTING_HALT_SPIN
......
......@@ -56,7 +56,6 @@ inline void BasePoller::notifyAll(int evcnt) {
template<typename T>
inline void PollerThread::pollLoop(T& This) {
OsProcessor::setupFakeContext((StackContext*)&This, &This.eventScope, _friend<PollerThread>());
while (!This.pollTerminate) {
This.prePoll(_friend<PollerThread>());
This.stats->blocks.count();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment