...
 
Commits (2)
......@@ -38,7 +38,10 @@
partitioned kernel file descriptor tables on Linux.
*/
class EventScope {
typedef FifoSemaphore<InternalLock,true> SyncSem;
struct SyncSem {
FibreMutex mtx;
Semaphore<InternalLock,true> sem;
};
struct SyncFD {
SyncSem RD;
SyncSem WR;
......@@ -130,20 +133,20 @@ public:
}
template<bool Input, bool Output, bool Lazy, bool Cluster>
void registerFD(int fd) {
bool registerFD(int fd) {
static_assert(Input || Output, "must set Input or Output in registerFD()");
const size_t target = (Input ? BasePoller::Input : 0) | (Output ? BasePoller::Output : 0);
#if TESTING_LAZY_FD_REGISTRATION
if (Lazy) return;
if (Lazy) return false;
#endif
#if TESTING_LAZY_FD_REGISTRATION
RASSERT0(fd >= 0 && fd < fdCount);
SyncFD& fdsync = fdSyncVector[fd];
if ((fdsync.status & target) == target) return; // outside of lock: faster, but double regs possible...
if ((fdsync.status & target) == target) return false; // outside of lock: faster, but double regs possible...
ScopedLock<FibreMutex> sl(fdsync.lock);
bool change = fdsync.status; // already registered for polling?
bool change = fdsync.status; // already registered for polling?
fdsync.status |= target;
#endif
......@@ -154,18 +157,19 @@ public:
#endif
#if TESTING_LAZY_FD_REGISTRATION
cp.setupFD(fd, fdsync.status, change); // add or modify poll settings
cp.setupFD(fd, fdsync.status, change); // add or modify poll settings
#else
cp.setupFD(fd, target, false);
#endif
return true;
}
template<bool RemoveFromPollSet = false>
void deregisterFD(int fd) {
RASSERT0(fd >= 0 && fd < fdCount);
SyncFD& fdsync = fdSyncVector[fd];
RASSERT0(fdsync.RD.empty());
RASSERT0(fdsync.WR.empty());
RASSERT0(fdsync.RD.sem.empty());
RASSERT0(fdsync.WR.sem.empty());
#if TESTING_LAZY_FD_REGISTRATION
ScopedLock<FibreMutex> sl(fdsync.lock);
fdsync.status = 0;
......@@ -188,45 +192,47 @@ public:
void blockPollFD(int fd) {
RASSERT0(fd >= 0 && fd < fdCount);
masterPoller->setupPollFD(fd, true); // reset using ONESHOT to reduce polling
fdSyncVector[fd].RD.P();
ScopedLock<FibreMutex> sl(fdSyncVector[fd].RD.mtx);
fdSyncVector[fd].RD.sem.P();
}
void unblockPollFD(int fd, _friend<PollerFibre>) {
RASSERT0(fd >= 0 && fd < fdCount);
fdSyncVector[fd].RD.V();
fdSyncVector[fd].RD.sem.V();
}
void suspendFD(int fd) {
RASSERT0(fd >= 0 && fd < fdCount);
fdSyncVector[fd].RD.P_fake();
fdSyncVector[fd].WR.P_fake();
fdSyncVector[fd].RD.sem.P_fake();
fdSyncVector[fd].WR.sem.P_fake();
}
void resumeFD(int fd) {
RASSERT0(fd >= 0 && fd < fdCount);
fdSyncVector[fd].RD.V();
fdSyncVector[fd].WR.V();
fdSyncVector[fd].RD.sem.V();
fdSyncVector[fd].WR.sem.V();
}
template<bool Input>
void block(int fd) {
RASSERT0(fd >= 0 && fd < fdCount);
SyncSem& sem = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
sem.P();
SyncSem& sync = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
ScopedLock<FibreMutex> sl(sync.mtx);
sync.sem.P();
}
template<bool Input>
bool tryblock(int fd) {
RASSERT0(fd >= 0 && fd < fdCount);
SyncSem& sem = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
return sem.tryP();
SyncSem& sync = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
return sync.sem.tryP();
}
template<bool Input, bool Enqueue = true>
StackContext* unblock(int fd, _friend<BasePoller>) {
RASSERT0(fd >= 0 && fd < fdCount);
SyncSem& sem = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
return sem.V<Enqueue>();
SyncSem& sync = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
return sync.sem.V<Enqueue>();
}
template<typename T, class... Args>
......@@ -257,16 +263,19 @@ public:
T ret = iofunc(fd, a...);
if (ret >= 0 || !TestEAGAIN<Input>()) return ret;
#if TESTING_LAZY_FD_REGISTRATION
registerFD<Input,!Input,false,false>(fd);
if (registerFD<Input,!Input,false,false>(fd)) {
Fibre::yield();
T ret = iofunc(fd, a...);
if (ret >= 0 || !TestEAGAIN<Input>()) return ret;
}
#endif
SyncSem& sem = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
sem.P_yield();
SyncSem& sync = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
ScopedLock<FibreMutex> sl(sync.mtx);
for (;;) {
sync.sem.P();
ret = iofunc(fd, a...);
if (ret >= 0 || !TestEAGAIN<Input>()) break;
sem.P();
}
sem.V();
return ret;
}
};
......
......@@ -21,11 +21,11 @@
#include "runtime/BaseProcessor.h"
#include "runtime/BlockingSync.h"
typedef FifoSemaphore<InternalLock> FibreSemaphore;
typedef Mutex<InternalLock> FibreMutex;
typedef Condition<FibreMutex> FibreCondition;
typedef LockRW<InternalLock> FibreLockRW;
typedef Barrier<InternalLock> FibreBarrier;
typedef Semaphore<InternalLock> FibreSemaphore;
typedef Mutex<InternalLock> FibreMutex;
typedef Condition<FibreMutex> FibreCondition;
typedef LockRW<InternalLock> FibreLockRW;
typedef Barrier<InternalLock> FibreBarrier;
class _Bootstrapper;
class BaseThreadPoller;
......
......@@ -44,9 +44,20 @@ public:
return (c >= 1) && __atomic_compare_exchange_n(&counter, &c, c-1, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
}
template<typename Lock2>
bool P_unlock(Lock2& l) {
if (__atomic_sub_fetch(&counter, 1, __ATOMIC_SEQ_CST) < 0) {
l.release();
sem.P();
}
return true;
}
void V() {
if (__atomic_add_fetch(&counter, 1, __ATOMIC_SEQ_CST) < 1) sem.V();
}
void release() { return V(); }
};
#endif /* _Benaphore_h_ */
......@@ -267,26 +267,26 @@ public:
};
template<typename Lock, bool Binary = false, typename BQ = BlockingQueue>
class FifoSemaphore {
class Semaphore {
protected:
Lock lock;
ssize_t counter;
BQ bq;
template<bool Yield = false, typename... Args>
bool internalP(const Args&... args) {
// need baton passing: counter unchanged, if blocking fails (timeout)
template<typename... Args>
bool internalP(bool yield, const Args&... args) {
// baton passing: counter unchanged, if blocking fails (timeout)
if (counter < 1) return bq.block(lock, args...);
counter -= 1;
lock.release();
if (Yield) StackContext::yield();
if (yield) StackContext::yield();
return true;
}
public:
explicit FifoSemaphore(ssize_t c = 0) : counter(c) {}
explicit Semaphore(ssize_t c = 0) : counter(c) {}
// baton passing requires serialization at destruction
~FifoSemaphore() { ScopedLock<Lock> sl(lock); }
~Semaphore() { ScopedLock<Lock> sl(lock); }
bool empty() { return bq.empty(); }
bool open() { return counter >= 1; }
ssize_t getValue() { return counter; }
......@@ -298,19 +298,19 @@ public:
}
template<typename... Args>
bool P(const Args&... args) { lock.acquire(); return internalP(args...); }
bool P(const Args&... args) { lock.acquire(); return internalP(false, args...); }
bool tryP() { return P(false); }
template<typename Lock2>
bool P_unlock(Lock2& l) {
lock.acquire();
l.release();
return internalP();
return internalP(false);
}
bool P_yield() {
lock.acquire();
return internalP<true>();
return internalP(true);
}
void P_fake(size_t c = 1) {
......@@ -328,49 +328,12 @@ public:
else counter += 1;
return nullptr;
}
};
template<typename Lock, bool OwnerLock = false, typename BQ = BlockingQueue>
class FifoMutex {
Lock lock;
StackContext* owner;
BQ bq;
protected:
template<typename... Args>
bool internalAcquire(const Args&... args) {
StackContext* cs = Context::CurrStack();
lock.acquire();
if (!owner) {
owner = cs;
} else if (owner == cs) {
RASSERT(OwnerLock, FmtHex(owner));
} else {
return bq.block(lock, args...);
}
lock.release();
return true;
}
public:
FifoMutex() : owner(nullptr) {}
// baton passing requires serialization at destruction
~FifoMutex() { ScopedLock<Lock> sl(lock); }
bool test() const { return owner != nullptr; }
template<typename... Args>
bool acquire(const Args&... args) { return internalAcquire(args...); }
bool tryAcquire() { return acquire(false); }
void release() {
ScopedLock<Lock> al(lock);
RASSERT(owner == Context::CurrStack(), FmtHex(owner));
owner = bq.unblock();
}
void release() { return V(); }
};
template<typename Lock, bool OwnerLock = false, typename BQ = BlockingQueue>
class BargingMutex {
template<typename Lock, bool Fifo = true, bool OwnerLock = false, typename BQ = BlockingQueue>
class SimpleMutex {
Lock lock;
StackContext* owner;
BQ bq;
......@@ -379,11 +342,13 @@ protected:
template<typename... Args>
bool internalAcquire(const Args&... args) {
StackContext* cs = Context::CurrStack();
if (OwnerLock && cs == owner) return true;
RASSERT(cs != owner, FmtHex(cs), FmtHex(owner));
for (;;) {
lock.acquire();
if (!owner) break;
if (owner == cs) { RASSERT(OwnerLock, FmtHex(owner)); break; }
if (!bq.block(lock, args...)) return false;
if (!bq.block(lock, args...)) return false; // timeout
if (Fifo) return true; // owner set via baton passing
}
owner = cs;
lock.release();
......@@ -391,7 +356,9 @@ protected:
}
public:
BargingMutex() : owner(nullptr) {}
SimpleMutex() : owner(nullptr) {}
// baton passing requires serialization at destruction
~SimpleMutex() { if (!Fifo) return; ScopedLock<Lock> sl(lock); }
bool test() const { return owner != nullptr; }
template<typename... Args>
......@@ -401,8 +368,12 @@ public:
void release() {
ScopedLock<Lock> al(lock);
RASSERT(owner == Context::CurrStack(), FmtHex(owner));
owner = nullptr;
bq.unblock();
if (Fifo) {
owner = bq.unblock();
} else {
owner = nullptr;
bq.unblock();
}
}
};
......@@ -471,13 +442,13 @@ public:
template<typename Lock>
#if TESTING_MUTEX_FIFO
class Mutex : public FifoMutex<Lock> {};
class Mutex : public SimpleMutex<Lock> {};
#elif TESTING_MUTEX_BARGING
class Mutex : public BargingMutex<Lock> {};
class Mutex : public SimpleMutex<Lock, false> {};
#elif TESTING_MUTEX_SPIN
class Mutex : public SpinMutex<FifoSemaphore<Lock, true>, false, 4, 1024, 16> {};
class Mutex : public SpinMutex<Semaphore<Lock, true>, false, 4, 1024, 16> {};
#else
class Mutex : public SpinMutex<FifoSemaphore<Lock, true>, false, 0, 0, 0> {};
class Mutex : public SpinMutex<Semaphore<Lock, true>, false, 0, 0, 0> {};
#endif
// simple blocking RW lock: release alternates; new readers block when writer waits -> no starvation
......