diff --git a/src/generic/IntrusiveContainers.h b/src/generic/IntrusiveContainers.h index 551de88804a94c7fac8ad9687b4d2b775f0efcab..3748e8c0d8344fb49de3de8b989aefc5fb1dddaf 100644 --- a/src/generic/IntrusiveContainers.h +++ b/src/generic/IntrusiveContainers.h @@ -356,7 +356,7 @@ public: typedef LT Link; private: - T* head; + T* volatile head; T* tail; public: @@ -429,7 +429,7 @@ private: if (Blocking) { // BLOCKING: Link* expected = &stub; // check if tail also points at stub -> empty? Link* xchg = (Link*)(uintptr_t(expected) | 1); // if yes, mark queue empty - bool empty = __atomic_compare_exchange_n(&tail, &expected, xchg, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED); + bool empty = __atomic_compare_exchange_n((Link**)&tail, &expected, xchg, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED); if (empty) return false; // queue is empty and is marked now if (uintptr_t(expected) & 1) return false; // queue is empty and was marked before } else { // NONBLOCKING: @@ -450,7 +450,7 @@ public: bool push(Link& first, Link& last) { last.link[NUM].next = nullptr; - Link* prev = __atomic_exchange_n(&tail, &last, __ATOMIC_SEQ_CST); // swing tail to last of new element(s) + Link* prev = __atomic_exchange_n((Link**)&tail, &last, __ATOMIC_SEQ_CST); // swing tail to last of new element(s) bool empty = false; if (Blocking) { // BLOCKING: empty = uintptr_t(prev) & 1; // check empty marking diff --git a/src/libfibre/EventScope.h b/src/libfibre/EventScope.h index 5925303a1f91dbc6a019d38779eaa4c2d7016ea8..e8e886165ec921d7dec44cb356cf64c341dd3ce1 100644 --- a/src/libfibre/EventScope.h +++ b/src/libfibre/EventScope.h @@ -57,13 +57,13 @@ class EventScope { SystemProcessor* mainProcessor; Fibre* mainFibre; - static void* create(void* arg) { - funcvoid0_t cloneFunc = (funcvoid0_t)arg; + static void* createScope(void* arg) { + funcvoid0_t scopeFunc = (funcvoid0_t)arg; #if __linux__ SYSCALL(unshare(CLONE_FILES)); #endif EventScope* es = new EventScope(); - cloneFunc(); + scopeFunc(); delete es; return nullptr; } @@ -100,9 +100,9 @@ public: typedef pthread_t Handle; - static Handle clone(funcvoid0_t cloneFunc) { + static Handle clone(funcvoid0_t scopeFunc) { Handle scopeThread; - SYSCALL(pthread_create(&scopeThread, nullptr, create, (ptr_t)cloneFunc)); + SYSCALL(pthread_create(&scopeThread, nullptr, createScope, (ptr_t)scopeFunc)); return scopeThread; } diff --git a/src/libfibre/Poller.cc b/src/libfibre/Poller.cc index 1d508a632f43a0e248277bcb1eb1cab5dddd7a2f..1c67cfe3b1b2fb383f624fa0373fe64b4c4a780f 100644 --- a/src/libfibre/Poller.cc +++ b/src/libfibre/Poller.cc @@ -121,18 +121,23 @@ void PollerFibre::stop() { } inline int PollerFibre::blockingPoll() { - stats->blocks.count(); - eventScope.blockPollFD(pollFD); - stats->polls.count(); + Fibre::yield(); + for(;;) { + stats->polls.count(); #if __FreeBSD__ - static const timespec ts = Time::zero(); - int evcnt = kevent(pollFD, nullptr, 0, events, maxPoll, &ts); + static const timespec ts = Time::zero(); + int evcnt = kevent(pollFD, nullptr, 0, events, maxPoll, &ts); #else // __linux__ below - int evcnt = epoll_wait(pollFD, events, maxPoll, 0); + int evcnt = epoll_wait(pollFD, events, maxPoll, 0); #endif - if (evcnt < 0) { GENASSERT1(errno == EINTR, errno); } // gracefully handle EINTR - if (paused) pauseSem.P(); - return evcnt; + if (evcnt < 0) { GENASSERT1(errno == EINTR, errno); } // gracefully handle EINTR + if (evcnt > 0) { + if (paused) pauseSem.P(); + return evcnt; + } + stats->blocks.count(); + eventScope.blockPollFD(pollFD); + } } #else diff --git a/src/libfibre/webserver.cpp b/src/libfibre/webserver.cpp index 74237fc8b5c740e9afc059f9a67c9cc6d60706b6..8707f129471aaa1988624c3e6de1eb7798d6b802 100644 --- a/src/libfibre/webserver.cpp +++ b/src/libfibre/webserver.cpp @@ -59,6 +59,7 @@ static void _SYSCALLabort() { abort(); } #include "syscall_macro.h" #define SYSCALL(call) SYSCALL_CMP(call,==,0,0) #define SYSCALLIO(call) SYSCALL_CMP(call,>=,0,0) +#define TRY_SYSCALL(call,code) SYSCALL_CMP(call,==,0,code) #endif /* SYSCALL */ #include VARIANT @@ -67,7 +68,7 @@ static void _SYSCALLabort() { abort(); } #include "uSocket.h" #else #define lfSocket socket -#define lfBind bind +#define lfBind ::bind #define lfListen listen #define lfAccept accept #define lfClose close @@ -124,7 +125,7 @@ static volatile size_t connectionFibres = 0; static void exitHandler(int sig) { if (sig == SIGINT) cout << endl; - cout << "threads: " << threadCount << " cluster size: " << clusterSize << " listeners: " << listenerCount; + cout << "threads: " << threadCount << " cluster size: " << clusterSize << " listeners: " << listenerCount << " event scopes: " << scopeCount; if (affinityFlag) cout << " affinity"; cout << endl << "fibres: " << connectionFibres << endl; exit(0); @@ -280,7 +281,7 @@ closeAndOut: #if defined __U_CPLUSPLUS__ delete (uSocketAccept*)connFD; #else - SYSCALL(lfClose((uintptr_t)connFD)); + TRY_SYSCALL(lfClose((uintptr_t)connFD),ECONNRESET); #endif return false; }