Commit df8c8f8a authored by Martin Karsten's avatar Martin Karsten

- streamline (lazy) event registration

parent 0024a419
......@@ -42,12 +42,11 @@ class EventScope {
struct SyncFD {
SyncSem RD;
SyncSem WR;
#if TESTING_LAZY_FD_REGISTRATION && __linux__
#if TESTING_LAZY_FD_REGISTRATION
FibreMutex regLock;
#endif
BasePoller* poller;
size_t status;
SyncFD() : poller(nullptr), status(0) {}
SyncFD() : status(0) {}
#endif
} *fdSyncVector;
int fdCount;
......@@ -130,64 +129,55 @@ public:
if (timerQueue.checkExpiry(currTime, newTime)) setTimer(newTime);
}
template<bool Cluster>
BasePoller* CP(int fd) {
#if TESTING_PROCESSOR_POLLER
BasePoller& cp = Cluster ? Context::CurrCluster().getPoller(fd) : Context::CurrProcessor().getPoller();
#else
BasePoller& cp = Context::CurrCluster().getPoller(fd);
#endif
return &cp;
}
template<bool Input, bool Output, bool Lazy, bool Cluster>
void registerFD(int fd) {
static_assert(Input || Output, "must set Input or Output in registerFD()");
const size_t target = (Input ? BasePoller::Input : 0) | (Output ? BasePoller::Output : 0);
#if TESTING_LAZY_FD_REGISTRATION
if (Lazy) return;
#endif
#if TESTING_LAZY_FD_REGISTRATION
RASSERT0(fd >= 0 && fd < fdCount);
SyncFD& fdsync = fdSyncVector[fd];
const size_t target = (Input ? BasePoller::Input : 0) | (Output ? BasePoller::Output : 0);
#if TESTING_LAZY_FD_REGISTRATION
#if __FreeBSD__ // can atomically check flags and set poller
size_t prev = __atomic_fetch_or(&fdsync.status, target, __ATOMIC_RELAXED);
if ((prev & target) == target) return;
const bool change = false;
BasePoller* pp = nullptr;
__atomic_compare_exchange_n(&fdsync.poller, &pp, CP<Cluster>(fd), false, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
#else // Linux: serialize concurrent registrations - EPOLL_CTL_ADD vs. _MOD
if ((fdsync.status & target) == target) return; // outside of lock: faster, but double regs possible...
if ((fdsync.status & target) == target) return; // outside of lock: faster, but double regs possible...
ScopedLock<FibreMutex> sl(fdsync.regLock);
bool change = fdsync.status; // already registered for polling?
fdsync.status |= target;
bool change = fdsync.poller; // already registered for polling?
if (!fdsync.poller) fdsync.poller = CP<Cluster>(fd); // else set poller
#endif
#else // TESTING_LAZY_FD_REGISTRATION
RASSERT0(!fdsync.poller);
fdsync.status |= target;
const bool change = false;
fdsync.poller = CP<Cluster>(fd);
#if TESTING_PROCESSOR_POLLER
BasePoller& cp = Cluster ? Context::CurrCluster().getPoller(fd) : Context::CurrProcessor().getPoller();
#else
BasePoller& cp = Context::CurrCluster().getPoller(fd);
#endif
fdsync.poller->setupFD(fd, fdsync.status, change); // add or modify poll settings
#if TESTING_LAZY_FD_REGISTRATION
cp.setupFD(fd, fdsync.status, change); // add or modify poll settings
#else
cp.setupFD(fd, target, false);
#endif
}
template<bool RemoveFromPollSet = false>
void deregisterFD(int fd) {
RASSERT0(fd >= 0 && fd < fdCount);
SyncFD& fdsync = fdSyncVector[fd];
#if TESTING_LAZY_FD_REGISTRATION && __linux__
ScopedLock<FibreMutex> sl(fdsync.regLock);
#endif
if (RemoveFromPollSet && fdsync.poller) fdsync.poller->resetFD(fd);
fdsync.poller = nullptr;
fdsync.status = 0;
RASSERT0(fdsync.RD.empty());
RASSERT0(fdsync.WR.empty());
#if TESTING_LAZY_FD_REGISTRATION
ScopedLock<FibreMutex> sl(fdsync.regLock);
fdsync.status = 0;
#endif
if (RemoveFromPollSet) { // only called from lfConnect
#if TESTING_PROCESSOR_POLLER
BasePoller& cp = Context::CurrProcessor().getPoller();
#else
BasePoller& cp = Context::CurrCluster().getPoller(fd);
#endif
cp.resetFD(fd);
}
}
void registerPollFD(int fd) {
......@@ -251,11 +241,11 @@ public:
template<bool Input>
static inline bool TestEAGAIN() {
int serrno = _SysErrno();
Context::CurrEventScope().stats->resets.count((int)(serrno == ECONNRESET));
#if __FreeBSD__
// workaround - suspect: https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=129169 - or similar?
return serrno == EAGAIN || (Input == false && serrno == ENOTCONN);
#else // __linux__
// if (serrno == ECONNRESET) Context::CurrEventScope().stats->resets.count();
return serrno == EAGAIN;
#endif
}
......@@ -269,12 +259,15 @@ public:
#if TESTING_LAZY_FD_REGISTRATION
registerFD<Input,!Input,false,false>(fd);
#endif
Fibre::yield();
SyncSem& sem = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
for (;;) {
sem.P();
ret = iofunc(fd, a...);
if (ret >= 0 || !TestEAGAIN<Input>()) return ret;
if (ret >= 0 || !TestEAGAIN<Input>()) break;
}
sem.V();
return ret;
}
};
......@@ -344,15 +337,13 @@ inline int lfTryAccept(int fd, sockaddr *addr, socklen_t *addrlen, int flags = 0
inline int lfConnect(int fd, const sockaddr *addr, socklen_t addrlen) {
int ret = connect(fd, addr, addrlen);
if (ret >= 0) {
Context::CurrEventScope().registerFD<true,true,true,false>(fd); // register lazily
Context::CurrEventScope().stats->cliconn.count();
Context::CurrEventScope().registerFD<true,true,true,false>(fd);
} else if (_SysErrno() == EINPROGRESS) {
Context::CurrEventScope().registerFD<false,true,false,false>(fd);
Context::CurrEventScope().block<false>(fd);
Context::CurrEventScope().registerFD<true,true,false,false>(fd); // register immediately
Context::CurrEventScope().block<false>(fd); // wait for connect to complete
#if TESTING_LAZY_FD_REGISTRATION
Context::CurrEventScope().deregisterFD<true>(fd); // revert to lazy registration
#else
Context::CurrEventScope().registerFD<true,false,false,false>(fd); // upgrade to R/W registration
#endif
socklen_t sz = sizeof(ret);
SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &ret, &sz));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment