cmogstored: alternative mogstored implementation for MogileFS
svc.c (branch wip-1.3, from https://yhbt.net/cmogstored.git)

/*
 * Copyright (C) 2012-2013, Eric Wong <normalperson@yhbt.net>
 * License: GPLv3 or later (see COPYING for details)
 */

#define _GNU_SOURCE 1 /* needed for _ATFILE_SOURCE on glibc 2.5 - 2.9 */
#include <dirent.h> /* needed for FreeBSD */
#include "cmogstored.h"

static pthread_mutex_t svc_lock = PTHREAD_MUTEX_INITIALIZER;
static Hash_table *by_docroot; /* enforce one mog_svc per docroot */
static mode_t mog_umask;

/*
 * Maintain an internal queue of requests for the "server aio_threads = N"
 * command in the side channel.  The worker handling the client request
 * must tell the main thread to change thread counts asynchronously,
 * because the worker thread handling the request may die from a thread
 * count reduction.  So the worker makes a fire-and-forget request to
 * the notify thread (see mog_svc_aio_threads_enqueue() and
 * mog_svc_aio_threads_handler() below).
 */
static pthread_mutex_t aio_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static SIMPLEQ_HEAD(sq, mog_svc) aio_threads_qhead =
				SIMPLEQ_HEAD_INITIALIZER(aio_threads_qhead);

static void svc_free(void *ptr)
{
	struct mog_svc *svc = ptr;

	if (closedir(svc->dir) < 0)
		syslog(LOG_ERR, "closedir(%s) failed with: %m", svc->docroot);
	CHECK(int, 0, pthread_mutex_destroy(&svc->devstats_lock));
	mog_free(svc->docroot);
	if (svc->by_st_dev)
		hash_free(svc->by_st_dev);
	if (svc->by_mog_devid)
		hash_free(svc->by_mog_devid);
	free(svc);
}

static size_t svc_hash(const void *x, size_t tablesize)
{
	const struct mog_svc *svc = x;

	return hash_string(svc->docroot, tablesize);
}

static bool svc_cmp(const void *a, const void *b)
{
	const struct mog_svc *svc_a = a;
	const struct mog_svc *svc_b = b;

	return strcmp(svc_a->docroot, svc_b->docroot) == 0;
}

static void svc_atexit(void) /* called atexit */
{
	hash_free(by_docroot);
}

static void svc_once(void)
{
	by_docroot = hash_initialize(7, NULL, svc_hash, svc_cmp, svc_free);
	mog_oom_if_null(by_docroot);

	mog_umask = umask(0); /* umask() can only be read by setting it... */
	umask(mog_umask); /* ...so read it, then put it back */
	atexit(svc_atexit);
}

struct mog_svc * mog_svc_new(const char *docroot)
{
	struct mog_svc *svc;
	DIR *dir;
	int fd;

	if (!docroot) docroot = MOG_DEFAULT_DOCROOT;

	docroot = mog_canonpath_die(docroot, CAN_EXISTING);

	dir = opendir(docroot);
	if (dir == NULL) {
		syslog(LOG_ERR, "opendir(%s) failed with: %m", docroot);
		mog_free(docroot);
		return NULL;
	}

	fd = dirfd(dir);
	if (fd < 0) {
		syslog(LOG_ERR, "dirfd(%s) failed with: %m", docroot);
		if (closedir(dir) < 0) /* do not leak the DIR on error */
			syslog(LOG_ERR, "closedir(%s) failed with: %m",
			       docroot);
		mog_free(docroot);
		return NULL;
	}

	CHECK(int, 0, pthread_mutex_lock(&svc_lock));

	if (!by_docroot)
		svc_once();

	svc = xzalloc(sizeof(struct mog_svc));
	svc->docroot = docroot;
	svc->docroot_fd = fd;
	svc->dir = dir;
	svc->put_perms = (~mog_umask) & 0666;
	svc->mkcol_perms = (~mog_umask) & 0777;
	svc->sem_max = svc->thr_per_dev = 10; /* same default as MogileFS upstream */
	svc->idle_timeout = 5;
	CHECK(int, 0, pthread_mutex_init(&svc->devstats_lock, NULL));
	CHECK(int, 0, pthread_mutex_init(&svc->by_mog_devid_lock, NULL));
	svc->by_mog_devid = hash_initialize(7, NULL, mog_dev_hash,
					mog_dev_cmp, mog_dev_free);
	mog_oom_if_null(svc->by_mog_devid);

	switch (hash_insert_if_absent(by_docroot, svc, NULL)) {
	case 0: /* a matching docroot already exists; drop the new svc */
		svc_free(svc);
		svc = NULL;
		/* fall through */
	case 1: break;
	default: mog_oom();
	}

	CHECK(int, 0, pthread_mutex_unlock(&svc_lock));

	return svc;
}
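
/*
 * Illustrative usage sketch for mog_svc_new() (hypothetical caller and
 * path, not part of this file); a NULL return means the docroot could
 * not be opened and the reason was already syslog()'ed:
 *
 *	struct mog_svc *svc = mog_svc_new("/var/mogdata");
 *
 *	if (!svc)
 *		exit(EXIT_FAILURE);
 */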

size_t mog_svc_each(Hash_processor processor, void *data)
{
	size_t rv;

	CHECK(int, 0, pthread_mutex_lock(&svc_lock));
	rv = hash_do_for_each(by_docroot, processor, data);
	CHECK(int, 0, pthread_mutex_unlock(&svc_lock));

	return rv;
}
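
/*
 * Illustrative Hash_processor sketch for mog_svc_each() (hypothetical,
 * not part of this file): count devices across all services.
 * Returning true continues the iteration; returning false would stop
 * hash_do_for_each() early.
 *
 *	static bool count_devs_i(void *svc_ptr, void *total_ptr)
 *	{
 *		struct mog_svc *svc = svc_ptr;
 *		size_t *total = total_ptr;
 *
 *		*total += svc->nmogdev;
 *		return true;
 *	}
 *
 *	size_t total = 0;
 *	(void)mog_svc_each(count_devs_i, &total);
 */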

static bool cloexec_disable(struct mog_fd *mfd)
{
	if (mfd)
		CHECK(int, 0, mog_set_cloexec(mfd->fd, false));
	return true;
}

static bool svc_cloexec_off_i(void *svcptr, void *unused)
{
	struct mog_svc *svc = svcptr;

	return (cloexec_disable(svc->mgmt_mfd)
	        && cloexec_disable(svc->http_mfd)
	        && cloexec_disable(svc->httpget_mfd));
}

/*
 * Only call this from a freshly forked upgrade child process.
 * This takes no locks, to avoid deadlocking on mutexes whose holders
 * did not survive the fork.
 */
void mog_svc_upgrade_prepare(void)
{
	(void)hash_do_for_each(by_docroot, svc_cloexec_off_i, NULL);
}

/* this is only called by the main (notify) thread */
void mog_svc_thrpool_rescale(struct mog_svc *svc, size_t ndev_new)
{
	size_t nthr = ndev_new * svc->thr_per_dev;
	struct mog_thrpool *tp = &svc->queue->thrpool;

	/* always respect user setting */
	if (svc->user_set_aio_threads) {
		mog_svc_dev_user_rescale(svc, ndev_new);
		return;
	}
	/* autoscale based on (currently-fixed) svc->thr_per_dev */

	nthr = MAX(nthr, svc->thr_per_dev);

	if (svc->nmogdev)
		syslog(LOG_INFO,
			"devcount(%zu->%zu), updating server aio_threads=%zu",
			svc->nmogdev, ndev_new, nthr);

	mog_thrpool_set_size(tp, nthr);
}
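
/*
 * Worked example of the autoscale math above (assumed values): with
 * the default thr_per_dev = 10, growing from 2 devices to 4 gives
 * nthr = 4 * 10 = 40 and logs:
 *
 *	devcount(2->4), updating server aio_threads=40
 */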

/* Hash iterator function */
bool mog_svc_start_each(void *svc_ptr, void *main_ptr)
{
	struct mog_svc *svc = svc_ptr;
	struct mog_main *mog_main = main_ptr;
	struct mog_accept *ac;
	size_t athr = (size_t)num_processors(NPROC_CURRENT);
	struct mog_queue *q = mog_queue_new();
	size_t nthr = svc->nmogdev * svc->thr_per_dev;

	if (!nthr)
		nthr = svc->thr_per_dev;

	/*
	 * Try to distribute accept() callers between worker processes
	 * more evenly under wake-one accept() behavior by trimming the
	 * number of acceptors.  Having many acceptor threads makes no
	 * sense: they are bounded only by lock contention and local bus
	 * speeds, so adding threads here just causes lock contention
	 * inside the kernel (accept/accept4/EPOLL_CTL_ADD).
	 */
	athr = mog_main->worker_processes > 1 ? 1 : MIN(2, athr);

	svc->queue = q;
	mog_thrpool_start(&q->thrpool, nthr, mog_queue_loop, q);

	if (svc->mgmt_mfd) {
		mog_main->have_mgmt = true;
		ac = &svc->mgmt_mfd->as.accept;

		/*
		 * mgmt port is rarely used and always persistent, so it
		 * does not need multiple threads for blocking accept()
		 */
		mog_thrpool_start(&ac->thrpool, 1, mog_accept_loop, ac);
	}

	if (svc->http_mfd) {
		ac = &svc->http_mfd->as.accept;
		mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac);
	}

	if (svc->httpget_mfd) {
		ac = &svc->httpget_mfd->as.accept;
		mog_thrpool_start(&ac->thrpool, athr, mog_accept_loop, ac);
	}

	return true;
}
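
/*
 * Illustrative call-site sketch (the real invocation lives outside
 * this file; mog_main here is a hypothetical variable name):
 *
 *	(void)mog_svc_each(mog_svc_start_each, &mog_main);
 */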

/*
 * Fire-and-forget: the actual thread count manipulation must run in
 * the main (notify) thread because it may end up terminating the very
 * thread which invoked this.
 *
 * Called by threads inside the thrpool to wake up the main/notify thread.
 */
void mog_svc_aio_threads_enqueue(struct mog_svc *svc, size_t size)
{
	size_t prev_enq;

	CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock));

	prev_enq = svc->user_req_aio_threads;
	svc->user_req_aio_threads = size;
	if (!prev_enq)
		/* not yet queued; queue it so the main thread can process it */
		SIMPLEQ_INSERT_TAIL(&aio_threads_qhead, svc, qentry);

	CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock));

	/* wake up the main thread so it can process the queue */
	mog_notify(MOG_NOTIFY_AIO_THREADS);
}
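
/*
 * For reference, a sketch of what reaches the function above (the
 * side-channel parser lives elsewhere): a client sending
 *
 *	server aio_threads = 20
 *
 * ends up invoking mog_svc_aio_threads_enqueue(svc, 20).
 */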

/* this runs in the main (notify) thread */
void mog_svc_aio_threads_handler(void)
{
	struct mog_svc *svc;

	/* guard against requests bundled in one wakeup by looping here */
	for (;;) {
		size_t req_size = 0;

		CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock));
		svc = SIMPLEQ_FIRST(&aio_threads_qhead);
		if (svc) {
			SIMPLEQ_REMOVE_HEAD(&aio_threads_qhead, qentry);
			req_size = svc->user_req_aio_threads;
			svc->user_req_aio_threads = 0;
		}
		CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock));

		/*
		 * spurious wakeup is possible since we loop here
		 * (and we must loop, see above comment)
		 */
		if (svc == NULL || req_size == 0)
			return;

		syslog(LOG_INFO, "server aio_threads=%zu", req_size);
		svc->user_set_aio_threads = req_size;
		mog_thrpool_set_size(&svc->queue->thrpool, req_size);

		/* reduce semaphore counts */
		if (svc->nmogdev)
			mog_svc_dev_user_rescale(svc, svc->nmogdev);
	}
}
