Commit 52786249 authored by Martin Karsten's avatar Martin Karsten

- streamline lazy fd registration code and add test cases

parent 1b62ddd2
Pipeline #40418 passed with stage
in 6 minutes and 46 seconds
...@@ -166,7 +166,7 @@ void servaccept() { ...@@ -166,7 +166,7 @@ void servaccept() {
void servaccept2(void*) { void servaccept2(void*) {
Context::CurrCluster().addWorkers(2); Context::CurrCluster().addWorkers(2);
Context::CurrEventScope().registerFD<true,false,false,true>(servFD); Context::CurrEventScope().registerServerFD(servFD);
servaccept(); servaccept();
std::cout << "finishing 2nd accept loop" << std::endl; std::cout << "finishing 2nd accept loop" << std::endl;
} }
......
...@@ -155,13 +155,14 @@ public: ...@@ -155,13 +155,14 @@ public:
if (timerQueue.checkExpiry(currTime, newTime)) setTimer(newTime); if (timerQueue.checkExpiry(currTime, newTime)) setTimer(newTime);
} }
template<bool Input, bool Output, bool Lazy, bool Cluster> private:
bool registerFD(int fd) { template<bool Input, bool Output, bool Cluster>
bool internalRegisterFD(int fd, bool now) {
static_assert(Input || Output, "must set Input or Output in registerFD()"); static_assert(Input || Output, "must set Input or Output in registerFD()");
const size_t target = (Input ? BasePoller::Input : 0) | (Output ? BasePoller::Output : 0); const size_t target = (Input ? BasePoller::Input : 0) | (Output ? BasePoller::Output : 0);
#if TESTING_LAZY_FD_REGISTRATION #if TESTING_LAZY_FD_REGISTRATION
if (Lazy) return false; if (!now) return true;
RASSERT0(fd >= 0 && fd < fdCount); RASSERT0(fd >= 0 && fd < fdCount);
SyncFD& fdsync = fdSyncVector[fd]; SyncFD& fdsync = fdSyncVector[fd];
if ((fdsync.status & target) == target) return false; // outside of lock: faster, but double regs possible... if ((fdsync.status & target) == target) return false; // outside of lock: faster, but double regs possible...
...@@ -189,6 +190,15 @@ public: ...@@ -189,6 +190,15 @@ public:
return true; return true;
} }
public:
bool registerServerFD(int fd, bool now = false) {
return internalRegisterFD<true,false,true>(fd, now);
}
bool registerFD(int fd, bool now = false) {
return internalRegisterFD<true,true,false>(fd, now);
}
template<bool RemoveFromPollSet = false> template<bool RemoveFromPollSet = false>
void deregisterFD(int fd) { void deregisterFD(int fd) {
RASSERT0(fd >= 0 && fd < fdCount); RASSERT0(fd >= 0 && fd < fdCount);
...@@ -206,6 +216,18 @@ public: ...@@ -206,6 +216,18 @@ public:
#endif #endif
} }
void checkAsyncCompletion(int fd) {
registerFD(fd, true); // register immediately
block<false>(fd); // wait for completion
int ret;
socklen_t sz = sizeof(ret);
SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &ret, &sz));
RASSERT(ret == 0, ret);
#if TESTING_LAZY_FD_REGISTRATION
deregisterFD<true>(fd); // revert to lazy registration
#endif
}
void registerPollFD(int fd) { void registerPollFD(int fd) {
RASSERT0(fd >= 0 && fd < fdCount); RASSERT0(fd >= 0 && fd < fdCount);
masterPoller->setupPollFD(fd, false); // set using ONESHOT to reduce polling masterPoller->setupPollFD(fd, false); // set using ONESHOT to reduce polling
...@@ -285,7 +307,7 @@ public: ...@@ -285,7 +307,7 @@ public:
T ret = iofunc(fd, a...); T ret = iofunc(fd, a...);
if (ret >= 0 || !TestEAGAIN<Input>()) return ret; if (ret >= 0 || !TestEAGAIN<Input>()) return ret;
#if TESTING_LAZY_FD_REGISTRATION #if TESTING_LAZY_FD_REGISTRATION
if (registerFD<Input,!Input,false,false>(fd)) { if (internalRegisterFD<Input,!Input,false>(fd, true)) {
Fibre::yield(); Fibre::yield();
T ret = iofunc(fd, a...); T ret = iofunc(fd, a...);
if (ret >= 0 || !TestEAGAIN<Input>()) return ret; if (ret >= 0 || !TestEAGAIN<Input>()) return ret;
...@@ -323,16 +345,37 @@ inline T lfDirectIO( T (*diskfunc)(int, Args...), int fd, Args... a) { ...@@ -323,16 +345,37 @@ inline T lfDirectIO( T (*diskfunc)(int, Args...), int fd, Args... a) {
/** @brief Create new socket. */ /** @brief Create new socket. */
inline int lfSocket(int domain, int type, int protocol) { inline int lfSocket(int domain, int type, int protocol) {
int ret = socket(domain, type | SOCK_NONBLOCK, protocol); int ret = socket(domain, type | SOCK_NONBLOCK, protocol);
if (ret < 0) return ret;
// do not register SOCK_STREAM yet (cf. listen, connect) -> mandatory for FreeBSD! // do not register SOCK_STREAM yet (cf. listen, connect) -> mandatory for FreeBSD!
if (ret >= 0) if (type != SOCK_STREAM) Context::CurrEventScope().registerFD<true,true,true,false>(ret); if (type != SOCK_STREAM) Context::CurrEventScope().registerFD(ret);
return ret; return ret;
} }
/** @brief Bind socket to local name. */ /** @brief Bind socket to local name. */
inline int lfBind(int fd, const sockaddr *addr, socklen_t addrlen) { inline int lfBind(int fd, const sockaddr *addr, socklen_t addrlen) {
int ret = bind(fd, addr, addrlen); int ret = bind(fd, addr, addrlen);
if (ret < 0 && _SysErrno() != EINPROGRESS) return ret; if (ret >= 0) {
return 0; return ret;
} else if (_SysErrno() == EINPROGRESS) {
Context::CurrEventScope().checkAsyncCompletion(fd);
return 0;
}
return ret;
}
/** @brief Create new connection. */
inline int lfConnect(int fd, const sockaddr *addr, socklen_t addrlen) {
int ret = connect(fd, addr, addrlen);
if (ret >= 0) {
Context::CurrEventScope().registerFD(fd);
Context::CurrEventScope().stats->cliconn.count();
return ret;
} else if (_SysErrno() == EINPROGRESS) {
Context::CurrEventScope().checkAsyncCompletion(fd);
Context::CurrEventScope().stats->cliconn.count();
return 0;
}
return ret;
} }
/** @brief Set up socket listen queue. */ /** @brief Set up socket listen queue. */
...@@ -340,47 +383,25 @@ inline int lfListen(int fd, int backlog) { ...@@ -340,47 +383,25 @@ inline int lfListen(int fd, int backlog) {
int ret = listen(fd, backlog); int ret = listen(fd, backlog);
if (ret < 0) return ret; if (ret < 0) return ret;
// register SOCK_STREAM server fd only after 'listen' system call (cf. socket/connect) // register SOCK_STREAM server fd only after 'listen' system call (cf. socket/connect)
Context::CurrEventScope().registerFD<true,false,false,true>(fd); Context::CurrEventScope().registerServerFD(fd);
return 0; return ret;
} }
/** @brief Accept new connection. New file descriptor registered for I/O events. */ /** @brief Accept new connection. New file descriptor registered for I/O events. */
inline int lfAccept(int fd, sockaddr *addr, socklen_t *addrlen, int flags = 0) { inline int lfAccept(int fd, sockaddr *addr, socklen_t *addrlen, int flags = 0) {
int ret = Context::CurrEventScope().syncIO<true,false>(accept4, fd, addr, addrlen, flags | SOCK_NONBLOCK); int ret = Context::CurrEventScope().syncIO<true,false>(accept4, fd, addr, addrlen, flags | SOCK_NONBLOCK);
if (ret >= 0) { if (ret < 0) return ret;
Context::CurrEventScope().stats->srvconn.count(); Context::CurrEventScope().stats->srvconn.count();
Context::CurrEventScope().registerFD<true,true,true,false>(ret); Context::CurrEventScope().registerFD(ret);
}
return ret; return ret;
} }
/** @brief Nonblocking accept for listen queue draining. New file descriptor registered for I/O events. */ /** @brief Nonblocking accept for listen queue draining. New file descriptor registered for I/O events. */
inline int lfTryAccept(int fd, sockaddr *addr, socklen_t *addrlen, int flags = 0) { inline int lfTryAccept(int fd, sockaddr *addr, socklen_t *addrlen, int flags = 0) {
int ret = accept4(fd, addr, addrlen, flags | SOCK_NONBLOCK); int ret = accept4(fd, addr, addrlen, flags | SOCK_NONBLOCK);
if (ret >= 0) { if (ret < 0) return ret;
Context::CurrEventScope().stats->srvconn.count(); Context::CurrEventScope().stats->srvconn.count();
Context::CurrEventScope().registerFD<true,true,true,false>(ret); Context::CurrEventScope().registerFD(ret);
}
return ret;
}
/** @brief Create new connection. */
inline int lfConnect(int fd, const sockaddr *addr, socklen_t addrlen) {
int ret = connect(fd, addr, addrlen);
if (ret >= 0) {
Context::CurrEventScope().registerFD<true,true,true,false>(fd); // register lazily
Context::CurrEventScope().stats->cliconn.count();
} else if (_SysErrno() == EINPROGRESS) {
Context::CurrEventScope().registerFD<true,true,false,false>(fd); // register immediately
Context::CurrEventScope().block<false>(fd); // wait for connect to complete
socklen_t sz = sizeof(ret);
SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &ret, &sz));
RASSERT(ret == 0, ret);
#if TESTING_LAZY_FD_REGISTRATION
Context::CurrEventScope().deregisterFD<true>(fd); // revert to lazy registration
#endif
Context::CurrEventScope().stats->cliconn.count();
}
return ret; return ret;
} }
...@@ -388,7 +409,8 @@ inline int lfConnect(int fd, const sockaddr *addr, socklen_t addrlen) { ...@@ -388,7 +409,8 @@ inline int lfConnect(int fd, const sockaddr *addr, socklen_t addrlen) {
/** @brief Clone file descriptor. */ /** @brief Clone file descriptor. */
inline int lfDup(int fd) { inline int lfDup(int fd) {
int ret = dup(fd); int ret = dup(fd);
if (ret >= 0) Context::CurrEventScope().registerFD<true,true,true,false>(ret); if (ret < 0) return ret;
Context::CurrEventScope().registerFD(ret);
return ret; return ret;
} }
......
...@@ -37,7 +37,7 @@ function run_memcached() { ...@@ -37,7 +37,7 @@ function run_memcached() {
while [ -f memcached.running ]; do while [ -f memcached.running ]; do
taskset -c $svbot-$svtop memcached/memcached -t $count -b 16384 -c 32768 -m 10240 -o hashpower=24 taskset -c $svbot-$svtop memcached/memcached -t $count -b 16384 -c 32768 -m 10240 -o hashpower=24
sleep 1 sleep 1
done | tee memcached.server.out & done | tee memcached.server.$1.out &
for ((i=0;i<5;i+=1)); do for ((i=0;i<5;i+=1)); do
sleep 3 sleep 3
mutilate -s0 -r 100000 -K fb_key -V fb_value --loadonly mutilate -s0 -r 100000 -K fb_key -V fb_value --loadonly
...@@ -50,7 +50,7 @@ function run_memcached() { ...@@ -50,7 +50,7 @@ function run_memcached() {
sleep 1 sleep 1
exit 1 exit 1
} }
done | tee memcached.client.out done | tee memcached.client.$1.out
rm -f memcached.running rm -f memcached.running
} }
...@@ -60,19 +60,38 @@ function prep_0() { ...@@ -60,19 +60,38 @@ function prep_0() {
function run_0() { function run_0() {
./apps/threadtest || exit 1 ./apps/threadtest || exit 1
run_memcached run_memcached 0
} }
function prep_1() { function prep_1() {
sed -i -e 's/.*TESTING_PROCESSOR_POLLER.*/#define TESTING_PROCESSOR_POLLER 1/' sed -i -e 's/.*TESTING_PROCESSOR_POLLER.*/#define TESTING_PROCESSOR_POLLER 1/' src/runtime-glue/testoptions.h
echo memcached echo memcached
} }
function run_1() { function run_1() {
run_memcached run_memcached 1
} }
for ((e=0;e<2;e+=1)); do function prep_2() {
sed -i -e 's/.*TESTING_LAZY_FD_REGISTRATION.*/#undef TESTING_LAZY_FD_REGISTRATION/' src/runtime-glue/testoptions.h
echo memcached
}
function run_2() {
run_memcached 2
}
function prep_3() {
sed -i -e 's/.*TESTING_LAZY_FD_REGISTRATION.*/#undef TESTING_LAZY_FD_REGISTRATION/' src/runtime-glue/testoptions.h
sed -i -e 's/.*TESTING_PROCESSOR_POLLER.*/#define TESTING_PROCESSOR_POLLER 1/' src/runtime-glue/testoptions.h
echo memcached
}
function run_3() {
run_memcached 3
}
for ((e=0;e<4;e+=1)); do
addon=$(prep_$e) addon=$(prep_$e)
pre $addon pre $addon
run_$e run_$e
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment