 
Commits (4)
......@@ -71,7 +71,7 @@ mthreadtest: threadtest.cpp include/mordor.h
gwebserver: webserver.go
# go get -u github.com/valyala/fasthttp
# go get -u github.com/valyala/fasthttp/reuseport
# go build -gcflags='-l=4' -o $@ $<
go build -o $@ $<
......
......@@ -165,7 +165,7 @@ void servaccept() {
}
static void servaccept2() {
Context::CurrEventScope().registerFD<true,false,true,true>(servFD);
Context::CurrEventScope().registerFD<true,false,false,true>(servFD);
new OsProcessor[2];
servaccept();
}
......
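
Per the registerFD<Input,Output,Lazy,Cluster> signature shown further down in this diff, the change above flips the Lazy argument for the listening socket from true to false, i.e. the server fd is now handed to the poller up front rather than on the first EAGAIN. A minimal standalone sketch (not part of this diff; port, flags, and epoll usage are illustrative assumptions) of what eager registration of a listening socket looks like against raw epoll:

#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstdio>

int main() {
  // non-blocking listening socket, registered with the poller before any accept
  int servFD = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = htons(8800);                  // hypothetical port
  bind(servFD, (sockaddr*)&addr, sizeof(addr));
  listen(servFD, 128);

  int ep = epoll_create1(0);
  epoll_event ev{};
  ev.events = EPOLLIN;                          // interested in incoming connections
  ev.data.fd = servFD;
  epoll_ctl(ep, EPOLL_CTL_ADD, servFD, &ev);    // "eager" registration

  // a fibre-style accept loop would now park on a semaphore that the poller
  // posts whenever EPOLLIN fires; here we just probe once and exit
  epoll_event out;
  int n = epoll_wait(ep, &out, 1, 0);
  std::printf("ready events: %d\n", n);
  close(ep); close(servFD);
  return 0;
}
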
......@@ -42,12 +42,11 @@ class EventScope {
struct SyncFD {
SyncSem RD;
SyncSem WR;
#if TESTING_LAZY_FD_REGISTRATION && __linux__
#if TESTING_LAZY_FD_REGISTRATION
FibreMutex regLock;
#endif
BasePoller* poller;
size_t status;
SyncFD() : poller(nullptr), status(0) {}
SyncFD() : status(0) {}
#endif
} *fdSyncVector;
int fdCount;
......@@ -133,14 +132,21 @@ public:
template<bool Input, bool Output, bool Lazy, bool Cluster>
void registerFD(int fd) {
static_assert(Input || Output, "must set Input or Output in registerFD()");
const size_t target = (Input ? BasePoller::Input : 0) | (Output ? BasePoller::Output : 0);
#if TESTING_LAZY_FD_REGISTRATION
if (Lazy) return;
#endif
#if TESTING_LAZY_FD_REGISTRATION
RASSERT0(fd >= 0 && fd < fdCount);
SyncFD& fdsync = fdSyncVector[fd];
const size_t target = (Input ? BasePoller::Input : 0) | (Output ? BasePoller::Output : 0);
if ((fdsync.status & target) == target) return; // outside of lock: faster, but double regs possible...
ScopedLock<FibreMutex> sl(fdsync.regLock);
bool change = fdsync.status; // already registered for polling?
fdsync.status |= target;
#endif
#if TESTING_PROCESSOR_POLLER
BasePoller& cp = Cluster ? Context::CurrCluster().getPoller(fd) : Context::CurrProcessor().getPoller();
#else
......@@ -148,40 +154,30 @@ public:
#endif
#if TESTING_LAZY_FD_REGISTRATION
#if __FreeBSD__ // can atomically check flags and set poller
size_t prev = __atomic_fetch_or(&fdsync.status, target, __ATOMIC_RELAXED);
if ((prev & target) == target) return;
const bool change = false;
BasePoller* pp = nullptr;
__atomic_compare_exchange_n(&fdsync.poller, &pp, &cp, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
#else // Linux: serialize concurrent registrations - EPOLL_CTL_ADD vs. _MOD
if ((fdsync.status & target) == target) return; // outside of lock: faster, but double regs possible...
ScopedLock<FibreMutex> sl(fdsync.regLock);
fdsync.status |= target;
bool change = fdsync.poller; // already registered for polling?
if (!fdsync.poller) fdsync.poller = &cp; // else set poller
#endif
#else // TESTING_LAZY_FD_REGISTRATION
RASSERT0(!fdsync.poller);
fdsync.status |= target;
const bool change = false;
fdsync.poller = &cp;
cp.setupFD(fd, fdsync.status, change); // add or modify poll settings
#else
cp.setupFD(fd, target, false);
#endif
fdsync.poller->setupFD(fd, fdsync.status, change); // add or modify poll settings
}
template<bool RemoveFromPollSet = false>
void deregisterFD(int fd) {
RASSERT0(fd >= 0 && fd < fdCount);
SyncFD& fdsync = fdSyncVector[fd];
#if TESTING_LAZY_FD_REGISTRATION && __linux__
ScopedLock<FibreMutex> sl(fdsync.regLock);
#endif
// if (fdsync.poller) fdsync.poller->resetFD(fd);
fdsync.poller = nullptr;
fdsync.status = 0;
RASSERT0(fdsync.RD.empty());
RASSERT0(fdsync.WR.empty());
#if TESTING_LAZY_FD_REGISTRATION
ScopedLock<FibreMutex> sl(fdsync.regLock);
fdsync.status = 0;
#endif
if (RemoveFromPollSet) { // only called from lfConnect
#if TESTING_PROCESSOR_POLLER
BasePoller& cp = Context::CurrProcessor().getPoller();
#else
BasePoller& cp = Context::CurrCluster().getPoller(fd);
#endif
cp.resetFD(fd);
}
}
void registerPollFD(int fd) {
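
The rewritten registerFD above collapses the old FreeBSD/Linux split (atomic fetch-or vs. mutex-serialized EPOLL_CTL_ADD/_MOD) into a single lock-protected path: a racy fast-path check of the status bits, then a per-fd mutex under which the first registration adds the fd and later ones modify it. A standalone model of that double-checked pattern against raw epoll (class, names, and the ADD-vs-MOD mapping of setupFD are assumptions for illustration, not libfibre's BasePoller API):

#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <mutex>
#include <cstddef>

struct LazyRegistry {
  enum : size_t { Input = EPOLLIN, Output = EPOLLOUT };
  struct Entry { std::mutex regLock; size_t status = 0; };

  int ep;                            // one epoll instance stands in for BasePoller
  Entry* entries;                    // indexed by fd, like fdSyncVector

  explicit LazyRegistry(int maxfd) : ep(epoll_create1(0)), entries(new Entry[maxfd]) {}

  void registerFD(int fd, size_t target) {
    Entry& e = entries[fd];
    // fast path outside the lock: may race and fall through twice, which is
    // harmless because the locked section re-reads e.status
    if ((e.status & target) == target) return;
    std::lock_guard<std::mutex> sl(e.regLock);
    bool change = e.status;          // nonzero => fd already in the poll set
    e.status |= target;
    epoll_event ev{};
    ev.events = e.status;            // direction bits double as epoll event bits here
    ev.data.fd = fd;
    epoll_ctl(ep, change ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev);
  }
};

int main() {
  LazyRegistry reg(1024);
  int efd = eventfd(0, EFD_NONBLOCK);
  reg.registerFD(efd, LazyRegistry::Input);    // first call: EPOLL_CTL_ADD
  reg.registerFD(efd, LazyRegistry::Input);    // repeat: returns on the fast path
  reg.registerFD(efd, LazyRegistry::Output);   // new direction: EPOLL_CTL_MOD
  return 0;
}
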
......@@ -243,13 +239,13 @@ public:
}
template<bool Input>
static inline bool NBtest() {
static inline bool TestEAGAIN() {
int serrno = _SysErrno();
Context::CurrEventScope().stats->resets.count((int)(serrno == ECONNRESET));
#if __FreeBSD__
// workaround - suspect: https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=129169 - or similar?
return serrno == EAGAIN || (Input == false && serrno == ENOTCONN);
#else // __linux__
// if (serrno == ECONNRESET) Context::CurrEventScope().stats->resets.count();
return serrno == EAGAIN;
#endif
}
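
NBtest() becomes TestEAGAIN() and the ECONNRESET statistics counter moves from a commented-out line into unconditional code. The check itself classifies the errno of a failed non-blocking call as "retry after readiness" versus a hard error; a hedged standalone restatement (no stats, EWOULDBLOCK folded in for portability; the ENOTCONN case mirrors the FreeBSD workaround noted above):

#include <cerrno>
#include <sys/types.h>
#include <sys/socket.h>

template<bool Input>
static bool retryLater(int err) {
#if defined(__FreeBSD__)
  // writes on a not-yet-connected socket can surface ENOTCONN instead of
  // EAGAIN (see the bug report referenced in the diff)
  return err == EAGAIN || err == EWOULDBLOCK || (!Input && err == ENOTCONN);
#else
  return err == EAGAIN || err == EWOULDBLOCK;
#endif
}

// usage: attempt a non-blocking read and only park the caller on a transient error
ssize_t tryRead(int fd, void* buf, size_t len) {
  ssize_t ret = recv(fd, buf, len, 0);
  if (ret >= 0) return ret;
  if (retryLater<true>(errno)) return -2;   // caller should block on the fd's RD semaphore
  return -1;                                // hard error: propagate
}
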
......@@ -257,19 +253,21 @@ public:
template<bool Input, bool Yield, typename T, class... Args>
T syncIO( T (*iofunc)(int, Args...), int fd, Args... a) {
RASSERT0(fd >= 0 && fd < fdCount);
T ret;
if (Yield) Fibre::yield();
ret = iofunc(fd, a...);
if (ret >= 0 || !NBtest<Input>()) return ret;
T ret = iofunc(fd, a...);
if (ret >= 0 || !TestEAGAIN<Input>()) return ret;
#if TESTING_LAZY_FD_REGISTRATION
registerFD<Input,!Input,false,false>(fd);
#endif
Fibre::yield();
SyncSem& sem = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
for (;;) {
sem.P();
ret = iofunc(fd, a...);
if (ret >= 0 || !NBtest<Input>()) return ret;
if (ret >= 0 || !TestEAGAIN<Input>()) break;
}
sem.V();
return ret;
}
};
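
syncIO keeps its optimistic shape: call the non-blocking operation once, and only register the fd (lazily) and park on the per-fd semaphore when the call reports EAGAIN; the new version also re-posts the semaphore (sem.V()) once the retry loop ends. A plain-threaded model of that try-first/wait/retry shape, with poll(2) standing in for the fibre semaphore (illustrative only, not the libfibre API):

#include <poll.h>
#include <cerrno>
#include <sys/socket.h>
#include <unistd.h>

template<bool Input, typename T, class... Args>
T syncIOModel(T (*iofunc)(int, Args...), int fd, Args... a) {
  T ret = iofunc(fd, a...);                        // optimistic first attempt
  if (ret >= 0 || errno != EAGAIN) return ret;
  // in libfibre this is where the fd is registered lazily and the fibre
  // parks on the RD/WR semaphore; poll() is the plain-threaded stand-in
  pollfd pfd{fd, short(Input ? POLLIN : POLLOUT), 0};
  for (;;) {
    poll(&pfd, 1, -1);                             // wait until the fd is ready
    ret = iofunc(fd, a...);
    if (ret >= 0 || errno != EAGAIN) return ret;   // success or hard error
  }
}

// example instantiation with a recv-shaped wrapper
static ssize_t recvWrap(int fd, void* buf, size_t len, int flags) {
  return recv(fd, buf, len, flags);
}
// e.g.: char buf[64]; syncIOModel<true>(recvWrap, sockfd, (void*)buf, sizeof(buf), 0);
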
......@@ -339,11 +337,14 @@ inline int lfTryAccept(int fd, sockaddr *addr, socklen_t *addrlen, int flags = 0
inline int lfConnect(int fd, const sockaddr *addr, socklen_t addrlen) {
int ret = connect(fd, addr, addrlen);
if (ret >= 0) {
Context::CurrEventScope().registerFD<true,true,true,false>(fd); // register lazily
Context::CurrEventScope().stats->cliconn.count();
Context::CurrEventScope().registerFD<true,true,true,false>(fd);
} else if (_SysErrno() == EINPROGRESS) {
Context::CurrEventScope().registerFD<true,true,false,false>(fd);
Context::CurrEventScope().block<false>(fd);
Context::CurrEventScope().registerFD<true,true,false,false>(fd); // register immediately
Context::CurrEventScope().block<false>(fd); // wait for connect to complete
#if TESTING_LAZY_FD_REGISTRATION
Context::CurrEventScope().deregisterFD<true>(fd); // revert to lazy registration
#endif
socklen_t sz = sizeof(ret);
SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &ret, &sz));
RASSERT(ret == 0, ret);
......
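
lfConnect now registers the fd eagerly while a connect is in progress, blocks until it completes, and (under lazy registration) deregisters it again so the data path can go back to registering lazily on first use. The underlying non-blocking-connect protocol, restated as a standalone function with poll(2) in place of block<false>(fd) (illustrative sketch, not libfibre code):

#include <sys/socket.h>
#include <poll.h>
#include <cerrno>

// assumes fd was created non-blocking (e.g. SOCK_NONBLOCK)
int connectNB(int fd, const sockaddr* addr, socklen_t len) {
  int ret = connect(fd, addr, len);
  if (ret == 0) return 0;                     // completed immediately (e.g. loopback)
  if (errno != EINPROGRESS) return -1;        // hard failure
  pollfd pfd{fd, POLLOUT, 0};
  poll(&pfd, 1, -1);                          // writable => the connect attempt finished
  int err = 0; socklen_t sz = sizeof(err);
  getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &sz);   // fetch the real result
  if (err != 0) { errno = err; return -1; }
  return 0;
}
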
......@@ -21,29 +21,31 @@
#include <limits.h> // PTHREAD_STACK_MIN
namespace Context {
static thread_local StackContext* volatile currStack = nullptr;
static thread_local BaseProcessor* volatile currProc = nullptr;
static thread_local Cluster* volatile currCluster = nullptr;
static thread_local EventScope* volatile currScope = nullptr;
StackContext* Context::CurrStack() { RASSERT0(currStack); return currStack; }
BaseProcessor& Context::CurrProcessor() { RASSERT0(currProc); return *currProc; }
Cluster& Context::CurrCluster() { RASSERT0(currCluster); return *currCluster; }
EventScope& Context::CurrEventScope() { RASSERT0(currScope); return *currScope; }
StackContext* CurrStack() { RASSERT0(currStack); return currStack; }
BaseProcessor& CurrProcessor() { RASSERT0(currProc); return *currProc; }
Cluster& CurrCluster() { RASSERT0(currCluster); return *currCluster; }
EventScope& CurrEventScope() { RASSERT0(currScope); return *currScope; }
void Context::setCurrStack(StackContext& s, _friend<StackContext>) { currStack = &s; }
void setCurrStack(StackContext& s, _friend<StackContext>) { currStack = &s; }
}
inline void OsProcessor::setupContext() {
Cluster& cl = reinterpret_cast<Cluster&>(scheduler);
currProc = this;
currCluster = &cl;
currScope = &cl.getEventScope();
Context::currProc = this;
Context::currCluster = &cl;
Context::currScope = &cl.getEventScope();
handoverStack = nullptr;
maintenanceFibre = new Fibre(*this);
maintenanceFibre->setPriority(TopPriority);
maintenanceFibre->run(Cluster::maintenance, &cl);
#if TESTING_PROCESSOR_POLLER
pollFibre = new PollerFibre(*currScope, *this, false);
pollFibre = new PollerFibre(*Context::currScope, *this, false);
pollFibre->start();
#endif
}
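
The accessor definitions lose their redundant Context:: qualification (they already live inside namespace Context), and setupContext gains explicit qualification where it writes the thread-locals directly. A minimal standalone sketch of this per-pthread "current context" pattern (names are illustrative, not the real classes):

#include <cassert>

struct Scope {};
struct Processor;

namespace Context {
  static thread_local Processor* volatile currProc = nullptr;
  static thread_local Scope*     volatile currScope = nullptr;
  Processor& CurrProcessor() { assert(currProc); return *currProc; }
  Scope&     CurrEventScope() { assert(currScope); return *currScope; }
}

struct Processor {
  void setupContext(Scope& s) {     // runs on the pthread this processor owns
    Context::currProc = this;       // explicit qualification, as in the diff
    Context::currScope = &s;
  }
};

int main() {
  Processor p; Scope s;
  p.setupContext(s);
  assert(&Context::CurrProcessor() == &p);
  assert(&Context::CurrEventScope() == &s);
  return 0;
}
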
......@@ -71,7 +73,7 @@ inline void OsProcessor::idleLoopCreatePthread(funcvoid1_t initFunc, ptr_t arg)
ptr_t OsProcessor::idleLoopStartPthread(OsProcessor* This) {
This->setupContext();
// idle loop takes over pthread stack - create fibre without stack
currStack = This->idleStack = new Fibre(*This, _friend<OsProcessor>());
Context::currStack = This->idleStack = new Fibre(*This, _friend<OsProcessor>());
if (This->initFibre) This->yieldDirect(*This->initFibre);
This->idleLoop();
reinterpret_cast<Fibre*>(This->idleStack)->endDirect(_friend<OsProcessor>());
......@@ -98,13 +100,13 @@ OsProcessor::OsProcessor(Cluster& cl, _friend<_Bootstrapper>) : BaseProcessor(cl
idleStack = new Fibre(*this);
idleStack->setup((ptr_t)idleLoopStartFibre, this);
// main fibre takes over pthread stack - create fibre without stack
currStack = new Fibre(*this, _friend<OsProcessor>());
Context::currStack = new Fibre(*this, _friend<OsProcessor>());
scheduler.addProcessor(*this); // first processor -> should not block, but need currStack set for ringLock
}
void OsProcessor::setupFakeContext(StackContext* sc, EventScope* es, _friend<BaseThreadPoller>) {
currStack = sc;
currProc = nullptr;
currCluster = nullptr;
currScope = es;
Context::currStack = sc;
Context::currProc = nullptr;
Context::currCluster = nullptr;
Context::currScope = es;
}
......@@ -76,8 +76,7 @@ public:
class BaseSuspender {
protected:
void prepareSuspend(StackContext& cs) {
cs.prepareSuspend(_friend<BaseSuspender>());
void prepareSuspend() {
RuntimeDisablePreemption();
}
void doSuspend(StackContext& cs) {
......@@ -91,7 +90,7 @@ public:
StackContext& stack;
TimeoutInfo(StackContext& stack = *Context::CurrStack()) : stack(stack) {}
void suspendRelative(const Time& timeout) {
prepareSuspend(stack);
prepareSuspend();
prepareRelative(timeout);
doSuspend(stack);
}
......@@ -117,13 +116,13 @@ protected:
public:
BlockingInfo(Lock& l) : lock(l) {}
void suspend(StackContext& stack = *Context::CurrStack()) {
prepareSuspend(stack);
prepareSuspend();
lock.release();
doSuspend(stack);
}
void suspend(BlockedStackList& queue, StackContext& stack = *Context::CurrStack()) {
prepareSuspend(stack);
setupResumeRace(stack);
prepareSuspend();
queue.push_back(stack);
lock.release();
doSuspend(stack);
......@@ -137,8 +136,8 @@ class TimeoutBlockingInfo : public BlockingInfo<Lock>, public TimeoutInfo {
public:
TimeoutBlockingInfo(Lock& l, StackContext& stack = *Context::CurrStack()) : BaseBI(l), TimeoutInfo(stack), timedOut(false) {}
bool suspendAbsolute(BlockedStackList& queue, const Time& timeout, const Time& now) {
prepareSuspend(stack);
BaseBI::setupResumeRace(stack);
prepareSuspend();
queue.push_back(stack);
BaseTimer::prepareAbsolute(timeout, now);
BaseBI::lock.release();
......
......@@ -129,6 +129,10 @@ private:
public:
MCSLock() : tail(nullptr) {}
bool test() const { return tail != nullptr; }
bool tryAcquire(Node& n) {
n.next = nullptr;
return ((tail == nullptr) && _CAS(&tail, (Node*)nullptr, &n,__ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
}
void acquire(Node& n) {
n.next = nullptr;
Node* prev = __atomic_exchange_n(&tail, &n, __ATOMIC_SEQ_CST);
......
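
The new tryAcquire gives MCSLock a non-blocking fast path: it only succeeds when the queue is empty and a single CAS installs the caller's node as the tail. A standalone sketch of the full MCS queue lock with that fast path, using GCC/Clang __atomic builtins as in the diff (_CAS in the original is assumed to wrap __atomic_compare_exchange_n):

class MCSLockSketch {
public:
  struct Node { Node* volatile next; volatile bool wait; };
private:
  Node* tail = nullptr;
public:
  bool tryAcquire(Node& n) {
    n.next = nullptr;
    Node* expected = nullptr;
    return (tail == nullptr) && __atomic_compare_exchange_n(
        &tail, &expected, &n, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
  }
  void acquire(Node& n) {
    n.next = nullptr;
    Node* prev = __atomic_exchange_n(&tail, &n, __ATOMIC_SEQ_CST);
    if (!prev) return;                  // queue was empty: lock acquired
    n.wait = true;
    __atomic_store_n(&prev->next, &n, __ATOMIC_SEQ_CST);
    while (__atomic_load_n(&n.wait, __ATOMIC_SEQ_CST)) {}    // spin on own node
  }
  void release(Node& n) {
    Node* succ = __atomic_load_n(&n.next, __ATOMIC_SEQ_CST);
    if (!succ) {                        // no known successor: try to reset tail
      Node* expected = &n;
      if (__atomic_compare_exchange_n(&tail, &expected, (Node*)nullptr, false,
                                      __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return;
      while (!(succ = __atomic_load_n(&n.next, __ATOMIC_SEQ_CST))) {}  // wait for linkage
    }
    __atomic_store_n(&succ->wait, false, __ATOMIC_SEQ_CST);            // hand over
  }
};

Callers that fail tryAcquire can spin or back off before paying the cost of queueing with acquire().
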
......@@ -19,8 +19,7 @@
#include "runtime-glue/RuntimeStack.h"
StackContext::StackContext(BaseProcessor& proc, bool aff)
: stackPointer(0), processor(&proc), priority(DefPriority), affinity(aff),
suspendState(Running), resumeInfo(nullptr) {
: stackPointer(0), processor(&proc), priority(DefPriority), affinity(aff), runState(1), resumeInfo(nullptr) {
#if TESTING_SHARED_READYQUEUE
affinity = true;
#endif
......@@ -71,12 +70,8 @@ void StackContext::postMigrate(StackContext* prevStack) {
// if resumption already triggered -> resume right away
void StackContext::postSuspend(StackContext* prevStack) {
CHECK_PREEMPTION(0);
SuspendState prevState = Prepared;
bool suspended = __atomic_compare_exchange_n( &prevStack->suspendState, &prevState, Suspended, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED );
if (!suspended) {
RASSERT(prevState == Running, FmtHex(prevStack), prevState);
prevStack->resumeInternal();
}
// check if previous stack is already resumed?
if (__atomic_sub_fetch( &prevStack->runState, 1, __ATOMIC_RELAXED ) > 0) prevStack->resumeInternal();
}
// destroy stack
......
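
postSuspend now replaces the Prepared/Suspended CAS with a single decrement of the new runState counter: the counter is 1 while running, the suspend path decrements it after switching away, resume() increments it, and whichever side observes the other's action performs the actual wakeup. A standalone model of that counter handshake, with a condition variable standing in for rescheduling the fibre (illustrative only):

#include <condition_variable>
#include <mutex>
#include <thread>
#include <cstdio>

struct ParkableTask {
  size_t volatile runState = 1;        // 0 => parked
  std::mutex m; std::condition_variable cv; bool woken = false;

  void suspend() {                     // analogous to postSuspend()
    if (__atomic_sub_fetch(&runState, 1, __ATOMIC_RELAXED) > 0) return;  // resume already happened
    std::unique_lock<std::mutex> lk(m);              // actually park
    cv.wait(lk, [this]{ return woken; });
    woken = false;
  }
  void resume() {                      // analogous to StackContext::resume()
    if (__atomic_fetch_add(&runState, 1, __ATOMIC_RELAXED) == 0) {       // it really parked
      std::lock_guard<std::mutex> lk(m);
      woken = true; cv.notify_one();
    }
  }
};

int main() {
  ParkableTask t;
  std::thread waker([&]{ t.resume(); });
  t.suspend();                         // either skips the park or is woken by resume()
  waker.join();
  std::printf("runState back to %zu\n", (size_t)t.runState);
  return 0;
}
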
......@@ -66,9 +66,8 @@ class StackContext : public DoubleLink<StackContext,StackLinkCount> {
size_t priority; // scheduling priority
bool affinity; // affinity prohibits re-staging
enum SuspendState { Running, Prepared, Suspended };
SuspendState volatile suspendState;
ResumeInfo* volatile resumeInfo; // race: unblock vs. timeout
size_t volatile runState; // runState == 0 => parked
ResumeInfo* volatile resumeInfo; // race: unblock vs. timeout
StackContext(const StackContext&) = delete;
const StackContext& operator=(const StackContext&) = delete;
......@@ -93,7 +92,7 @@ protected:
StackContext(BaseProcessor& proc, bool aff = false); // main constructor
StackContext(Scheduler&, bool bg = false); // uses delegation
~StackContext() {
RASSERT(suspendState == Running, FmtHex(this), suspendState);
RASSERT(runState == 1, FmtHex(this), runState);
RASSERT(resumeInfo == nullptr, FmtHex(this));
}
......@@ -131,28 +130,19 @@ public:
size_t spin = SpinStart;
while (spin <= SpinEnd) {
for (size_t i = 0; i < spin; i += 1) Pause();
if (suspendState == Running) return;
if (runState) return; // resumed already? skip suspend
spin += spin;
}
suspendInternal();
}
// Running -> Prepared; Prepared -> Suspended is attempted in postSuspend()
void prepareSuspend(_friend<BaseSuspender>) {
RASSERT(suspendState == Running, FmtHex(this), suspendState);
__atomic_store_n( &suspendState, Prepared, __ATOMIC_RELAXED );
}
// Prepared/Suspended -> Running; resume stack, if necessary
// if suspended (runState == 0), resume
void resume() {
SuspendState prevState = __atomic_exchange_n( &suspendState, Running, __ATOMIC_RELAXED );
RASSERT(prevState != Running, FmtHex(this), prevState);
if (prevState == Suspended) resumeInternal();
if (__atomic_fetch_add( &runState, 1, __ATOMIC_RELAXED ) == 0) resumeInternal();
}
// set ResumeInfo to facilitate later resume race
void setupResumeRace(ResumeInfo& ri, _friend<ResumeInfo>) {
RASSERT(suspendState == Prepared, FmtHex(this), suspendState);
__atomic_store_n( &resumeInfo, &ri, __ATOMIC_RELAXED );
}
......