Commit 9c5564ac authored by Martin Karsten's avatar Martin Karsten

- prepare I/O synchronization for timeouts and asynchronous cancellation

parent 1fa91d75
Pipeline #40942 passed with stage
in 8 minutes and 27 seconds
......@@ -39,7 +39,7 @@ class EventScope {
// http://pubs.opengroup.org/onlinepubs/9699919799/functions/V2_chap02.html#tag_15_14
// A fixed-size array based on 'getrlimit' is somewhat brute-force, but simple and fast.
struct SyncSem {
FastMutex mtx;
Mutex<FastMutex> mtx;
Semaphore<WorkerLock,true> sem;
};
struct SyncFD {
......@@ -261,7 +261,7 @@ public:
void blockPollFD(int fd) {
RASSERT0(fd >= 0 && fd < fdCount);
masterPoller->setupPollFD(fd, true); // reset using ONESHOT to reduce polling
ScopedLock<FastMutex> sl(fdSyncVector[fd].RD.mtx);
ScopedLock<Mutex<FastMutex>> sl(fdSyncVector[fd].RD.mtx);
fdSyncVector[fd].RD.sem.P();
}
......@@ -286,7 +286,7 @@ public:
void block(int fd) {
RASSERT0(fd >= 0 && fd < fdCount);
SyncSem& sync = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
ScopedLock<FastMutex> sl(sync.mtx);
ScopedLock<Mutex<FastMutex>> sl(sync.mtx);
sync.sem.P();
}
......@@ -338,14 +338,14 @@ public:
if (ret >= 0 || !TestEAGAIN<Input>()) return ret;
}
#endif
Fibre::yield();
SyncSem& sync = Input ? fdSyncVector[fd].RD : fdSyncVector[fd].WR;
ScopedLock<FastMutex> sl(sync.mtx);
ScopedLock<Mutex<FastMutex>> sl(sync.mtx);
for (;;) {
sync.sem.P();
ret = iofunc(fd, a...);
if (ret >= 0 || !TestEAGAIN<Input>()) break;
if (ret >= 0 || !TestEAGAIN<Input>()) return ret;
sync.sem.P_yield();
}
return ret;
}
};
......
......@@ -304,6 +304,9 @@ public:
template<typename... Args>
bool P(const Args&... args) { lock.acquire(); return internalP(false, args...); }
template<typename... Args>
bool P_yield(const Args&... args) { lock.acquire(); return internalP(true, args...); }
bool tryP() { return P(false); }
template<typename Lock2>
......@@ -313,11 +316,6 @@ public:
return internalP(false);
}
bool P_yield(bool yield) {
lock.acquire();
return internalP(yield);
}
void P_fake(size_t c = 1) {
ScopedLock<Lock> al(lock);
if (Binary) counter = 0;
......@@ -394,15 +392,20 @@ class BinaryBenaphore : public Benaphore<SemType> {
using Benaphore<SemType>::counter;
using Benaphore<SemType>::sem;
public:
BinaryBenaphore(ssize_t c) : Benaphore<SemType>(c) {}
bool P(bool wait = true) {
bool internalP(bool yield, bool wait) {
if (!wait) return Benaphore<SemType>::tryP();
if (__atomic_sub_fetch(&counter, 1, __ATOMIC_SEQ_CST) < 0) sem.P();
else if (yield) StackContext::yield();
return true;
}
public:
BinaryBenaphore(ssize_t c) : Benaphore<SemType>(c) {}
bool P(bool wait = true) { return internalP(false, wait); }
bool P_yield(bool wait = true) { return internalP(true, wait); }
template<bool Enqueue = true>
StackContext* V() {
ssize_t c = counter;
......
......@@ -14,7 +14,7 @@
//#define TESTING_PLACEMENT_STAGING 1 // load-based staging vs. round-robin placement
//#define TESTING_IDLE_SPIN 65536 // spin before idle loop
//#define TESTING_HALT_SPIN 65536 // spin before halting worker thread/core
//#define TESTING_MUTEX_FIFO 1 // use fifo/baton mutex
//#define TESTING_MUTEX_FIFO 1 // use baton-passing fifo mutex
//#define TESTING_MUTEX_BARGING 1 // use blocking/barging mutex
//#define TESTING_MUTEX_SPIN 1 // spin before block in non-fifo mutex
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment