From 757bce4f44473df080e33b09c51fbe9ec830e21f Mon Sep 17 00:00:00 2001 From: Martin Karsten <mkarsten@uwaterloo.ca> Date: Sun, 1 Jul 2018 11:20:47 -0400 Subject: [PATCH] - tune polling loop and add separate poll signalling lock - re-introduce lfClose and cfibre_close - use FreeBSD FD registration scheme - clerical update in Fibre - remove race condition in debug support --- src/libfibre/EventEngine.h | 19 ++++++++----------- src/libfibre/Fibre.h | 16 +++++++++++----- src/libfibre/FibreCluster.h | 7 +------ src/libfibre/Poller.cc | 2 +- src/libfibre/Poller.h | 17 +++++++++++------ src/libfibre/cfibre.cc | 4 ++++ src/libfibre/cfibre.h | 1 + src/libfibre/echotest.cpp | 6 +++--- src/libfibre/webserver.cpp | 17 +++++++++-------- 9 files changed, 49 insertions(+), 40 deletions(-) diff --git a/src/libfibre/EventEngine.h b/src/libfibre/EventEngine.h index bf09f0a..b3b6157 100644 --- a/src/libfibre/EventEngine.h +++ b/src/libfibre/EventEngine.h @@ -187,13 +187,10 @@ T lfDirectIO( T (*diskfunc)(int, Args...), int fd, Args... a) { return _lfEventEngine->directIO(diskfunc, fd, a...); } -// socket creation: on FreeBSD, do not register SOCK_STREAM yet (cf. listen, connect) +// socket creation: do not register SOCK_STREAM yet (cf. listen, connect) -> mandatory for FreeBSD static inline int lfSocket(int domain, int type, int protocol) { int ret = socket(domain, type | SOCK_NONBLOCK, protocol); -#if __FreeBSD__ - if (type != SOCK_STREAM) -#endif - if (ret >= 0) _lfEventEngine->registerFD(ret); + if (ret >= 0 && type != SOCK_STREAM) _lfEventEngine->registerFD(ret); return ret; } @@ -209,12 +206,10 @@ static inline int lfBind(int fd, const sockaddr *addr, socklen_t addrlen) { return ret; } -// FreeBSD: can register SOCK_STREAM server fd only after 'listen' system call (cf. socket/connect) +// register SOCK_STREAM server fd only after 'listen' system call (cf. socket/connect) static inline int lfListen(int fd, int backlog) { int ret = listen(fd, backlog); -#if __FreeBSD__ _lfEventEngine->registerFD(fd); -#endif return ret; } @@ -232,11 +227,9 @@ static inline int lfAccept(int fd, sockaddr *addr, socklen_t *addrlen, int flags return ret; } -// see man 3 connect for EINPROGRESS; FreeBSD: register SOCK_STREAM fd now (cf. socket/listen) +// see man 3 connect for EINPROGRESS; register SOCK_STREAM fd now (cf. socket/listen) static inline int lfConnect(int fd, const sockaddr *addr, socklen_t addrlen) { -#if __FreeBSD__ _lfEventEngine->registerFD(fd); -#endif int ret = connect(fd, addr, addrlen); if (ret < 0 && errno == EINPROGRESS) { _lfEventEngine->registerLazyFD(fd); @@ -254,4 +247,8 @@ static inline int lfDup(int fd) { return ret; } +static inline int lfClose(int fd) { + return close(fd); +} + #endif /* _EventEngine_h_ */ diff --git a/src/libfibre/Fibre.h b/src/libfibre/Fibre.h index 3484825..75b426a 100644 --- a/src/libfibre/Fibre.h +++ b/src/libfibre/Fibre.h @@ -72,6 +72,13 @@ class Fibre : public StackContext { #endif } + void clearDebug() { +#if TESTING_ENABLE_DEBUGGING + ScopedLock<SystemLock> sl(*_globalStackLock); + GlobalStackList::remove(*this); +#endif + } + Fibre* runInternal(ptr_t func, ptr_t p1, ptr_t p2, ptr_t p3) { StackContext::start(func, p1, p2, p3); return this; @@ -95,16 +102,15 @@ public: : StackContext(sp), stackSize(0) { initDebug(); } // synchronize at object destruction - ~Fibre() { join(); } + ~Fibre() { + join(); + clearDebug(); + } void join() { done.wait(); } void detach() { done.detach(); } // callback from StackContext via Runtime after terminal context switch void destroy(_friend<Runtime>) { -#if TESTING_ENABLE_DEBUGGING - ScopedLock<SystemLock> sl(*_globalStackLock); - GlobalStackList::remove(*this); -#endif stackFree(); done.post(); } diff --git a/src/libfibre/FibreCluster.h b/src/libfibre/FibreCluster.h index c9807a5..2739d6b 100644 --- a/src/libfibre/FibreCluster.h +++ b/src/libfibre/FibreCluster.h @@ -87,12 +87,7 @@ public: #if !TESTING_POLLER_FIBRES bool waitForPoll() { -#if TESTING_POLLER_IDLEWAIT - ScopedLock<SystemLock> al(idleLock); - return poller.wait(idleLock, idleCount); -#else - return false; -#endif + return poller.wait(idleCount); } #endif }; diff --git a/src/libfibre/Poller.cc b/src/libfibre/Poller.cc index 50d77e3..4e1460a 100644 --- a/src/libfibre/Poller.cc +++ b/src/libfibre/Poller.cc @@ -134,7 +134,7 @@ void* Poller::pollLoopSetup(void* This) { } inline int Poller::blockingPoll() { - if (!cluster.waitForPoll()) usleep(100); // avoid trickle loop + cluster.waitForPoll(); return PollerThread::blockingPoll(); } diff --git a/src/libfibre/Poller.h b/src/libfibre/Poller.h index fb01678..055ebc6 100644 --- a/src/libfibre/Poller.h +++ b/src/libfibre/Poller.h @@ -195,8 +195,9 @@ class Poller : public PollerThread { FibreClusterGeneric<Poller>& cluster; #if TESTING_POLLER_IDLEWAIT - size_t pollThreshold; - SystemCondition pollCond; + size_t pollThreshold; + SystemLock pollLock; + SystemCondition pollCond; #endif static void* pollLoopSetup(void*); @@ -212,19 +213,21 @@ public: inline int blockingPoll(); #if TESTING_POLLER_IDLEWAIT - bool wait(SystemLock& lock, size_t count) { + bool wait(size_t count) { + ScopedLock<SystemLock> al(pollLock); if (count >= pollThreshold) return false; #if TESTING_POLLER_IDLETIMEDWAIT Time t; SYSCALL(clock_gettime(CLOCK_REALTIME, &t)); - pollCond.wait(lock, t + Time(0,10000000)); // 10 ms = 10,000,000 ns; + pollCond.wait(pollLock, t + Time(0,1000000)); // 1 ms = 1,000,000 ns; #else - pollCond.wait(lock); + pollCond.wait(pollLock); #endif // TESTING_POLLER_IDLETIMEDWAIT return true; } void signal(size_t c) { + ScopedLock<SystemLock> al(pollLock); if (c == pollThreshold) pollCond.signal(); } @@ -234,6 +237,8 @@ public: pollCond.signal(); #endif // !TESTING_POLLER_IDLETIMEDWAIT } +#else + bool wait(size_t) { return false; } #endif // TESTING_POLLER_IDLEWAIT }; @@ -247,7 +252,7 @@ public: void stop() {} void pause() {} void resume() {} - bool wait(SystemLock&, size_t) { return false; } + bool wait(size_t) { return false; } void signal(size_t) {} void finish() {} }; diff --git a/src/libfibre/cfibre.cc b/src/libfibre/cfibre.cc index 737c9c4..d0f5b2e 100644 --- a/src/libfibre/cfibre.cc +++ b/src/libfibre/cfibre.cc @@ -374,6 +374,10 @@ extern "C" int cfibre_dup(int fd) { return lfDup(fd); } +extern "C" int cfibre_close(int fd) { + return lfClose(fd); +} + extern "C" ssize_t cfibre_send(int socket, const void *buffer, size_t length, int flags) { return lfOutput(send, socket, buffer, length, flags); } diff --git a/src/libfibre/cfibre.h b/src/libfibre/cfibre.h index 9efea93..2d69184 100644 --- a/src/libfibre/cfibre.h +++ b/src/libfibre/cfibre.h @@ -139,6 +139,7 @@ int cfibre_accept(int socket, struct sockaddr *restrict address, socklen_t *rest int cfibre_accept4(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len, int flags); int cfibre_connect(int socket, const struct sockaddr *address, socklen_t address_len); int cfibre_dup(int fildes); +int cfibre_close(int fildes); ssize_t cfibre_send(int socket, const void *buffer, size_t length, int flags); ssize_t cfibre_sendto(int socket, const void *message, size_t length, int flags, const struct sockaddr *dest_addr, socklen_t dest_len); ssize_t cfibre_sendmsg(int socket, const struct msghdr *message, int flags); diff --git a/src/libfibre/echotest.cpp b/src/libfibre/echotest.cpp index 5ad3abb..6a83116 100644 --- a/src/libfibre/echotest.cpp +++ b/src/libfibre/echotest.cpp @@ -132,7 +132,7 @@ finish: // std::cout << ntohs(client.sin_port) << " client closed" << std::endl; // close socket and terminate fibre - SYSCALL(close(fd)); + SYSCALL(lfClose(fd)); } void servaccept(void* arg) { @@ -215,7 +215,7 @@ void servmain(sockaddr_in& addr) { } // close server socket - SYSCALL(close(servFD)); + SYSCALL(lfClose(servFD)); } void clientconn(void* arg) { @@ -252,7 +252,7 @@ void clientconn(void* arg) { if (buf == code) __atomic_add_fetch(&clntcount, 1, __ATOMIC_RELAXED); // close socket and terminate fibre - SYSCALL(close(fd)); + SYSCALL(lfClose(fd)); } void clientmain(sockaddr_in& addr) { diff --git a/src/libfibre/webserver.cpp b/src/libfibre/webserver.cpp index 032a909..04a5193 100644 --- a/src/libfibre/webserver.cpp +++ b/src/libfibre/webserver.cpp @@ -44,9 +44,9 @@ typedef cpuset_t cpu_set_t; #define __LIBFIBRE__ #include "libfibre/Fibre.h" #include "libfibre/EventEngine.h" -typedef Mutex<SystemLock> FibreMutex; -typedef Condition<FibreMutex> FibreCondition; -typedef Barrier<SystemLock> FibreBarrier; +typedef Mutex<SystemLock> FibreMutex; +typedef Condition<FibreMutex> FibreCondition; +typedef Barrier<SystemLock> FibreBarrier; #else /* VARIANT */ @@ -67,9 +67,10 @@ static void _SYSCALLabort() { abort(); } #include "uSocket.h" #else #define lfSocket socket -#define lfBind bind +#define lfBind bind #define lfListen listen #define lfAccept accept +#define lfClose close template<typename T, class... Args> T lfInput( T (*readfunc)(int, Args...), int fd, Args... a) { return readfunc(fd, a...); @@ -266,7 +267,7 @@ closeAndOut: #if defined __U_CPLUSPLUS__ delete (uSocketAccept*)connFD; #else - close((uintptr_t)connFD); + SYSCALL(lfClose((uintptr_t)connFD)); #endif return false; } @@ -334,7 +335,7 @@ static void acceptor(void* arg) { #if defined __U_CPLUSPLUS__ if (!arg) delete servFD; #else - if ((intptr_t)arg < 0) SYSCALL(close(servFD)); + if ((intptr_t)arg < 0) SYSCALL(lfClose(servFD)); #endif } @@ -360,7 +361,7 @@ static void acceptor_loop(void* arg) { #if defined __U_CPLUSPLUS__ if (!arg) delete servFD; #else - if ((intptr_t)arg < 0) SYSCALL(close(servFD)); + if ((intptr_t)arg < 0) SYSCALL(lfClose(servFD)); #endif } @@ -484,7 +485,7 @@ int main(int argc, char** argv) { #if defined __U_CPLUSPLUS__ delete servFD; #else - SYSCALL(close(servFD)); + SYSCALL(lfClose(servFD)); #endif } -- GitLab