-
Martin Karsten authoredMartin Karsten authored
Poller.cc 4.80 KiB
/******************************************************************************
Copyright 2017 Martin Karsten
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
******************************************************************************/
#include "libfibre/EventEngine.h"
#include "libfibre/Poller.h"
template<typename T>
inline void BasePoller::pollLoop(T& This, bool pollerFibre) {
#if TESTING_POLLER_FIBRES
if (pollerFibre) _lfEventEngine->registerPollFD(This.pollFD);
else
#else
GENASSERT(!pollerFibre);
#endif
SystemProcessor::setupFakeContext((StackContext*)&This, _friend<BasePoller>());
while (!This.pollTerminate) {
This.stats->blocks.count();
int evcnt = This.blockingPoll();
for (;;) { // drain all events with non-blocking epoll_wait
This.stats->events.add(evcnt);
ProcessorResumeSet procSet;
for (int e = 0; e < evcnt; e += 1) {
#if __FreeBSD__
struct kevent& ev = This.events[e];
if (ev.filter == EVFILT_READ || ev.filter == EVFILT_TIMER) {
_lfEventEngine->unblock<true>(ev.ident, procSet, _friend<BasePoller>());
} else if (ev.filter == EVFILT_WRITE) {
_lfEventEngine->unblock<false>(ev.ident, procSet, _friend<BasePoller>());
}
#else // __linux__ below
epoll_event& ev = This.events[e];
if (ev.events & (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP)) {
_lfEventEngine->unblock<true>(ev.data.fd, procSet, _friend<BasePoller>());
}
if (ev.events & (EPOLLOUT | EPOLLRDHUP | EPOLLERR)) {
_lfEventEngine->unblock<false>(ev.data.fd, procSet, _friend<BasePoller>());
}
#endif
}
for (std::pair<VirtualProcessor* const,ResumeQueue>& p : procSet) {
p.first->bulkResume(p.second, _friend<BasePoller>());
}
if (evcnt < maxPoll) break;
#if __FreeBSD__
static const timespec ts = Time::zero();
evcnt = kevent(This.pollFD, nullptr, 0, This.events, maxPoll, &ts);
#else // __linux__ below
evcnt = epoll_wait(This.pollFD, This.events, maxPoll, 0);
#endif
if (evcnt < 0) { GENASSERT1(errno == EINTR, errno); } // gracefully handle EINTR
}
}
}
inline int PollerThread::blockingPoll() {
#if __FreeBSD__
int evcnt = kevent(pollFD, nullptr, 0, events, maxPoll, nullptr); // blocking
#else // __linux__ below
int evcnt = epoll_wait(pollFD, events, maxPoll, -1); // blocking
#endif
if (evcnt < 0) { GENASSERT1(errno == EINTR, errno); } // gracefully handle EINTR
if (paused) pauseSem.P();
return evcnt;
}
void* MasterPoller::pollLoopSetup(void* This) {
pollLoop(*reinterpret_cast<MasterPoller*>(This), false);
return nullptr;
}
inline int MasterPoller::blockingPoll() {
if (_lfEventEngine->tryblock<true>(timerFD)) {
#if __linux__
uint64_t count;
if (TRY_SYSCALLIO(read(timerFD, (void*)&count, sizeof(count)), EAGAIN) < 0) goto skipTimeout;
#endif
Time currTime;
SYSCALL(clock_gettime(CLOCK_REALTIME, &currTime));
defaultTimerQueue->checkExpiry(currTime);
}
skipTimeout:
return PollerThread::blockingPoll();
}
#if TESTING_POLLER_FIBRES
void Poller::pollLoopSetup(void* This) {
pollLoop(*reinterpret_cast<Poller*>(This), true);
}
Poller::Poller(FibreCluster& cluster) : BasePoller("PollerFibre") {
pollFibre = new Fibre(cluster, defaultStackSize, true);
pollFibre->setPriority(lowPriority);
pollFibre->run(pollLoopSetup, this);
}
void Poller::stop() {
pollTerminate = true; // set termination flag, then unblock -> terminate
_lfEventEngine->unblockPollFD(pollFD, _friend<Poller>());
delete pollFibre;
}
inline int Poller::blockingPoll() {
_lfEventEngine->blockPollFD(pollFD);
#if __FreeBSD__
static const timespec ts = Time::zero();
int evcnt = kevent(pollFD, nullptr, 0, events, maxPoll, &ts);
#else // __linux__ below
int evcnt = epoll_wait(pollFD, events, maxPoll, 0);
#endif
if (evcnt < 0) { GENASSERT1(errno == EINTR, errno); } // gracefully handle EINTR
if (paused) pauseSem.P();
return evcnt;
}
#else
void* Poller::pollLoopSetup(void* This) {
pollLoop(*reinterpret_cast<Poller*>(This), false);
return nullptr;
}
inline int Poller::blockingPoll() {
if (!cluster.waitForPoll()) usleep(100); // avoid trickle loop
return PollerThread::blockingPoll();
}
#endif // TESTING_POLLER_FIBRES