Commit 8bdd3734 authored by Martin Karsten

- streamline synchronization init/destroy interfaces for pthread compatibility

parent 9c5564ac
Pipeline #41042 passed with stage
......@@ -142,8 +142,8 @@ public:
masterPoller = new MasterPoller(*this, fdCount, _friend<EventScope>()); // start master poller & timer handling
mainCluster->postFork1(_friend<EventScope>());
for (int f = 0; f < fdCount; f += 1) {
RASSERT(fdSyncVector[f].RD.sem.empty(), f);
RASSERT(fdSyncVector[f].WR.sem.empty(), f);
RASSERT(fdSyncVector[f].RD.sem.getValue() >= 0, f);
RASSERT(fdSyncVector[f].WR.sem.getValue() >= 0, f);
}
mainCluster->postFork2(_friend<EventScope>());
}
......@@ -228,8 +228,8 @@ public:
void deregisterFD(int fd) {
RASSERT0(fd >= 0 && fd < fdCount);
SyncFD& fdsync = fdSyncVector[fd];
RASSERT0(fdsync.RD.sem.empty());
RASSERT0(fdsync.WR.sem.empty());
fdsync.RD.sem.reinit();
fdsync.WR.sem.reinit();
#if TESTING_LAZY_FD_REGISTRATION
ScopedLock<FastMutex> sl(fdsync.lock);
fdsync.status = 0;
......
......@@ -51,11 +51,6 @@ public:
void release() {
SYSCALL(pthread_mutex_unlock(&mutex));
}
bool test() {
if (!tryAcquire()) return true;
release();
return false;
}
};
class OsCondition {
......
......@@ -225,13 +225,13 @@ inline int fibre_migrate(Cluster *cluster) {
/** @brief Initialize semaphore object. (`sem_init`)
 *  @param sem     semaphore to initialize
 *  @param pshared must be 0; process-shared semaphores are not supported
 *  @param value   initial counter value
 *  @return always 0 (success), per POSIX convention */
inline int fibre_sem_init(fibre_sem_t *sem, int pshared, unsigned int value) {
  RASSERT0(pshared == 0);
  sem->init(value); // new init/destroy interface (replaces the old 'reset')
  return 0;
}
/** @brief Destroy semaphore object. (`sem_destroy`)
 *  @param sem semaphore to destroy; see Semaphore::destroy for queue cleanup
 *  @return always 0 (success) */
inline int fibre_sem_destroy(fibre_sem_t *sem) {
  sem->destroy(); // new init/destroy interface (replaces the old 'reset')
  return 0;
}
......@@ -281,6 +281,7 @@ inline int fibre_mutexattr_settype(fibre_mutexattr_t *attr, int type) {
/** @brief Initialize mutex lock. (`pthread_mutex_init`) */
inline int fibre_mutex_init(fibre_mutex_t *restrict mutex, const fibre_mutexattr_t *restrict attr) {
mutex->init();
#if TESTING_LOCK_RECURSION
if (attr && attr->type == PTHREAD_MUTEX_RECURSIVE) mutex->enableRecursion();
#else
......@@ -291,6 +292,7 @@ inline int fibre_mutex_init(fibre_mutex_t *restrict mutex, const fibre_mutexattr
/** @brief Destroy mutex lock. (`pthread_mutex_destroy`)
 *  @param mutex mutex to destroy
 *  @return always 0 (success) */
inline int fibre_mutex_destroy(fibre_mutex_t *mutex) {
  mutex->destroy();
  return 0;
}
......@@ -319,11 +321,13 @@ inline int fibre_mutex_unlock(fibre_mutex_t *mutex) {
/** @brief Initialize condition variable. (`pthread_cond_init`)
 *  @param cond condition variable to initialize
 *  @param attr must be nullptr; condition attributes are not supported
 *  @return always 0 (success) */
inline int fibre_cond_init(fibre_cond_t *restrict cond, const fibre_condattr_t *restrict attr) {
  RASSERT0(attr == nullptr); // no attribute support
  cond->init();
  return 0;
}
/** @brief Destroy condition variable. (`pthread_cond_destroy`)
 *  @param cond condition variable to destroy
 *  @return always 0 (success) */
inline int fibre_cond_destroy(fibre_cond_t *cond) {
  cond->destroy();
  return 0;
}
......@@ -356,11 +360,13 @@ inline int fibre_cond_broadcast(fibre_cond_t *cond) {
/** @brief Initialize rw-lock. (`pthread_rwlock_init`)
 *  @param rwlock rw-lock to initialize
 *  @param attr must be nullptr; rwlock attributes are not supported
 *  @return always 0 (success) */
inline int fibre_rwlock_init(fibre_rwlock_t *restrict rwlock, const fibre_rwlockattr_t *restrict attr) {
  RASSERT0(attr == nullptr); // no attribute support
  rwlock->init();
  return 0;
}
/** @brief Destroy rw-lock. (`pthread_rwlock_destroy`)
 *  @param rwlock rw-lock to destroy
 *  @return always 0 (success) */
inline int fibre_rwlock_destroy(fibre_rwlock_t *rwlock) {
  rwlock->destroy();
  return 0;
}
......@@ -405,12 +411,13 @@ inline int fibre_rwlock_unlock(fibre_rwlock_t *rwlock){
/** @brief Initialize barrier. (`pthread_barrier_init`)
 *  @param barrier barrier to initialize
 *  @param attr    must be nullptr; barrier attributes are not supported
 *  @param count   number of participants that must call wait before release
 *  @return always 0 (success) */
inline int fibre_barrier_init(fibre_barrier_t *restrict barrier, const fibre_barrierattr_t *restrict attr, unsigned count) {
  RASSERT0(attr == nullptr);
  barrier->init(count); // new init/destroy interface (replaces the old 'reset')
  return 0;
}
/** @brief Destroy barrier. (`pthread_barrier_destroy`)
 *  @param barrier barrier to destroy
 *  @return always 0 (success) */
inline int fibre_barrier_destroy(fibre_barrier_t *barrier) {
  barrier->destroy();
  return 0;
}
......
......@@ -24,17 +24,11 @@ class Benaphore {
protected:
volatile ssize_t counter;
SemType sem;
public:
Benaphore(ssize_t c = 0) : counter(c), sem(0) {}
bool empty() { return counter >= 0; }
bool open() { return counter > 0; }
explicit Benaphore(ssize_t c = 0) : counter(c), sem(0) {}
ssize_t getValue() { return counter; }
void reset(ssize_t c = 0) {
counter = c;
sem.reset(0);
}
bool P() {
if (__atomic_sub_fetch(&counter, 1, __ATOMIC_SEQ_CST) < 0) sem.P();
return true;
......@@ -57,8 +51,6 @@ public:
void V() {
if (__atomic_add_fetch(&counter, 1, __ATOMIC_SEQ_CST) < 1) sem.V();
}
void release() { return V(); }
};
#endif /* _Benaphore_h_ */
......@@ -209,24 +209,6 @@ public:
~BlockingQueue() { RASSERT0(empty()); }
bool empty() const { return queue.empty(); }
template<typename Lock>
void reset(Lock& lock) {
RASSERT0(lock.test());
StackContext* s = queue.front();
while (s != queue.edge()) {
ResumeInfo* ri = s->raceResume();
StackContext* ns = FlexStackList::next(*s);
if (ri) {
ri->cancelTimer();
FlexStackList::remove(*s);
DBG::outl(DBG::Level::Blocking, "Stack ", FmtHex(s), " clear/resume from ", FmtHex(&queue));
s->resume();
}
s = ns;
}
lock.release();
while (!empty()) Pause(); // wait for timed out events to disappear
}
template<typename Lock>
bool block(Lock& lock, bool wait = true) {
if (wait) {
......@@ -268,13 +250,28 @@ public:
}
return nullptr;
}
// Clear the queue by cancelling and resuming every stack still blocked on it.
// NOTE(review): caller is expected to hold the protecting lock, since this
// walk is not itself synchronized.
void reset() { // not concurrency-safe; better hold lock
  StackContext* s = queue.front();
  while (s != queue.edge()) {
    ResumeInfo* ri = s->raceResume();           // try to win the resume race for this stack
    StackContext* ns = FlexStackList::next(*s); // fetch successor before removing s
    if (ri) { // won the race: cancel pending timer, dequeue, and resume here
      ri->cancelTimer();
      FlexStackList::remove(*s);
      DBG::outl(DBG::Level::Blocking, "Stack ", FmtHex(s), " clear/resume from ", FmtHex(&queue));
      s->resume();
    }
    // if ri is null, another path (presumably a timeout) is resuming s — leave it alone
    s = ns;
  }
}
};
template<typename Lock, bool Binary, typename BQ = BlockingQueue>
class Semaphore {
protected:
Lock lock;
ssize_t counter;
volatile ssize_t counter;
BQ bq;
template<typename... Args>
......@@ -290,15 +287,24 @@ protected:
public:
explicit Semaphore(ssize_t c = 0) : counter(c) {}
// baton passing requires serialization at destruction
~Semaphore() { ScopedLock<Lock> sl(lock); }
bool empty() { return bq.empty(); }
bool open() { return counter >= 1; }
~Semaphore() { destroy(); }
ssize_t getValue() { return counter; }
void reset(ssize_t c = 0) {
lock.acquire();
void init(ssize_t c = 0) {
ScopedLock<Lock> al(lock);
RASSERT0(bq.empty());
counter = c;
}
void destroy() {
ScopedLock<Lock> al(lock);
bq.reset();
}
void reinit(ssize_t c = 0) {
ScopedLock<Lock> al(lock);
bq.reset();
counter = c;
bq.reset(lock);
}
template<typename... Args>
......@@ -331,15 +337,15 @@ public:
else counter += 1;
return nullptr;
}
void release() { return V(); }
};
// limited: no concurrent invocations of V() allowed!
class LimitedSemaphore : public BaseSuspender {
FlexStackMPSC queue;
public:
LimitedSemaphore(ssize_t c = 0) { RASSERT(c == 0, c); }
explicit LimitedSemaphore(ssize_t c = 0) { RASSERT(c == 0, c); }
bool P() {
StackContext* cs = Context::CurrStack();
queue.push(*cs);
......@@ -347,6 +353,7 @@ public:
BaseSuspender::doSuspend(*cs);
return true;
}
template<bool Enqueue = true>
StackContext* V() {
StackContext* next;
......@@ -362,12 +369,11 @@ public:
};
class FastFifoMutex {
volatile size_t counter;
size_t counter;
LimitedSemaphore sem;
public:
FastFifoMutex() : counter(0) {}
bool test() { return counter > 0; }
bool acquire() {
if (__atomic_add_fetch(&counter, 1, __ATOMIC_SEQ_CST) == 1) return true;
......@@ -400,7 +406,7 @@ class BinaryBenaphore : public Benaphore<SemType> {
}
public:
BinaryBenaphore(ssize_t c) : Benaphore<SemType>(c) {}
explicit BinaryBenaphore(ssize_t c) : Benaphore<SemType>(c) {}
bool P(bool wait = true) { return internalP(false, wait); }
......@@ -418,8 +424,6 @@ public:
}
}
}
void release() { return V(); }
};
template<typename Lock, bool Fifo, typename BQ = BlockingQueue>
......@@ -448,8 +452,19 @@ protected:
public:
LockedMutex() : owner(nullptr) {}
// baton passing requires serialization at destruction
// Baton passing requires serialization at destruction; delegate to destroy().
~LockedMutex() { destroy(); }
// (Re-)initialize an unused mutex; asserts that nothing is blocked on it.
void init() {
  ScopedLock<Lock> al(lock);
  RASSERT0(bq.empty()); // must not re-init while fibres are blocked
  owner = nullptr;
}
// Destroy the mutex; it must not be held, and blocked fibres are cleared.
void destroy() {
  ScopedLock<Lock> al(lock);
  RASSERT(owner == nullptr, FmtHex(owner)); // destroying a held mutex is a bug
  bq.reset();
}
template<typename... Args>
bool acquire(const Args&... args) { return internalAcquire<false>(args...); }
......@@ -508,7 +523,18 @@ protected:
public:
SpinMutex() : owner(nullptr), sem(1) {}
bool test() const { return owner != nullptr; }
// (Re-)initialize an unused mutex: clear the owner atomically and restore the
// binary semaphore to one available token.
void init() {
  StackContext* old = __atomic_exchange_n(&owner, nullptr, __ATOMIC_SEQ_CST);
  RASSERT(old == nullptr, FmtHex(old)); // must not re-init a held mutex
  sem.init(1); // one token: mutex available
}
// Destroy the mutex; it must not be held.
void destroy() {
  StackContext* old = __atomic_exchange_n(&owner, nullptr, __ATOMIC_SEQ_CST);
  RASSERT(old == nullptr, FmtHex(old)); // destroying a held mutex is a bug
  sem.destroy();
}
template<typename... Args>
bool acquire(const Args&... args) { return internalAcquire<false>(args...); }
......@@ -531,6 +557,12 @@ public:
OwnerMutex() : counter(0), recursion(false) {}
void enableRecursion() { recursion = true; }
// (Re-)initialize: reset base mutex state, recursion counter, and recursion
// flag; recursion must be re-enabled via enableRecursion() afterwards.
void init() {
  BaseMutex::init();
  counter = 0;
  recursion = false;
}
template<typename... Args>
size_t acquire(const Args&... args) {
bool success = recursion
......@@ -600,6 +632,20 @@ class LockRW {
public:
LockRW() : state(0) {}
// (Re-)initialize an unused rw-lock; no readers or writers may be blocked.
void init() {
  ScopedLock<Lock> al(lock);
  RASSERT0(bqR.empty()); // no blocked readers allowed
  RASSERT0(bqW.empty()); // no blocked writers allowed
  state = 0; // unlocked, matching the constructor's initial state
}
// Destroy the rw-lock; it must be fully released (state == 0).
void destroy() {
  ScopedLock<Lock> al(lock);
  RASSERT(state == 0, state); // destroying a held rw-lock is a bug
  bqR.reset(); // cancel/resume any blocked readers
  bqW.reset(); // cancel/resume any blocked writers
}
template<typename... Args>
bool acquireRead(const Args&... args) { return internalAR(args...); }
bool tryAcquireRead() { return acquireRead(false); }
......@@ -631,14 +677,21 @@ class Barrier {
size_t counter;
BQ bq;
public:
Barrier(size_t t = 1) : target(t), counter(0) { RASSERT0(t); }
void reset(size_t t = 1) {
RASSERT0(t);
lock.acquire();
explicit Barrier(size_t t = 1) : target(t), counter(0) { RASSERT0(t > 0); }
void init(size_t t = 1) {
RASSERT0(t > 0)
ScopedLock<Lock> al(lock);
RASSERT0(bq.empty())
target = t;
counter = 0;
bq.reset(lock);
}
void destroy() {
ScopedLock<Lock> al(lock);
bq.reset();
}
bool wait() {
lock.acquire();
counter += 1;
......@@ -658,14 +711,17 @@ public:
// Condition variable backed by a blocking queue of suspended fibres.
template<typename BQ = BlockingQueue>
class Condition {
  BQ bq; // fibres currently waiting on this condition
public:
  // (Re-)initialize; no fibre may currently be waiting.
  void init() { RASSERT0(bq.empty()); }
  // Destroy; cancels/resumes any still-waiting fibres.
  void destroy() { bq.reset(); }
  // Block the caller on this condition, synchronized via 'lock'.
  template<typename Lock>
  bool wait(Lock& lock) { return bq.block(lock); }
  // Block with a timeout; returns the result of the underlying block call.
  template<typename Lock>
  bool wait(Lock& lock, const Time& timeout) { return bq.block(lock, timeout); }
  // Wake one waiter, or keep waking while Broadcast is set.
  template<bool Broadcast = false>
  void signal() { while (bq.unblock() && Broadcast); }
};
......@@ -685,11 +741,12 @@ protected:
public:
static const State Invalid = Dummy; // dummy bit never set (for ManagedArray)
explicit SynchronizedFlag(State s = Running) : state(s) {}
bool posted() const { return state == Posted; }
bool detached() const { return state == Detached; }
// Reinitialize to the Running state (successor of the old 'reset').
void init() { state = Running; }
bool wait(Lock& lock) { // returns false, if detached
if (state == Running) {
BlockingInfo<Lock> bi(lock);
......@@ -721,7 +778,7 @@ public:
template<typename Runner, typename Result, typename Lock>
class Joinable : public SynchronizedFlag<Lock> {
using Baseclass = SynchronizedFlag<Lock>;
protected:
union {
Runner* runner;
Result result;
......@@ -752,7 +809,7 @@ class SyncPoint : public SynchronizedFlag<Lock> {
Lock lock;
public:
SyncPoint(State s = Baseclass::Running) : Baseclass(s) {}
// Each operation serializes on 'lock' around the corresponding base-class call.
void init() { ScopedLock<Lock> al(lock); Baseclass::init(); }
bool wait() { ScopedLock<Lock> al(lock); return Baseclass::wait(lock); }
bool post() { ScopedLock<Lock> al(lock); return Baseclass::post(); }
void detach() { ScopedLock<Lock> al(lock); Baseclass::detach(); }
......
......@@ -33,7 +33,6 @@ protected:
volatile bool locked;
public:
BinaryLock() : locked(false) {}
bool test() const { return locked; }
bool tryAcquire() {
if (locked) return false;
return !__atomic_test_and_set(&locked, __ATOMIC_SEQ_CST);
......@@ -61,7 +60,6 @@ class BinaryOwnerLock {
size_t counter;
public:
BinaryOwnerLock() : owner(noOwner), counter(0) {}
bool test() const { return owner != noOwner; }
size_t tryAcquire(T caller) {
if (owner != caller) {
if (owner != noOwner) return 0;
......@@ -99,7 +97,6 @@ class TicketLock {
size_t ticket;
public:
TicketLock() : serving(0), ticket(0) {}
bool test() const { return serving != ticket; }
bool tryAcquire() {
if (serving != ticket) return false;
size_t tryticket = serving;
......@@ -129,7 +126,6 @@ private:
Node* volatile tail;
public:
MCSLock() : tail(nullptr) {}
bool test() const { return tail != nullptr; }
bool tryAcquire(Node& n) {
n.next = nullptr;
return ((tail == nullptr) && _CAS(&tail, (Node*)nullptr, &n, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment