From a1ee7c3269b051bf3ae16334c88a7379eeee32eb Mon Sep 17 00:00:00 2001
From: Martin Karsten <mkarsten@uwaterloo.ca>
Date: Mon, 2 Jul 2018 20:02:48 -0400
Subject: [PATCH] - libfibre: split lazy poll registration into Read vs. Write
 side - fix some FreeBSD idiosyncrasies

---
 src/libfibre/EventEngine.h      | 64 ++++++++++++++++++---------------
 src/libfibre/Poller.cc          |  2 ++
 src/libfibre/Poller.h           | 46 ++++++++++++++++++------
 src/libfibre/include/pthreads.h |  8 ++++-
 4 files changed, 81 insertions(+), 39 deletions(-)

diff --git a/src/libfibre/EventEngine.h b/src/libfibre/EventEngine.h
index b3b6157..dcdd7f6 100644
--- a/src/libfibre/EventEngine.h
+++ b/src/libfibre/EventEngine.h
@@ -42,11 +42,12 @@ class EventEngine {
   struct SyncRW {
     SyncIO RD;
     SyncIO WR;
-#if TESTING_LAZY_FD_REGISTRATION
-    bool registered;
-#endif
+    int status;
+    SyncRW() : status(0) {}
   } *fdSyncVector;
 
+  int fdcount;
+
   // file operations are not considered blocking in terms of select/poll/epoll
   // therefore, all file operations are executed on dedicated Cluster/SP(s)
   DiskCluster     diskCluster;
@@ -58,8 +59,7 @@ public:
   EventEngine() : fdSyncVector(0), diskCluster(0), diskProcessor(diskCluster) {
     struct rlimit rl;
     SYSCALL(getrlimit(RLIMIT_NOFILE, &rl));  // get hard limit for file descriptor count
-    unsigned long fdcount = rl.rlim_max;
-    fdcount = masterPoller.initTimerHandling(fdcount);
+    fdcount = masterPoller.initTimerHandling(rl.rlim_max);
     fdSyncVector = new SyncRW[fdcount];      // create vector of R and W sync points
   }
 
@@ -73,62 +73,67 @@ public:
 
   void setTimer(const Time& timeout) { masterPoller.setTimer(timeout); }
 
+  template<bool Input, bool Output, bool Lazy>
   void registerFD(int fd) {
 #if TESTING_LAZY_FD_REGISTRATION
-    fdSyncVector[fd].registered = false;
-#else
-    fdSyncVector[fd].RD.reset();
-    fdSyncVector[fd].WR.reset();
-    CurrPoller().registerFD(fd);
+    if (!Lazy) return;
 #endif
+    GENASSERT(fd >= 0 && fd < fdcount);
+    CurrPoller().registerFD<Input,Output>(fd, fdSyncVector[fd].status);
   }
 
-  void registerLazyFD(int fd) {
-#if TESTING_LAZY_FD_REGISTRATION
-    fdSyncVector[fd].registered = true;
+  void deregisterFD(int fd) {
+    GENASSERT(fd >= 0 && fd < fdcount);
+    fdSyncVector[fd].status = 0;
     fdSyncVector[fd].RD.reset();
     fdSyncVector[fd].WR.reset();
-    CurrPoller().registerFD(fd);
-#endif
   }
 
 #if TESTING_POLLER_FIBRES
   void registerPollFD(int fd) {
+    GENASSERT(fd >= 0 && fd < fdcount);
     masterPoller.setupPollFD<false>(fd); // set using ONESHOT to reduce polling
   }
   void blockPollFD(int fd) {
+    GENASSERT(fd >= 0 && fd < fdcount);
     masterPoller.setupPollFD<true>(fd);  // reset using ONESHOT to reduce polling
     fdSyncVector[fd].RD.sem.P();
   }
   void unblockPollFD(int fd, _friend<Poller>) {
+    GENASSERT(fd >= 0 && fd < fdcount);
     fdSyncVector[fd].RD.sem.V();
   }
 #endif
 
   void suspendFD(int fd) {
+    GENASSERT(fd >= 0 && fd < fdcount);
     fdSyncVector[fd].RD.sem.P_fake();
     fdSyncVector[fd].WR.sem.P_fake();
   }
 
   void resumeFD(int fd) {
+    GENASSERT(fd >= 0 && fd < fdcount);
     fdSyncVector[fd].RD.sem.V();
     fdSyncVector[fd].WR.sem.V();
   }
 
   template<bool Input>
   void block(int fd) {
+    GENASSERT(fd >= 0 && fd < fdcount);
     SyncIO& sync = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
     sync.sem.P();
   }
 
   template<bool Input>
   bool tryblock(int fd) {
+    GENASSERT(fd >= 0 && fd < fdcount);
     SyncIO& sync = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
     return sync.sem.tryP();
   }
 
   template<bool Input>
   void unblock(int fd, ProcessorResumeSet& procSet, _friend<BasePoller>) {
+    GENASSERT(fd >= 0 && fd < fdcount);
     SyncIO& sync = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
 #if TESTING_BULK_RESUME
     sync.sem.V_bulk(procSet);
@@ -147,6 +152,7 @@ public:
 
   template<bool Input, bool Yield, bool Lock, typename T, class... Args>
   T syncIO( T (*iofunc)(int, Args...), int fd, Args... a) {
+    GENASSERT(fd >= 0 && fd < fdcount);
     SyncIO& sync = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
     if (Yield) Fibre::yield();
     if (Lock) sync.lock.acquire();
@@ -157,10 +163,10 @@ public:
         return ret;
       }
 #if TESTING_LAZY_FD_REGISTRATION
-      if (!fdSyncVector[fd].registered) {
-        fdSyncVector[fd].registered = true;
-        CurrPoller().registerFD(fd);
-      }
+      bool registered;
+      if (Input) registered = (fdSyncVector[fd].status & BasePoller::Read);
+      else       registered = (fdSyncVector[fd].status & BasePoller::Write);
+      if (!registered) registerFD<Input,!Input,true>(fd);
 #endif
       sync.sem.P();
     }
@@ -187,10 +193,11 @@ T lfDirectIO( T (*diskfunc)(int, Args...), int fd, Args... a) {
   return _lfEventEngine->directIO(diskfunc, fd, a...);
 }
 
-// socket creation: do not register SOCK_STREAM yet (cf. listen, connect) -> mandatory for FreeBSD
+// socket creation: do not register SOCK_STREAM yet (cf. listen, connect) -> mandatory for FreeBSD!
 static inline int lfSocket(int domain, int type, int protocol) {
   int ret = socket(domain, type | SOCK_NONBLOCK, protocol);
-  if (ret >= 0 && type != SOCK_STREAM) _lfEventEngine->registerFD(ret);
+  // 'type' may carry SOCK_NONBLOCK/SOCK_CLOEXEC flag bits: strip them so a flagged SOCK_STREAM is still deferred
+  if (ret >= 0 && (type & ~(SOCK_NONBLOCK | SOCK_CLOEXEC)) != SOCK_STREAM) _lfEventEngine->registerFD<true,true,false>(ret);
   return ret;
 }
 
@@ -198,7 +205,7 @@ static inline int lfSocket(int domain, int type, int protocol) {
 static inline int lfBind(int fd, const sockaddr *addr, socklen_t addrlen) {
   int ret = bind(fd, addr, addrlen);
   if (ret < 0 && errno == EINPROGRESS) {
-    _lfEventEngine->registerLazyFD(fd);
+    _lfEventEngine->registerFD<true,false,true>(fd);
     _lfEventEngine->block<true>(fd);
     socklen_t sz = sizeof(ret);
     SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &ret, &sz));
@@ -209,30 +216,30 @@ static inline int lfBind(int fd, const sockaddr *addr, socklen_t addrlen) {
 // register SOCK_STREAM server fd only after 'listen' system call (cf. socket/connect)
 static inline int lfListen(int fd, int backlog) {
   int ret = listen(fd, backlog);
-  _lfEventEngine->registerFD(fd);
+  if (ret >= 0) _lfEventEngine->registerFD<true,false,false>(fd);
   return ret;
 }
 
 // nonblocking accept for accept draining: register new file descriptor for I/O events
 static inline int lfTryAccept(int fd, sockaddr *addr, socklen_t *addrlen, int flags = 0) {
   int ret = accept4(fd, addr, addrlen, flags | SOCK_NONBLOCK);
-  if (ret >= 0) _lfEventEngine->registerFD(ret);
+  if (ret >= 0) _lfEventEngine->registerFD<true,true,false>(ret);
   return ret;
 }
 
 // accept: register new file descriptor for I/O events, no yield before accept
 static inline int lfAccept(int fd, sockaddr *addr, socklen_t *addrlen, int flags = 0) {
   int ret = _lfEventEngine->syncIO<true,false,true>(accept4, fd, addr, addrlen, flags | SOCK_NONBLOCK);
-  if (ret >= 0) _lfEventEngine->registerFD(ret);
+  if (ret >= 0) _lfEventEngine->registerFD<true,true,false>(ret);
   return ret;
 }
 
 // see man 3 connect for EINPROGRESS; register SOCK_STREAM fd now (cf. socket/listen)
 static inline int lfConnect(int fd, const sockaddr *addr, socklen_t addrlen) {
-  _lfEventEngine->registerFD(fd);
+  _lfEventEngine->registerFD<true,true,false>(fd);
   int ret = connect(fd, addr, addrlen);
   if (ret < 0 && errno == EINPROGRESS) {
-    _lfEventEngine->registerLazyFD(fd);
+    _lfEventEngine->registerFD<false,true,true>(fd);
     _lfEventEngine->block<false>(fd);
     socklen_t sz = sizeof(ret);
     SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &ret, &sz));
@@ -243,11 +250,12 @@ static inline int lfConnect(int fd, const sockaddr *addr, socklen_t addrlen) {
 // dup: duplicate file descriptor -> not necessarily a good idea (on Linux?) - think twice about it!
 static inline int lfDup(int fd) {
   int ret = dup(fd);
-  if (ret >= 0) _lfEventEngine->registerFD(ret);
+  if (ret >= 0) _lfEventEngine->registerFD<true,true,false>(ret);
   return ret;
 }
 
 static inline int lfClose(int fd) {
+  _lfEventEngine->deregisterFD(fd);
   return close(fd);
 }
 
diff --git a/src/libfibre/Poller.cc b/src/libfibre/Poller.cc
index 4e1460a..0a25236 100644
--- a/src/libfibre/Poller.cc
+++ b/src/libfibre/Poller.cc
@@ -91,7 +91,9 @@ inline int MasterPoller::blockingPoll() {
     SYSCALL(clock_gettime(CLOCK_REALTIME, &currTime));
     defaultTimerQueue->checkExpiry(currTime);
   }
+#if __linux__
 skipTimeout:
+#endif
   return PollerThread::blockingPoll();
 }
 
diff --git a/src/libfibre/Poller.h b/src/libfibre/Poller.h
index 055ebc6..4daebd6 100644
--- a/src/libfibre/Poller.h
+++ b/src/libfibre/Poller.h
@@ -29,6 +29,10 @@ class Fibre;
 template<typename> class FibreClusterGeneric;
 
 class BasePoller {
+public: // RegistrationStatus
+  static const int Read  = 0x1;
+  static const int Write = 0x2;
+
 protected:
   static const int maxPoll = 1024;
 #if __FreeBSD__
@@ -60,23 +64,42 @@ public:
 
   void pause() { paused = true; }
 
-  template<bool ReadWrite = true>
-  void registerFD(int fd) {
+  template<bool Input, bool Output>
+  void registerFD(int fd, int& status) {
+    static_assert(Input || Output, "must set Read or Write in registerFD()");
 #if __FreeBSD__
     struct kevent ev[2];
-    EV_SET(&ev[0], fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, 0);
-    if (ReadWrite) EV_SET(&ev[1], fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, 0);
-    SYSCALL(kevent(pollFD, ev, ReadWrite ? 2 : 1, nullptr, 0, nullptr));
+    int idx = 0;
+    if (Input) {
+      status |= Read;
+      EV_SET(&ev[idx], fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, 0);
+      idx += 1;
+    }
+    if (Output) {
+      status |= Write;
+      EV_SET(&ev[idx], fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, 0);
+      idx += 1;
+    }
+    SYSCALL(kevent(pollFD, ev, idx, nullptr, 0, nullptr));
 #else // __linux__ below
+    bool update = status;
+    if (Input) status |= Read;
+    if (Output) status |= Write;
     epoll_event ev;
+    ev.events = EPOLLET;
     // EPOLLERR, EPOLLHUP not actually needed?
-    ev.events = EPOLLIN | EPOLLRDHUP | EPOLLPRI | EPOLLERR | EPOLLHUP | EPOLLET;
-    if (ReadWrite) ev.events |= EPOLLOUT;
+    if (status & Read) {
+      ev.events |= EPOLLIN | EPOLLRDHUP | EPOLLPRI | EPOLLERR | EPOLLHUP;
+    }
+    if (status & Write) {
+      ev.events |= EPOLLOUT;
+    }
     ev.data.fd = fd;
-    SYSCALL(epoll_ctl(pollFD, EPOLL_CTL_ADD, fd, &ev));
+    SYSCALL(epoll_ctl(pollFD, update ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev));
 #endif
   }
 
+#if 0 // unused
   void deregisterFD(int fd) {
 #if __FreeBSD__
     struct kevent ev[2];
@@ -87,6 +110,7 @@ public:
     SYSCALL(epoll_ctl(pollFD, EPOLL_CTL_DEL, fd, nullptr));
 #endif
   }
+#endif
 };
 
 class PollerThread : public BasePoller {
@@ -112,8 +136,9 @@ public:
     SYSCALL(kevent(pollFD, &ev, 1, nullptr, 0, nullptr));
     SYSCALL(pthread_join(pollThread, nullptr));
 #else // __linux__ below
+    int dummy = 0;
     int efd = SYSCALLIO(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
-    registerFD(efd);
+    registerFD<true,false>(efd, dummy);
     uint64_t val = 1;
     val = SYSCALL_EQ(write(efd, &val, sizeof(val)), sizeof(val));
     SYSCALL(pthread_join(pollThread, nullptr));
@@ -140,8 +165,9 @@ public:
     timerFD = fd;
     fd += 1;
 #else
+    int dummy = 0;
     timerFD = SYSCALLIO(timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC));
-    registerFD(timerFD);
+    registerFD<true,false>(timerFD, dummy);
 #endif
     return fd;
   }
diff --git a/src/libfibre/include/pthreads.h b/src/libfibre/include/pthreads.h
index b45c8ba..6d69c08 100644
--- a/src/libfibre/include/pthreads.h
+++ b/src/libfibre/include/pthreads.h
@@ -22,7 +22,13 @@ public:
     return pthread_setaffinity_np(tid, cpusetsize, cpuset);
   }
 
-  static void yield() { SYSCALL(pthread_yield()); }
+  static void yield() {
+#if __FreeBSD__
+    pthread_yield();
+#else
+    SYSCALL(pthread_yield());
+#endif
+  }
 };
 
 class FibreMutex {
-- 
GitLab