Commit 01b7e76b authored by Martin Karsten's avatar Martin Karsten
Browse files

- port to libfibre: cfibre, socket, sleep APIs (commit 484c5f16f71a619534b26341d3d69a61e8a4dd48)

- generic user-level threading shim layer
- script 'configure2.sh' to patch Makefile for libfibre
- dedicated set of worker pthreads; optional cpu-core affinity
- support for multiple libfibre clusters, default disabled
- re-enable stop-the-world hash table expansion
- re-enable application-level timeout mechanism
- re-enable stats aggregation
- disable mutex for thread-local stats
- re-introduce 'nreqs' yielding parameter
parent 64bc63a5
# Preface
This is a variation of the original Memcached repository. This version has
been changed to support a thread-per-session execution model.
# Memcached
Memcached is a high performance multithreaded event-based key/value cache
......
......@@ -23,10 +23,9 @@
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t maintenance_lock = PTHREAD_MUTEX_INITIALIZER;
static uthread_cond_t maintenance_cond;
static uthread_mutex_t maintenance_lock;
typedef uint32_t ub4; /* unsigned 4-byte quantities */
typedef unsigned char ub1; /* unsigned 1-byte quantities */
......@@ -56,6 +55,8 @@ static bool expanding = false;
static unsigned int expand_bucket = 0;
void assoc_init(const int hashtable_init) {
uthread_cond_init(&maintenance_cond, NULL);
uthread_mutex_init(&maintenance_lock, NULL);
if (hashtable_init) {
hashpower = hashtable_init;
}
......@@ -119,7 +120,6 @@ static item** _hashitem_before (const char *key, const size_t nkey, const uint32
/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) {
return;
old_hashtable = primary_hashtable;
primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
if (primary_hashtable) {
......@@ -140,11 +140,11 @@ static void assoc_expand(void) {
}
void assoc_start_expand(uint64_t curr_items) {
if (pthread_mutex_trylock(&maintenance_lock) == 0) {
if (uthread_mutex_trylock(&maintenance_lock) == 0) {
if (curr_items > (hashsize(hashpower) * 3) / 2 && hashpower < HASHPOWER_MAX) {
pthread_cond_signal(&maintenance_cond);
uthread_cond_signal(&maintenance_cond);
}
pthread_mutex_unlock(&maintenance_lock);
uthread_mutex_unlock(&maintenance_lock);
}
}
......@@ -232,7 +232,7 @@ static void *assoc_maintenance_thread(void *arg) {
}
} else {
usleep(10*1000);
uthread_usleep(10*1000);
}
if (item_lock) {
......@@ -243,7 +243,7 @@ static void *assoc_maintenance_thread(void *arg) {
if (!expanding) {
/* We are done expanding.. just wait for next invocation */
pthread_cond_wait(&maintenance_cond, &maintenance_lock);
uthread_cond_wait(&maintenance_cond, &maintenance_lock);
/* assoc_expand() swaps out the hash table entirely, so we need
* all threads to not hold any references related to the hash
* table while this happens.
......@@ -252,9 +252,9 @@ static void *assoc_maintenance_thread(void *arg) {
* wait times.
*/
if (do_run_maintenance_thread) {
// pause_threads(PAUSE_ALL_THREADS);
uthread_pause(uthread_cluster_self());
assoc_expand();
// pause_threads(RESUME_ALL_THREADS);
uthread_resume(uthread_cluster_self());
}
}
}
......@@ -262,7 +262,7 @@ static void *assoc_maintenance_thread(void *arg) {
return NULL;
}
static pthread_t maintenance_tid;
static uthread_t maintenance_tid;
int start_assoc_maintenance_thread() {
int ret;
......@@ -273,23 +273,28 @@ int start_assoc_maintenance_thread() {
hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
}
}
if ((ret = pthread_create(&maintenance_tid, NULL,
uthread_attr_t attr;
uthread_attr_init(&attr);
uthread_attr_setcluster(&attr, maint_cluster);
uthread_attr_setbackground(&attr, 1);
if ((ret = uthread_create(&maintenance_tid, &attr,
assoc_maintenance_thread, NULL)) != 0) {
fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
uthread_attr_destroy(&attr);
return -1;
}
uthread_attr_destroy(&attr);
return 0;
}
void stop_assoc_maintenance_thread() {
mutex_lock(&maintenance_lock);
do_run_maintenance_thread = 0;
pthread_cond_signal(&maintenance_cond);
uthread_cond_signal(&maintenance_cond);
mutex_unlock(&maintenance_lock);
/* Wait for the maintenance thread to stop */
pthread_join(maintenance_tid, NULL);
uthread_join(maintenance_tid, NULL);
}
struct assoc_iterator {
......
......@@ -23,7 +23,7 @@ cache_t* cache_create(const char *name, size_t bufsize, size_t align,
char* nm = strdup(name);
void** ptr = calloc(initial_pool_size, sizeof(void*));
if (ret == NULL || nm == NULL || ptr == NULL ||
pthread_mutex_init(&ret->mutex, NULL) == -1) {
uthread_mutex_init(&ret->mutex, NULL) == -1) {
free(ret);
free(nm);
free(ptr);
......@@ -46,9 +46,9 @@ cache_t* cache_create(const char *name, size_t bufsize, size_t align,
}
void cache_set_limit(cache_t *cache, int limit) {
pthread_mutex_lock(&cache->mutex);
uthread_mutex_lock(&cache->mutex);
cache->limit = limit;
pthread_mutex_unlock(&cache->mutex);
uthread_mutex_unlock(&cache->mutex);
}
static inline void* get_object(void *ptr) {
......@@ -70,15 +70,15 @@ void cache_destroy(cache_t *cache) {
}
free(cache->name);
free(cache->ptr);
pthread_mutex_destroy(&cache->mutex);
uthread_mutex_destroy(&cache->mutex);
free(cache);
}
void* cache_alloc(cache_t *cache) {
void *ret;
pthread_mutex_lock(&cache->mutex);
uthread_mutex_lock(&cache->mutex);
ret = do_cache_alloc(cache);
pthread_mutex_unlock(&cache->mutex);
uthread_mutex_unlock(&cache->mutex);
return ret;
}
......@@ -119,9 +119,9 @@ void* do_cache_alloc(cache_t *cache) {
}
void cache_free(cache_t *cache, void *ptr) {
pthread_mutex_lock(&cache->mutex);
uthread_mutex_lock(&cache->mutex);
do_cache_free(cache, ptr);
pthread_mutex_unlock(&cache->mutex);
uthread_mutex_unlock(&cache->mutex);
}
void do_cache_free(cache_t *cache, void *ptr) {
......
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#ifndef CACHE_H
#define CACHE_H
#include <pthread.h>
#include "uthread.h"
#ifndef NDEBUG
/* may be used for debug purposes */
......@@ -34,7 +34,7 @@ typedef void cache_destructor_t(void* obj, void* notused);
*/
typedef struct {
/** Mutex to protect access to the structure */
pthread_mutex_t mutex;
uthread_mutex_t mutex;
/** Name of the cache objects in this cache (provided by the caller) */
char *name;
/** List of pointers to available buffers in this cache */
......
#!/bin/sh
BASEDIR=$(dirname "$0")
if [ "$1" = "skiplib" ]; then
skiplib=skiplib
shift
fi
[ -z "$1" ] && {
echo "usage: $0 [skiplib] <path to libfibre> [<path to liburing>]"
exit 1
}
cd $1
PTLF=$(pwd)
cd -
shift
if [ "$(uname)" = "Linux" ]; then
SED=sed
MAKE=make
count=$(nproc)
FLIBS="-lfibre"
if [ -z "$1" ]; then
[ -f /usr/include/liburing.h ] && FLIBS="$FLIBS -luring"
else
[ -f $1/liburing.so ] && FLIBS="$FLIBS -Wl,-rpath=$1 -luring"
shift
fi
else
SED=gsed
MAKE=gmake
count=$(sysctl kern.smp.cpus|cut -c16- || echo 1)
FLIBS="-lfibre -lexecinfo"
fi
[ "$skiplib" = "skiplib" ] || $MAKE -j $count -C $PTLF lib
LDFLAGS="-L $PTLF/src -Wl,-rpath=$PTLF/src" LIBS="$FLIBS" \
CFLAGS="-I$PTLF/src -g -O2 -fno-omit-frame-pointer -DUSE_LIBFIBRE=1" $BASEDIR/configure $*
$SED -i -e "s|-levent ||" Makefile
......@@ -95,8 +95,8 @@ static crawler crawlers[LARGEST_ID];
static int crawler_count = 0;
static volatile int do_run_lru_crawler_thread = 0;
static int lru_crawler_initialized = 0;
static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t lru_crawler_cond = PTHREAD_COND_INITIALIZER;
static uthread_mutex_t lru_crawler_lock;
static uthread_cond_t lru_crawler_cond;
#ifdef EXTSTORE
/* TODO: pass this around */
static void *storage;
......@@ -140,36 +140,36 @@ static int crawler_expired_init(crawler_module_t *cm, void *data) {
return -1;
}
// init lock.
pthread_mutex_init(&d->lock, NULL);
uthread_mutex_init(&d->lock, NULL);
d->is_external = false;
d->start_time = current_time;
cm->data = d;
}
pthread_mutex_lock(&d->lock);
uthread_mutex_lock(&d->lock);
memset(&d->crawlerstats, 0, sizeof(crawlerstats_t) * POWER_LARGEST);
for (int x = 0; x < POWER_LARGEST; x++) {
d->crawlerstats[x].start_time = current_time;
d->crawlerstats[x].run_complete = false;
}
pthread_mutex_unlock(&d->lock);
uthread_mutex_unlock(&d->lock);
return 0;
}
static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls) {
struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
pthread_mutex_lock(&d->lock);
uthread_mutex_lock(&d->lock);
d->crawlerstats[slab_cls].end_time = current_time;
d->crawlerstats[slab_cls].run_complete = true;
pthread_mutex_unlock(&d->lock);
uthread_mutex_unlock(&d->lock);
}
static void crawler_expired_finalize(crawler_module_t *cm) {
struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
pthread_mutex_lock(&d->lock);
uthread_mutex_lock(&d->lock);
d->end_time = current_time;
d->crawl_complete = true;
pthread_mutex_unlock(&d->lock);
uthread_mutex_unlock(&d->lock);
if (!d->is_external) {
free(d);
......@@ -181,7 +181,7 @@ static void crawler_expired_finalize(crawler_module_t *cm) {
*/
static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i) {
struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
pthread_mutex_lock(&d->lock);
uthread_mutex_lock(&d->lock);
crawlerstats_t *s = &d->crawlerstats[i];
int is_flushed = item_is_flushed(search);
#ifdef EXTSTORE
......@@ -232,7 +232,7 @@ static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv
}
}
}
pthread_mutex_unlock(&d->lock);
uthread_mutex_unlock(&d->lock);
}
static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) {
......@@ -342,7 +342,7 @@ static void lru_crawler_class_done(int i) {
do_item_unlinktail_q((item *)&crawlers[i]);
do_item_stats_add_crawl(i, crawlers[i].reclaimed,
crawlers[i].unfetched, crawlers[i].checked);
pthread_mutex_unlock(&lru_locks[i]);
uthread_mutex_unlock(&lru_locks[i]);
if (active_crawler_mod.mod->doneclass != NULL)
active_crawler_mod.mod->doneclass(&active_crawler_mod, i);
}
......@@ -363,14 +363,14 @@ static void item_crawl_hash(void) {
if (it == NULL) {
// - sleep bits from orig loop
if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
pthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
usleep(settings.lru_crawler_sleep);
pthread_mutex_lock(&lru_crawler_lock);
uthread_mutex_lock(&lru_crawler_lock);
crawls_persleep = settings.crawls_persleep;
} else if (!settings.lru_crawler_sleep) {
// TODO: only cycle lock every N?
pthread_mutex_unlock(&lru_crawler_lock);
pthread_mutex_lock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_lock(&lru_crawler_lock);
}
continue;
}
......@@ -407,13 +407,13 @@ static void *item_crawler_thread(void *arg) {
int i;
int crawls_persleep = settings.crawls_persleep;
pthread_mutex_lock(&lru_crawler_lock);
pthread_cond_signal(&lru_crawler_cond);
uthread_mutex_lock(&lru_crawler_lock);
uthread_cond_signal(&lru_crawler_cond);
settings.lru_crawler = true;
if (settings.verbose > 2)
fprintf(stderr, "Starting LRU crawler background thread\n");
while (do_run_lru_crawler_thread) {
pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
uthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
if (crawler_count == -1) {
item_crawl_hash();
......@@ -439,7 +439,7 @@ static void *item_crawler_thread(void *arg) {
lru_crawler_class_done(i);
continue;
}
pthread_mutex_lock(&lru_locks[i]);
uthread_mutex_lock(&lru_locks[i]);
search = do_item_crawl_q((item *)&crawlers[i]);
if (search == NULL ||
(crawlers[i].remaining && --crawlers[i].remaining < 1)) {
......@@ -453,7 +453,7 @@ static void *item_crawler_thread(void *arg) {
* other callers can incr the refcount
*/
if ((hold_lock = item_trylock(hv)) == NULL) {
pthread_mutex_unlock(&lru_locks[i]);
uthread_mutex_unlock(&lru_locks[i]);
continue;
}
/* Now see if the item is refcount locked */
......@@ -461,7 +461,7 @@ static void *item_crawler_thread(void *arg) {
refcount_decr(search);
if (hold_lock)
item_trylock_unlock(hold_lock);
pthread_mutex_unlock(&lru_locks[i]);
uthread_mutex_unlock(&lru_locks[i]);
continue;
}
......@@ -470,7 +470,7 @@ static void *item_crawler_thread(void *arg) {
/* Interface for this could improve: do the free/decr here
* instead? */
if (!active_crawler_mod.mod->needs_lock) {
pthread_mutex_unlock(&lru_locks[i]);
uthread_mutex_unlock(&lru_locks[i]);
}
active_crawler_mod.mod->eval(&active_crawler_mod, search, hv, i);
......@@ -478,18 +478,18 @@ static void *item_crawler_thread(void *arg) {
if (hold_lock)
item_trylock_unlock(hold_lock);
if (active_crawler_mod.mod->needs_lock) {
pthread_mutex_unlock(&lru_locks[i]);
uthread_mutex_unlock(&lru_locks[i]);
}
if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
pthread_mutex_unlock(&lru_crawler_lock);
usleep(settings.lru_crawler_sleep);
pthread_mutex_lock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
uthread_usleep(settings.lru_crawler_sleep);
uthread_mutex_lock(&lru_crawler_lock);
crawls_persleep = settings.crawls_persleep;
} else if (!settings.lru_crawler_sleep) {
// TODO: only cycle lock every N?
pthread_mutex_unlock(&lru_crawler_lock);
pthread_mutex_lock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_lock(&lru_crawler_lock);
}
}
} // while
......@@ -515,7 +515,7 @@ static void *item_crawler_thread(void *arg) {
stats_state.lru_crawler_running = false;
STATS_UNLOCK();
}
pthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
if (settings.verbose > 2)
fprintf(stderr, "LRU crawler thread stopping\n");
settings.lru_crawler = false;
......@@ -523,19 +523,19 @@ static void *item_crawler_thread(void *arg) {
return NULL;
}
static pthread_t item_crawler_tid;
static uthread_t item_crawler_tid;
int stop_item_crawler_thread(bool wait) {
int ret;
pthread_mutex_lock(&lru_crawler_lock);
uthread_mutex_lock(&lru_crawler_lock);
if (do_run_lru_crawler_thread == 0) {
pthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
return 0;
}
do_run_lru_crawler_thread = 0;
pthread_cond_signal(&lru_crawler_cond);
pthread_mutex_unlock(&lru_crawler_lock);
if (wait && (ret = pthread_join(item_crawler_tid, NULL)) != 0) {
uthread_cond_signal(&lru_crawler_cond);
uthread_mutex_unlock(&lru_crawler_lock);
if (wait && (ret = uthread_join(item_crawler_tid, NULL)) != 0) {
fprintf(stderr, "Failed to stop LRU crawler thread: %s\n", strerror(ret));
return -1;
}
......@@ -558,18 +558,23 @@ int start_item_crawler_thread(void) {
if (settings.lru_crawler)
return -1;
pthread_mutex_lock(&lru_crawler_lock);
uthread_mutex_lock(&lru_crawler_lock);
do_run_lru_crawler_thread = 1;
if ((ret = pthread_create(&item_crawler_tid, NULL,
uthread_attr_t attr;
uthread_attr_init(&attr);
uthread_attr_setcluster(&attr, maint_cluster);
uthread_attr_setbackground(&attr, 1);
if ((ret = uthread_create(&item_crawler_tid, &attr,
item_crawler_thread, NULL)) != 0) {
fprintf(stderr, "Can't create LRU crawler thread: %s\n",
strerror(ret));
pthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
return -1;
}
uthread_attr_destroy(&attr);
/* Avoid returning until the crawler has actually started */
pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
pthread_mutex_unlock(&lru_crawler_lock);
uthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
return 0;
}
......@@ -581,7 +586,7 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
uint32_t sid = id;
int starts = 0;
pthread_mutex_lock(&lru_locks[sid]);
uthread_mutex_lock(&lru_locks[sid]);
if (crawlers[sid].it_flags == 0) {
if (settings.verbose > 2)
fprintf(stderr, "Kicking LRU crawler off for LRU %u\n", sid);
......@@ -611,7 +616,7 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
crawler_count++;
starts++;
}
pthread_mutex_unlock(&lru_locks[sid]);
uthread_mutex_unlock(&lru_locks[sid]);
return starts;
}
......@@ -636,30 +641,30 @@ int lru_crawler_start(uint8_t *ids, uint32_t remaining,
int starts = 0;
bool is_running;
static rel_time_t block_ae_until = 0;
pthread_mutex_lock(&lru_crawler_lock);
uthread_mutex_lock(&lru_crawler_lock);
STATS_LOCK();
is_running = stats_state.lru_crawler_running;
STATS_UNLOCK();
if (do_run_lru_crawler_thread == 0) {
pthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
return -2;
}
if (is_running &&
!(type == CRAWLER_AUTOEXPIRE && active_crawler_type == CRAWLER_AUTOEXPIRE)) {
pthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
block_ae_until = current_time + 60;
return -1;
}
if (type == CRAWLER_AUTOEXPIRE && block_ae_until > current_time) {
pthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
return -1;
}
/* hash table walk only supported with metadump for now. */
if (type != CRAWLER_METADUMP && ids == NULL) {
pthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
return -2;
}
......@@ -673,11 +678,11 @@ int lru_crawler_start(uint8_t *ids, uint32_t remaining,
}
if (active_crawler_mod.mod->needs_client) {
if (c == NULL || sfd == 0) {
pthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
return -2;
}
if (lru_crawler_set_client(&active_crawler_mod, c, sfd) != 0) {
pthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
return -2;
}
}
......@@ -702,9 +707,9 @@ int lru_crawler_start(uint8_t *ids, uint32_t remaining,
stats_state.lru_crawler_running = true;
stats.lru_crawler_starts++;
STATS_UNLOCK();
pthread_cond_signal(&lru_crawler_cond);
uthread_cond_signal(&lru_crawler_cond);
}
pthread_mutex_unlock(&lru_crawler_lock);
uthread_mutex_unlock(&lru_crawler_lock);
return starts;
}
......@@ -760,6 +765,11 @@ int init_lru_crawler(void *arg) {
#ifdef EXTSTORE
storage = arg;
#endif
if (uthread_cond_init(&lru_crawler_cond, NULL) != 0) {
fprintf(stderr, "Can't initialize lru crawler condition\n");
return -1;
}
uthread_mutex_init(&lru_crawler_lock, NULL);
active_crawler_mod.c.c = NULL;
active_crawler_mod.mod = NULL;
active_crawler_mod.data = NULL;
......
......@@ -15,7 +15,7 @@ typedef struct {
} crawlerstats_t;
struct crawler_expired_data {
pthread_mutex_t lock;
uthread_mutex_t lock;
crawlerstats_t crawlerstats[POWER_LARGEST];
/* redundant with crawlerstats_t so we can get overall start/stop/done */
rel_time_t start_time;
......
......@@ -6,7 +6,6 @@
// end FIXME
#include <stdlib.h>
#include <limits.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/uio.h>
......@@ -17,6 +16,7 @@
#include <assert.h>
#include "extstore.h"
#include "config.h"
#include "uthread.h"
// TODO: better if an init option turns this on/off.
#ifdef EXTSTORE_DEBUG
......@@ -28,20 +28,22 @@
#define E_DEBUG(...)
#endif
#define STAT_L(e) pthread_mutex_lock(&e->stats_mutex);
#define STAT_UL(e) pthread_mutex_unlock(&e->stats_mutex);
#define STAT_L(e) uthread_mutex_lock(&e->stats_mutex);
#define STAT_UL(e) uthread_mutex_unlock(&e->stats_mutex);
#define STAT_INCR(e, stat, amount) { \
pthread_mutex_lock(&e->stats_mutex); \
uthread_mutex_lock(&e->stats_mutex); \
e->stats.stat += amount; \
pthread_mutex_unlock(&e->stats_mutex); \
uthread_mutex_unlock(&e->stats_mutex); \
}
#define STAT_DECR(e, stat, amount) { \
pthread_mutex_lock(&e->stats_mutex); \
uthread_mutex_lock(&e->stats_mutex); \
e->stats.stat -= amount; \
pthread_mutex_unlock(&e->stats_mutex); \
uthread_mutex_unlock(&e->stats_mutex); \
}
extern uthread_cluster_t maint_cluster;
typedef struct __store_wbuf {
struct __store_wbuf *next;
char *buf;
......@@ -54,7 +56,7 @@ typedef struct __store_wbuf {
} _store_wbuf;
typedef struct _store_page {
pthread_mutex_t mutex; /* Need to be held for most operations */
uthread_mutex_t mutex; /* Need to be held for most operations */
uint64_t obj_count; /* _delete can decrease post-closing */
uint64_t bytes_used; /* _delete can decrease post-closing */
uint64_t offset; /* starting address of page within fd */
......@@ -75,21 +77,21 @@ typedef struct _store_page {
typedef struct store_engine store_engine;
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
uthread_mutex_t mutex;
uthread_cond_t cond;
obj_io