// Skip to content
// Snippets Groups Projects
// Poller.cc 4.80 KiB
/******************************************************************************
    Copyright © 2017 Martin Karsten

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
******************************************************************************/
#include "libfibre/EventEngine.h"
#include "libfibre/Poller.h"

// Generic poll loop shared by the poller types in this file.
//
// This        - the concrete poller; the loop uses its pollFD, pollTerminate, stats,
//               events buffer and blockingPoll().
// pollerFibre - true if the loop runs on a poller fibre; only permitted when
//               TESTING_POLLER_FIBRES is enabled (asserted otherwise).
//
// Each outer iteration calls This.blockingPoll() once and then keeps re-polling with a
// zero timeout for as long as a poll fills the whole buffer (evcnt == maxPoll).  Every
// reported event unblocks the reader and/or writer side of its file descriptor; the
// resumptions are collected per processor in a ProcessorResumeSet and handed over with
// one bulkResume() call per processor.
template<typename T>
inline void BasePoller::pollLoop(T& This, bool pollerFibre) {
  // NOTE: the if/else below is stitched together by the preprocessor - with poller fibres
  // enabled, setupFakeContext() is the 'else' branch; otherwise it runs unconditionally.
#if TESTING_POLLER_FIBRES
  if (pollerFibre) _lfEventEngine->registerPollFD(This.pollFD);
  else
#else
    GENASSERT(!pollerFibre);
#endif
    SystemProcessor::setupFakeContext((StackContext*)&This, _friend<BasePoller>());
  while (!This.pollTerminate) {
    This.stats->blocks.count();
    int evcnt = This.blockingPoll();
    for (;;) { // drain all events with non-blocking epoll_wait
      // A poll call that failed (EINTR is the only tolerated error) reports a negative
      // count.  Treat it as "no events" so the event statistics never see a negative value.
      if (evcnt < 0) evcnt = 0;
      This.stats->events.add(evcnt);
      ProcessorResumeSet procSet;
      for (int e = 0; e < evcnt; e += 1) {
#if __FreeBSD__
        struct kevent& ev = This.events[e];
        if (ev.filter == EVFILT_READ || ev.filter == EVFILT_TIMER) {
          _lfEventEngine->unblock<true>(ev.ident, procSet, _friend<BasePoller>());
        } else if (ev.filter == EVFILT_WRITE) {
          _lfEventEngine->unblock<false>(ev.ident, procSet, _friend<BasePoller>());
        }
#else // __linux__ below
        epoll_event& ev = This.events[e];
        // error conditions are delivered to both sides, so both checks are independent
        if (ev.events & (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP)) {
          _lfEventEngine->unblock<true>(ev.data.fd, procSet, _friend<BasePoller>());
        }
        if (ev.events & (EPOLLOUT | EPOLLRDHUP | EPOLLERR)) {
          _lfEventEngine->unblock<false>(ev.data.fd, procSet, _friend<BasePoller>());
        }
#endif
      }
      // hand the collected resume queues to their processors, one bulk call each
      for (auto& p : procSet) {
        p.first->bulkResume(p.second, _friend<BasePoller>());
      }
      // a partially filled buffer means nothing further is pending right now
      if (evcnt < maxPoll) break;
#if __FreeBSD__
      static const timespec ts = Time::zero();
      evcnt = kevent(This.pollFD, nullptr, 0, This.events, maxPoll, &ts);
#else // __linux__ below
      evcnt = epoll_wait(This.pollFD, This.events, maxPoll, 0);
#endif
      if (evcnt < 0) { GENASSERT1(errno == EINTR, errno); } // gracefully handle EINTR
    }
  }
}

inline int PollerThread::blockingPoll() {
#if __FreeBSD__
  int evcnt =  kevent(pollFD, nullptr, 0, events, maxPoll, nullptr); // blocking
#else // __linux__ below
  int evcnt =  epoll_wait(pollFD, events, maxPoll, -1);              // blocking
#endif
  if (evcnt < 0) { GENASSERT1(errno == EINTR, errno); } // gracefully handle EINTR
  if (paused) pauseSem.P();
  return evcnt;
}

void* MasterPoller::pollLoopSetup(void* This) {
  pollLoop(*reinterpret_cast<MasterPoller*>(This), false);
  return nullptr;
}

inline int MasterPoller::blockingPoll() {
  if (_lfEventEngine->tryblock<true>(timerFD)) {
#if __linux__
    uint64_t count;
    if (TRY_SYSCALLIO(read(timerFD, (void*)&count, sizeof(count)), EAGAIN) < 0) goto skipTimeout;
#endif
    Time currTime;
    SYSCALL(clock_gettime(CLOCK_REALTIME, &currTime));
    defaultTimerQueue->checkExpiry(currTime);
  }
skipTimeout:
  return PollerThread::blockingPoll();
}

#if TESTING_POLLER_FIBRES

void Poller::pollLoopSetup(void* This) {
  pollLoop(*reinterpret_cast<Poller*>(This), true);
}

// Fibre-based poller: creates the poll fibre on 'cluster' with the default stack size,
// gives it low priority and starts it in pollLoopSetup with this poller as argument.
Poller::Poller(FibreCluster& cluster) : BasePoller("PollerFibre") {
  Fibre* const fibre = new Fibre(cluster, defaultStackSize, true);
  pollFibre = fibre;  // stored first, as before, so the member is set before the fibre starts
  fibre->setPriority(lowPriority);
  fibre->run(pollLoopSetup, this);
}

// Shut down the poller fibre.  The order of the three steps matters:
//  1. raise pollTerminate, which is the exit condition of the loop in pollLoop();
//  2. unblock the poll FD so that the loop gets to re-check that condition;
//  3. delete the fibre object.
// NOTE(review): presumably deleting the Fibre waits for it to finish - confirm in Fibre.
void Poller::stop() {
  pollTerminate = true; // set termination flag, then unblock -> terminate
  _lfEventEngine->unblockPollFD(pollFD, _friend<Poller>());
  delete pollFibre;
}

inline int Poller::blockingPoll() {
  _lfEventEngine->blockPollFD(pollFD);
#if __FreeBSD__
  static const timespec ts = Time::zero();
  int evcnt = kevent(pollFD, nullptr, 0, events, maxPoll, &ts);
#else // __linux__ below
  int evcnt =  epoll_wait(pollFD, events, maxPoll, 0);
#endif
  if (evcnt < 0) { GENASSERT1(errno == EINTR, errno); } // gracefully handle EINTR
  if (paused) pauseSem.P();
  return evcnt;
}

#else

void* Poller::pollLoopSetup(void* This) {
  pollLoop(*reinterpret_cast<Poller*>(This), false);
  return nullptr;
}

// Thread variant: consult cluster.waitForPoll() before polling.  When it reports false,
// sleep for 100 microseconds first (avoids a trickle loop), then fall through to the
// regular PollerThread::blockingPoll() and return its result.
inline int Poller::blockingPoll() {
  const bool backOff = !cluster.waitForPoll();
  if (backOff) {
    usleep(100);  // avoid trickle loop
  }
  return PollerThread::blockingPoll();
}
#endif // TESTING_POLLER_FIBRES