Commit 19324187 authored by Martin Karsten's avatar Martin Karsten
Browse files

- streamline fork() support and fix statistics after forking

parent 231cd801
......@@ -102,7 +102,7 @@ void Cluster::preFork(_friend<EventScope>) {
RASSERT(ringCount == 1, ringCount);
}
void Cluster::postFork1(cptr_t parent, _friend<EventScope>) {
void Cluster::postFork(cptr_t parent, _friend<EventScope> fes) {
new (stats) FredStats::ClusterStats(this, parent);
for (size_t p = 0; p < iPollCount; p += 1) {
iPollVec[p].~PollerType();
......@@ -112,19 +112,22 @@ void Cluster::postFork1(cptr_t parent, _friend<EventScope>) {
oPollVec[p].~PollerType();
new (&oPollVec[p]) PollerType(scope, stagingProc, this, "O-Poller ", _friend<Cluster>());
}
stagingProc.reset(*this, fes, "Staging ");
BaseProcessor* p = placeProc;
for (size_t i = 0; i < ringCount; i += 1) {
p->reset(*this, fes);
p = ProcessorRingGlobal::next(*p);
}
#if TESTING_WORKER_IO_URING
CurrWorker().iouring->~IOUring();
new (CurrWorker().iouring) IOUring(&CurrWorker(), "W-IOUring ");
#endif
#if TESTING_WORKER_POLLER
CurrWorker().workerPoller->~WorkerPoller();
new (CurrWorker().workerPoller) WorkerPoller(Context::CurrEventScope(), &CurrWorker(), "W-Poller ");
#endif
}
void Cluster::postFork2(_friend<EventScope>) {
start();
}
Fibre* Cluster::registerWorker(_friend<EventScope>) {
Worker* worker = new Worker(*this);
Fibre* mainFibre = new Fibre(*worker, Fibre::DefaultAffinity, _friend<Cluster>(), 0); // caller continues on pthread stack
......
......@@ -125,8 +125,7 @@ public:
}
void preFork(_friend<EventScope>);
void postFork1(cptr_t parent, _friend<EventScope>);
void postFork2(_friend<EventScope>);
void postFork(cptr_t parent, _friend<EventScope>);
#if TESTING_WORKER_IO_URING
static IOUring& getWorkerUring() {
......
......@@ -244,6 +244,12 @@ public:
RASSERT0(timerQueue.empty());
RASSERT0(diskCluster == nullptr);
mainCluster->preFork(_friend<EventScope>());
for (int f = 0; f < fdCount; f += 1) {
RASSERT(fdSyncVector[f].sync[false].getValue() >= 0, f);
RASSERT(fdSyncVector[f].sync[true].getValue() >= 0, f);
RASSERT(fdSyncVector[f].poller[false] == 0, f);
RASSERT(fdSyncVector[f].poller[true] == 0, f);
}
}
void postFork() {
......@@ -253,12 +259,8 @@ public:
delete masterPoller; // FreeBSD does not copy kqueue across fork()
#endif
masterPoller = new MasterPoller(*this, fdCount, _friend<EventScope>()); // start master poller & timer handling
mainCluster->postFork1(this, _friend<EventScope>());
for (int f = 0; f < fdCount; f += 1) {
RASSERT(fdSyncVector[f].sync[false].getValue() >= 0, f);
RASSERT(fdSyncVector[f].sync[true].getValue() >= 0, f);
}
mainCluster->postFork2(_friend<EventScope>());
mainCluster->postFork(this, _friend<EventScope>());
mainCluster->startPolling(_friend<EventScope>());
}
/** Wait for the main routine of a cloned event scope. */
......@@ -355,7 +357,7 @@ public:
Time absTimeout;
if (timeout > 0) absTimeout = Runtime::Timer::now() + Time::fromMS(timeout);
for (;;) {
if (timeout < 0) { RASSERT0(sync.P()); }
if (timeout < 0) sync.P();
else if (!sync.P(absTimeout)) return 0;
ret = ::epoll_wait(epfd, events, maxevents, 0);
if (ret != 0) return ret;
......
......@@ -74,6 +74,10 @@ public:
queue[f.getPriority()].push(f);
stats->queue.add();
}
void reset(BaseProcessor& bp, _friend<EventScope>) {
new (stats) FredStats::ReadyQueueStats(this, &bp);
}
};
class BaseProcessor;
......@@ -175,6 +179,11 @@ public:
handoverFred = f;
haltSem.V(*this);
}
void reset(Scheduler& c, _friend<EventScope> token, const char* n = "Processor ") {
new (stats) FredStats::ProcessorStats(this, &c, n);
readyQueue.reset(*this, token);
}
};
#endif /* _BaseProcessor_h_ */
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment