@@ -26,7 +26,7 @@ pordering: ordering.cpp
 	$(CXX) -pthread $< -o $@

 gthreadtest: threadtest.go
-	/usr/bin/go build -o $@ $<
+	go build -o $@ $<

 pthreadtest: threadtest.cpp include/pthreads.h
 	$(CXX) -Iinclude -DVARIANT=\"pthreads.h\" $(CXXFLAGS) -DPTHREADS $< -o $@ -lrt
......
@@ -36,6 +36,7 @@ typedef cpuset_t cpu_set_t;
 #define __LIBFIBRE__
 #include "libfibre/fibre.h"
+//#define FibreMutex FastMutex
 #else /* VARIANT */
@@ -46,16 +47,14 @@ typedef cpuset_t cpu_set_t;
 #endif /* SYSCALL */
 #include VARIANT
 #include "runtime/Platform.h"
 #endif /* VARIANT */

-// simulate yield-based busy-locking as in qthreads, boost
-#define TRY_YIELD_LOCK 0
-#if defined MORDOR_MAIN || BOOST_VERSION || QTHREAD_VERSION
-#if TRY_YIELD_LOCK
-#error TRY_YIELD_LOCK not supported with boost, mordor, or qthread
-#endif
+#if defined MORDOR_MAIN || defined BOOST_VERSION || defined QTHREAD_VERSION || defined ARACHNE_H_ || defined _FIBER_FIBER_H_
+#define HASTRYLOCK 0
+#else
+#define HASTRYLOCK 1
 #endif

 // configuration default settings
@@ -72,6 +71,7 @@ static bool affinityFlag = false;
 static bool calibration = false;
 static bool yieldExperiment = false;
+static char lockType = 'B';

 // worker descriptor
 struct Worker {
@@ -109,13 +109,13 @@ static void alarmHandler(int) {
 // help message
 static void usage(const char* prog) {
-  cerr << "usage: " << prog << " -d <duration (secs)> -f <total fibres> -l <locks> -t <system threads> -u <unlocked work> -w <locked work> -s -y -a -c" << endl;
+  cerr << "usage: " << prog << " -d <duration (secs)> -f <total fibres> -l <locks> -t <system threads> -u <unlocked work> -w <locked work> -s -y -a -c -Y -L <lock type>" << endl;
 }

 // command-line option processing
 static bool opts(int argc, char** argv) {
   for (;;) {
-    int option = getopt( argc, argv, "d:f:l:t:u:w:syacYh?" );
+    int option = getopt( argc, argv, "d:f:l:t:u:w:syacYL:h?" );
     if ( option < 0 ) break;
     switch(option) {
     case 'd': duration = atoi(optarg); break;
@@ -129,6 +129,7 @@ static bool opts(int argc, char** argv) {
     case 'a': affinityFlag = true; break;
     case 'c': calibration = true; break;
     case 'Y': yieldExperiment = true; break;
+    case 'L': lockType = optarg[0]; break;
     case 'h':
     case '?':
       usage(argv[0]);
@@ -149,6 +150,16 @@ static bool opts(int argc, char** argv) {
     usage(argv[0]);
     return false;
   }
+  if (lockType >= 'a') lockType -= 32;
+  switch (lockType) {
+#if HASTRYLOCK
+    case 'Y':
+    case 'S':
+#endif
+    case 'B': break;
+    default: cerr << "lock type " << lockType << " not supported" << endl;
+             return false;
+  }
 #if defined MORDOR_MAIN
   if (!yieldFlag) {
     cout << "Mordor always runs with -y flag set" << endl;
@@ -268,11 +279,17 @@ static void worker(void* arg) {
     // unlocked work
     if (work_unlocked != (unsigned int)-1) dowork(buffer, work_unlocked);
     // locked work and counters
-#if TRY_YIELD_LOCK
-    while (!locks[lck].mutex.tryAcquire()) Fibre::yield();
-#else
-    locks[lck].mutex.acquire();
-#endif
+    switch (lockType) {
+      // regular blocking lock
+      case 'B': locks[lck].mutex.acquire(); break;
+#if HASTRYLOCK
+      // plain spin lock
+      case 'S': while (!locks[lck].mutex.tryAcquire()) Pause(); break;
+      // yield-based busy-locking (as in qthreads, boost)
+      case 'Y': while (!locks[lck].mutex.tryAcquire()) Fibre::yield(); break;
+#endif
+      default: cerr << "internal error: lock type" << endl; abort();
+    }
     if (work_locked != (unsigned int)-1) dowork(buffer, work_locked);
     workers[num].counter += 1;
     locks[lck].counter += 1;
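The switch above selects one of three acquisition strategies per critical section: 'B' blocks on the mutex, 'S' spins on tryAcquire with a pause between attempts, and 'Y' yields between attempts. Stripped of the benchmark scaffolding, the three patterns look as follows against a plain std::mutex (illustrative sketch; the benchmark uses the selected runtime's mutex, Pause(), and yield primitives, and _mm_pause assumes x86):

    #include <mutex>
    #include <thread>
    #include <immintrin.h>  // _mm_pause (x86)

    void acquireB(std::mutex& m) { m.lock(); }  // 'B': block until available
    void acquireS(std::mutex& m) {              // 'S': spin, pausing between attempts
      while (!m.try_lock()) _mm_pause();
    }
    void acquireY(std::mutex& m) {              // 'Y': yield the thread between attempts
      while (!m.try_lock()) std::this_thread::yield();
    }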
@@ -347,7 +364,7 @@ int main(int argc, char** argv) {
   poolScheduler = new WorkerPool(threadCount);
 #elif defined BOOST_VERSION
   boost_init(threadCount);
-#elif defined __LIBFIBRE__ || __U_CPLUSPLUS__
+#elif defined __LIBFIBRE__ || defined __U_CPLUSPLUS__
   OsProcessor* proc = new OsProcessor[threadCount - 1];
 #elif defined QTHREAD_VERSION
   setenv("QTHREAD_STACK_SIZE", "65536", 1);
......
@@ -373,7 +373,7 @@ static void acceptor_loop(void* arg) {
 #endif
   if (!CurrGarage().run((void*)arg)) {
     __atomic_add_fetch(&connectionFibres, 1, __ATOMIC_RELAXED);
-    new Fibre(acceptor_loop, (void*)arg);
+    new Fibre(acceptor_loop, (void*)arg, true);
   }
   while (connHandler((void*)connFD));
   CurrGarage().park();
@@ -482,11 +482,11 @@ static void scopemain(void* arg) {
   for (unsigned int c = 0; c < clusterCount; c += 1) {
     if (listenerCount) {
       for (unsigned int i = 0; i < listenerCount; i += 1) {
-        Fibre* f = new Fibre(acceptor, (void*)servFD);
+        Fibre* f = new Fibre(acceptor, (void*)servFD, true);
         fibreList.push_back(f);
       }
     } else {
-      Fibre* f = new Fibre(acceptor_loop, (void*)servFD);
+      Fibre* f = new Fibre(acceptor_loop, (void*)servFD, true);
       fibreList.push_back(f);
     }
 #if defined __LIBFIBRE__
......
@@ -28,9 +28,9 @@ class Scheduler;
 class ReadyQueue {
   RuntimeLock readyLock;
 #if TESTING_LOCKED_READYQUEUE
-  StackQueue<ReadyQueueLink> queue[NumPriority];
+  FlexStackQueue queue[NumPriority];
 #else
-  StackMPSC<ReadyQueueLink> queue[NumPriority];
+  FlexStackMPSC queue[NumPriority];
 #endif
   ReadyQueue(const ReadyQueue&) = delete; // no copy
......
@@ -120,7 +120,7 @@ public:
     lock.release();
     doSuspend(stack);
   }
-  void suspend(BlockedStackList& queue, StackContext& stack = *Context::CurrStack()) {
+  void suspend(FlexStackList& queue, StackContext& stack = *Context::CurrStack()) {
     setupResumeRace(stack);
     prepareSuspend();
     queue.push_back(stack);
@@ -135,7 +135,7 @@ class TimeoutBlockingInfo : public BlockingInfo<Lock>, public TimeoutInfo {
   bool timedOut;
 public:
   TimeoutBlockingInfo(Lock& l, StackContext& stack = *Context::CurrStack()) : BaseBI(l), TimeoutInfo(stack), timedOut(false) {}
-  bool suspendAbsolute(BlockedStackList& queue, const Time& timeout, const Time& now) {
+  bool suspendAbsolute(FlexStackList& queue, const Time& timeout, const Time& now) {
     BaseBI::setupResumeRace(stack);
     prepareSuspend();
     queue.push_back(stack);
@@ -150,7 +150,7 @@ public:
   virtual void fireTimer() {
     timedOut = true;
     BaseBI::lock.acquire();
-    BlockedStackList::remove(stack);
+    FlexStackList::remove(stack);
     BaseBI::lock.release();
     TimeoutInfo::fireTimer();
   }
@@ -195,7 +195,7 @@ static inline void sleepStack(const Time& timeout) {
 }

 class BlockingQueue {
-  BlockedStackList queue;
+  FlexStackList queue;
   BlockingQueue(const BlockingQueue&) = delete; // no copy
   BlockingQueue& operator=(const BlockingQueue&) = delete; // no assignment
@@ -211,10 +211,10 @@ public:
     StackContext* s = queue.front();
     while (s != queue.edge()) {
       ResumeInfo* ri = s->raceResume();
-      StackContext* ns = BlockedStackList::next(*s);
+      StackContext* ns = FlexStackList::next(*s);
       if (ri) {
         ri->cancelTimer();
-        BlockedStackList::remove(*s);
+        FlexStackList::remove(*s);
         DBG::outl(DBG::Level::Blocking, "Stack ", FmtHex(s), " clear/resume from ", FmtHex(&queue));
         s->resume();
       }
@@ -252,11 +252,11 @@ public:
   template<bool Enqueue = true>
   StackContext* unblock() { // not concurrency-safe; better hold lock
-    for (StackContext* s = queue.front(); s != queue.edge(); s = BlockedStackList::next(*s)) {
+    for (StackContext* s = queue.front(); s != queue.edge(); s = FlexStackList::next(*s)) {
       ResumeInfo* ri = s->raceResume();
       if (ri) {
         ri->cancelTimer();
-        BlockedStackList::remove(*s);
+        FlexStackList::remove(*s);
         DBG::outl(DBG::Level::Blocking, "Stack ", FmtHex(s), " resume from ", FmtHex(&queue));
         if (Enqueue) s->resume();
         return s;
@@ -332,8 +332,31 @@ public:
   void release() { return V(); }
 };

-template<typename Lock, bool Fifo = true, bool OwnerLock = false, typename BQ = BlockingQueue>
-class SimpleMutex {
+class FastMutex : public BaseSuspender {
+  BlockStackMCS queue;
+public:
+  bool acquire() {
+    StackContext* cs = Context::CurrStack();
+    if (!queue.push(*cs)) {
+      prepareSuspend();
+      doSuspend(*cs);
+    }
+    return true;
+  }
+  bool tryAcquire() {
+    StackContext* cs = Context::CurrStack();
+    return queue.tryPushEmpty(*cs);
+  }
+  void release() {
+    StackContext* cs = Context::CurrStack();
+    StackContext* next = queue.next(*cs);
+    BlockStackMCS::clear(*cs);
+    if (next) next->resume();
+  }
+};
+
+template<typename Lock, bool OwnerLock, bool Fifo, typename BQ = BlockingQueue>
+class LockedMutex {
   Lock lock;
   StackContext* owner;
   BQ bq;
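The new FastMutex hands the lock off through an MCS-style queue of stack contexts: acquire takes the uncontended fast path when the push finds the queue empty and suspends otherwise; release unlinks the caller's node and resumes the successor directly, so ownership passes in FIFO order without barging. A hedged usage sketch (assumes fibre context; FibreMutex can apparently be aliased to FastMutex via the commented-out define in the first hunk):

    FastMutex m;          // as introduced in this hunk
    void withLock() {
      m.acquire();        // queue empty: fast path; otherwise suspend in the MCS queue
      // ... critical section ...
      m.release();        // direct hand-off: resumes the next queued fibre, if any
    }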
@@ -356,9 +379,9 @@ protected:
   }

 public:
-  SimpleMutex() : owner(nullptr) {}
+  LockedMutex() : owner(nullptr) {}
   // baton passing requires serialization at destruction
-  ~SimpleMutex() { if (!Fifo) return; ScopedLock<Lock> sl(lock); }
+  ~LockedMutex() { if (!Fifo) return; ScopedLock<Lock> sl(lock); }
   bool test() const { return owner != nullptr; }
   template<typename... Args>
@@ -382,25 +405,36 @@ class SpinMutex {
   StackContext* owner;
   Semaphore sem;

+  template<typename... Args>
+  bool tryOnly(const Args&... args) { return false; }
+
+  template<typename... Args>
+  bool tryOnly(bool wait) { return !wait; }
+
+  bool tryLock(StackContext* cs) {
+    StackContext* exp = nullptr;
+    return __atomic_compare_exchange_n(&owner, &exp, cs, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
+  }
+
 protected:
   template<typename... Args>
   bool internalAcquire(const Args&... args) {
     StackContext* cs = Context::CurrStack();
     if (OwnerLock && cs == owner) return true;
     RASSERT(cs != owner, FmtHex(cs), FmtHex(owner));
+    if (tryOnly(args...)) return tryLock(cs);
     size_t cnt = 0;
     size_t spin = SpinStart;
     for (;;) {
-      StackContext* exp = nullptr;
-      if (__atomic_compare_exchange_n(&owner, &exp, cs, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) return true;
+      if (tryLock(cs)) return true;
       if (cnt < SpinCount) {
         for (size_t i = 0; i < spin; i += 1) Pause();
-        if (spin <= SpinEnd) spin += spin;
+        if (spin < SpinEnd) spin += spin;
         else cnt += 1;
       } else {
-        if (!sem.P(args...)) return false;
         cnt = 0;
         spin = SpinStart;
+        if (!sem.P(args...)) return false;
       }
     }
   }
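internalAcquire now separates the try-only path (tryOnly/tryLock) from the contended path, which spins in bursts whose length doubles from SpinStart up to SpinEnd; after SpinCount bursts at maximum length it blocks on the semaphore, resetting the backoff state first. The corrected comparison (spin < SpinEnd) stops the doubling exactly at SpinEnd instead of one doubling past it. A standalone sketch of the same spin-then-block shape (illustrative; _mm_pause assumes x86, and a yield stands in for the semaphore P operation):

    #include <atomic>
    #include <thread>
    #include <immintrin.h>

    constexpr size_t SpinStart = 4, SpinEnd = 1024, SpinCount = 16;

    void acquire(std::atomic<bool>& locked) {
      size_t cnt = 0, spin = SpinStart;
      for (;;) {
        bool expected = false;
        if (locked.compare_exchange_strong(expected, true, std::memory_order_acquire))
          return;                                  // lock taken
        if (cnt < SpinCount) {
          for (size_t i = 0; i < spin; i += 1) _mm_pause();
          if (spin < SpinEnd) spin += spin;        // exponential backoff
          else cnt += 1;                           // count bursts at maximum length
        } else {
          cnt = 0; spin = SpinStart;               // reset backoff before blocking
          std::this_thread::yield();               // stand-in for sem.P()
        }
      }
    }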
@@ -440,15 +474,15 @@ public:
   }
 };

-template<typename Lock>
+template<typename Lock, bool OwnerLock = false>
 #if TESTING_MUTEX_FIFO
-class Mutex : public SimpleMutex<Lock> {};
+class Mutex : public LockedMutex<Lock, OwnerLock, true> {};
 #elif TESTING_MUTEX_BARGING
-class Mutex : public SimpleMutex<Lock, false> {};
+class Mutex : public LockedMutex<Lock, OwnerLock, false> {};
 #elif TESTING_MUTEX_SPIN
-class Mutex : public SpinMutex<Semaphore<Lock, true>, false, 4, 1024, 16> {};
+class Mutex : public SpinMutex<Semaphore<Lock, true>, OwnerLock, 4, 1024, 16> {};
 #else
-class Mutex : public SpinMutex<Semaphore<Lock, true>, false, 0, 0, 0> {};
+class Mutex : public SpinMutex<Semaphore<Lock, true>, OwnerLock, 0, 0, 0> {};
 #endif

 // simple blocking RW lock: release alternates; new readers block when writer waits -> no starvation
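On the RW-lock comment above: blocking new readers as soon as a writer waits keeps writers from starving behind a continuous stream of readers, and alternating at release protects the other direction. A condition-variable sketch of the first half of that policy (illustrative only, not the runtime's implementation; the full alternation on release needs an additional turn flag, omitted here for brevity):

    #include <mutex>
    #include <condition_variable>

    class RWSketch {
      std::mutex m;
      std::condition_variable cv;
      int activeReaders = 0, waitingWriters = 0;
      bool activeWriter = false;
    public:
      void readLock() {
        std::unique_lock<std::mutex> l(m);
        // new readers block while a writer is active *or waiting*
        cv.wait(l, [&]{ return !activeWriter && waitingWriters == 0; });
        activeReaders += 1;
      }
      void readUnlock() {
        std::unique_lock<std::mutex> l(m);
        if (--activeReaders == 0) cv.notify_all();  // last reader admits a writer
      }
      void writeLock() {
        std::unique_lock<std::mutex> l(m);
        waitingWriters += 1;
        cv.wait(l, [&]{ return !activeWriter && activeReaders == 0; });
        waitingWriters -= 1;
        activeWriter = true;
      }
      void writeUnlock() {
        std::unique_lock<std::mutex> l(m);
        activeWriter = false;
        cv.notify_all();  // wake both classes; waiters re-check their predicates
      }
    };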
......
@@ -25,7 +25,7 @@ class LoadManager {
   volatile ssize_t stackCounter;
   RuntimeLock procLock;
   ProcessorList waitingProcs;
-  BlockedStackList waitingStacks;
+  FlexStackList waitingStacks;
   LoadManagerStats* stats;
......
@@ -115,6 +115,7 @@ public:
   }
 } __caligned;

+// https://doi.org/10.1145/103727.103729
 // pro: no cache contention -> scalability
 // con: storage node, lock bouncing -> use cohorting?
 // tested acquire/release memory ordering -> failure?
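The added DOI points to Mellor-Crummey and Scott (1991), the MCS queue lock the pro/con notes describe: each waiter spins on a flag in its own queue node, so the only cross-thread cache traffic is the hand-off itself (the "no cache contention" pro), at the cost of a storage node per waiter (the con). A minimal portable sketch in textbook form (illustrative, not the runtime's implementation; memory orderings follow the usual presentation, and the "acquire/release ... failure?" note above suggests the author found weaker orderings problematic in testing):

    #include <atomic>

    // Minimal MCS queue lock: each waiter spins locally on its own node.
    struct MCSNode {
      std::atomic<MCSNode*> next{nullptr};
      std::atomic<bool> locked{false};
    };

    class MCSLock {
      std::atomic<MCSNode*> tail{nullptr};
    public:
      void acquire(MCSNode& n) {
        n.next.store(nullptr, std::memory_order_relaxed);
        MCSNode* prev = tail.exchange(&n, std::memory_order_acq_rel);
        if (prev) {                       // queue non-empty: link in and wait
          n.locked.store(true, std::memory_order_relaxed);
          prev->next.store(&n, std::memory_order_release);
          while (n.locked.load(std::memory_order_acquire)) { /* local spin */ }
        }
      }
      void release(MCSNode& n) {
        MCSNode* succ = n.next.load(std::memory_order_acquire);
        if (!succ) {                      // no successor visible: try to close queue
          MCSNode* expected = &n;
          if (tail.compare_exchange_strong(expected, nullptr,
                                           std::memory_order_acq_rel)) return;
          while (!(succ = n.next.load(std::memory_order_acquire))) { /* linking */ }
        }
        succ->locked.store(false, std::memory_order_release);  // hand-off
      }
    };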
......
@@ -45,6 +45,9 @@ public IntrusiveList<StackContext,NUM,StackLinkCount,DoubleLink<StackContext,StackLinkCount>> {};
 template <size_t NUM> class StackQueue :
 public IntrusiveQueue<StackContext,NUM,StackLinkCount,DoubleLink<StackContext,StackLinkCount>> {};

+template <size_t NUM> class StackMCS :
+public IntrusiveQueueMCS<StackContext,NUM,StackLinkCount,DoubleLink<StackContext,StackLinkCount>> {};
+
 template <size_t NUM> class StackMPSC :
 #if TESTING_NEMESIS_READYQUEUE
 public IntrusiveQueueNemesis<StackContext,NUM,StackLinkCount,DoubleLink<StackContext,StackLinkCount>> {};
@@ -52,11 +55,16 @@ public IntrusiveQueueNemesis<StackContext,NUM,StackLinkCount,DoubleLink<StackContext,StackLinkCount>> {};
 public IntrusiveQueueStub<StackContext,NUM,StackLinkCount,DoubleLink<StackContext,StackLinkCount>> {};
 #endif

-static const size_t ReadyQueueLink = 0;
-typedef StackList<ReadyQueueLink> BlockedStackList;
+static const size_t FlexQueueLink = 0;
+typedef StackList<FlexQueueLink> FlexStackList;
+typedef StackQueue<FlexQueueLink> FlexStackQueue;
+typedef StackMPSC<FlexQueueLink> FlexStackMPSC;
+
+static const size_t BlockQueueLink = 1;
+typedef StackMCS<BlockQueueLink> BlockStackMCS;

 #if TESTING_ENABLE_DEBUGGING
-static const size_t DebugListLink = 1;
+static const size_t DebugListLink = 2;
 typedef StackList<DebugListLink> GlobalStackList;
 #endif
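The renamed constants make the link-numbering scheme explicit: index 0 (FlexQueueLink) is shared by the list/queue/MPSC flavours, whose uses are mutually exclusive in time, while the new index 1 (BlockQueueLink) gives the MCS block queue its own link, so a stack context can sit on a FastMutex queue and a flex container at the same time; the debug list moves to index 2. A simplified illustration of per-index intrusive links (hypothetical types, not the runtime's Intrusive* templates):

    #include <cstddef>

    struct Node {
      Node* link[3] = {};  // 0: flex, 1: block (MCS), 2: debug
    };

    template <size_t NUM>
    class IntrusiveStack {   // each container only ever touches link[NUM]
      Node* head = nullptr;
    public:
      void push(Node& n) { n.link[NUM] = head; head = &n; }
      Node* pop() {
        Node* n = head;
        if (n) { head = n->link[NUM]; n->link[NUM] = nullptr; }
        return n;
      }
    };

    // One node can be on two containers at once, as long as they use
    // different link indices:
    //   IntrusiveStack<0> flex;  IntrusiveStack<1> block;
    //   Node n;  flex.push(n);  block.push(n);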
......