cmogstored.git  about / heads / tags
alternative mogstored implementation for MogileFS
blob b3b920099d1b26263efab9dbde86c045a59d014b 8515 bytes (raw)
$ git show HEAD:svc.c	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
 
/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */

#define _GNU_SOURCE 1 /* needed for _ATFILE_SOURCE on glibc 2.5 - 2.9 */
#include <dirent.h> /* needed for FreeBSD */
#include "cmogstored.h"

/*
 * NOTE(review): the remark below does not describe svc_lock; it presumably
 * referred to a default docroot definition (cf. MOG_DEFAULT_DOCROOT, used by
 * mog_svc_new) that is not defined in this file -- confirm, then move or
 * remove it.
 */
/* same default as MogileFS upstream */
/* held by mog_svc_new() and mog_svc_each() while they touch by_docroot */
static pthread_mutex_t svc_lock = PTHREAD_MUTEX_INITIALIZER;
static Hash_table *by_docroot; /* enforce one mog_svc per docroot: */
static mode_t mog_umask; /* umask of this process, captured by svc_once() */

/*
 * Maintain an internal queue of requests for the "server aio_threads = N"
 * command in the side channel.  The worker handling the client request must
 * tell the main thread to change thread counts asynchronously (because
 * the worker thread handling the request may die from a thread count
 * reduction), so we have a worker thread make a fire-and-forget request
 * to the notify thread.
 *
 * aio_threads_lock is held around every access to aio_threads_qhead and
 * to mog_svc.user_req_aio_threads in this file.
 */
static pthread_mutex_t aio_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static SIMPLEQ_HEAD(sq, mog_svc) aio_threads_qhead =
				SIMPLEQ_HEAD_INITIALIZER(aio_threads_qhead);

/*
 * Destructor for a struct mog_svc.
 *
 * Installed as the element-free callback of by_docroot (see svc_once) and
 * also called directly by mog_svc_new() when a docroot is already registered.
 * Releases what mog_svc_new() acquired: the docroot DIR handle, both
 * mutexes, the docroot string, the device hash tables, and the svc itself.
 *
 * ptr: the struct mog_svc to destroy; must not be used afterwards.
 */
static void svc_free(void *ptr)
{
	struct mog_svc *svc = ptr;

	/* a failed closedir is only logged; teardown continues regardless */
	if (closedir(svc->dir) < 0)
		syslog(LOG_ERR, "closedir(%s) failed with: %m", svc->docroot);
	CHECK(int, 0, pthread_mutex_destroy(&svc->devstats_lock));
	mog_free(svc->docroot);

	/* tables that were never created are left as NULL and skipped */
	if (svc->by_st_dev)
		hash_free(svc->by_st_dev);
	if (svc->by_mog_devid)
		hash_free(svc->by_mog_devid);

	/*
	 * by_mog_devid_lock is initialized in mog_svc_new() right next to
	 * devstats_lock, so it needs the matching destroy.  It is destroyed
	 * only after the by_mog_devid table is gone so the lock outlives
	 * the table it is named after.
	 */
	CHECK(int, 0, pthread_mutex_destroy(&svc->by_mog_devid_lock));
	free(svc);
}

/* hasher callback for by_docroot: a mog_svc is keyed by its docroot string */
static size_t svc_hash(const void *x, size_t tablesize)
{
	const char *key = ((const struct mog_svc *)x)->docroot;

	return hash_string(key, tablesize);
}

/* comparator callback for by_docroot: true when both docroots are equal */
static bool svc_cmp(const void *a, const void *b)
{
	const char *lhs = ((const struct mog_svc *)a)->docroot;
	const char *rhs = ((const struct mog_svc *)b)->docroot;

	return !strcmp(lhs, rhs);
}

static void svc_atexit(void) /* called atexit */
{
	hash_free(by_docroot);
}

/*
 * Lazy module setup, invoked by mog_svc_new() the first time by_docroot is
 * found unset: creates the docroot table, records the process umask and
 * registers the atexit cleanup.
 */
static void svc_once(void)
{
	mode_t current;

	by_docroot = hash_initialize(7, NULL, svc_hash, svc_cmp, svc_free);
	mog_oom_if_null(by_docroot);

	/* umask() can only be read by setting it, so set and put it back */
	current = umask(0);
	umask(current);
	mog_umask = current;

	atexit(svc_atexit);
}

/*
 * Closes and re-opens the docroot directory handle of one service and
 * refreshes svc->docroot_fd from the new handle.
 *
 * svc_ptr: the struct mog_svc to refresh
 * parent:  points to the pid_t that receives SIGTERM if any step fails
 *
 * Returns true on success; on failure the error is logged, SIGTERM is
 * sent to *parent and false is returned.
 */
bool mog_svc_atfork_child(void *svc_ptr, void *parent)
{
	struct mog_svc *svc = svc_ptr;
	const pid_t ppid = *(pid_t *)parent;
	const char *failed = NULL; /* name of the call that failed, if any */

	if (closedir(svc->dir) < 0) {
		failed = "closedir";
	} else if ((svc->dir = opendir(svc->docroot)) == NULL) {
		failed = "opendir";
	} else if ((svc->docroot_fd = dirfd(svc->dir)) < 0) {
		failed = "dirfd";
	}

	if (failed == NULL)
		return true;

	syslog(LOG_ERR, "%s(%s) failed with: %m", failed, svc->docroot);
	kill(ppid, SIGTERM);
	return false;
}

/*
 * Creates the service object for a docroot and registers it in by_docroot.
 *
 * docroot: directory to serve; NULL selects MOG_DEFAULT_DOCROOT.  The path
 *          is passed through mog_canonpath_die() and the resulting string
 *          is owned by the returned svc (released in svc_free).
 *
 * Returns the new svc, or NULL when the directory cannot be opened, its
 * descriptor cannot be obtained, or a svc for the same docroot is already
 * registered.  Allocation failures go through the mog_oom helpers.
 */
struct mog_svc * mog_svc_new(const char *docroot)
{
	struct mog_svc *svc;
	DIR *dir;
	int fd;

	if (!docroot)
		docroot = MOG_DEFAULT_DOCROOT;

	/* from here on docroot is the string we must mog_free() on failure */
	docroot = mog_canonpath_die(docroot, CAN_EXISTING);

	dir = opendir(docroot);
	if (dir == NULL) {
		syslog(LOG_ERR, "opendir(%s) failed with: %m", docroot);
		mog_free(docroot);
		return NULL;
	}

	fd = dirfd(dir);
	if (fd < 0) {
		/* log before closedir(): it may overwrite the errno %m reports */
		syslog(LOG_ERR, "dirfd(%s) failed with: %m", docroot);
		(void)closedir(dir); /* do not leak the directory stream */
		mog_free(docroot);
		return NULL;
	}

	CHECK(int, 0, pthread_mutex_lock(&svc_lock));

	if (!by_docroot)
		svc_once();

	svc = xzalloc(sizeof(struct mog_svc));
	svc->persist_client = 1;
	svc->docroot = docroot;
	svc->docroot_fd = fd;
	svc->dir = dir;
	/* default permissions honor the umask captured in svc_once() */
	svc->put_perms = (~mog_umask) & 0666;
	svc->mkcol_perms = (~mog_umask) & 0777;
	svc->thr_per_dev = 10;
	svc->idle_timeout = 5;
	CHECK(int, 0, pthread_mutex_init(&svc->devstats_lock, NULL));
	CHECK(int, 0, pthread_mutex_init(&svc->by_mog_devid_lock, NULL));
	svc->by_mog_devid = hash_initialize(7, NULL, mog_dev_hash,
					mog_dev_cmp, mog_dev_free);
	mog_oom_if_null(svc->by_mog_devid);

	switch (hash_insert_if_absent(by_docroot, svc, NULL)) {
	case 0:
		/* docroot already registered: discard ours, report NULL */
		svc_free(svc);
		svc = NULL;
		/* fallthrough */
	case 1:
		break;
	default:
		mog_oom();
	}

	CHECK(int, 0, pthread_mutex_unlock(&svc_lock));

	return svc;
}

/*
 * Runs `processor` over the entries of by_docroot with svc_lock held for
 * the whole walk; `data` is handed through to the processor unchanged.
 * Returns whatever hash_do_for_each() returns.
 */
size_t mog_svc_each(Hash_processor processor, void *data)
{
	size_t walked;

	CHECK(int, 0, pthread_mutex_lock(&svc_lock));
	walked = hash_do_for_each(by_docroot, processor, data);
	CHECK(int, 0, pthread_mutex_unlock(&svc_lock));

	return walked;
}

/*
 * Clears close-on-exec on mfd->fd via mog_set_cloexec(); a NULL mfd is
 * accepted and ignored.  Always returns true.
 */
static bool cloexec_disable(struct mog_fd *mfd) /* vfork-safe */
{
	if (mfd == NULL)
		return true;

	CHECK(int, 0, mog_set_cloexec(mfd->fd, false));
	return true;
}

/*
 * Hash iterator: clears close-on-exec on the mgmt, http and httpget
 * descriptors of one svc, in that order, stopping at the first false.
 */
static bool svc_cloexec_off_i(void *svcptr, void *unused) /* vfork-safe */
{
	struct mog_svc *svc = svcptr;

	if (!cloexec_disable(svc->mgmt_mfd))
		return false;
	if (!cloexec_disable(svc->http_mfd))
		return false;
	return cloexec_disable(svc->httpget_mfd);
}

/*
 * Only call this from a freshly forked upgrade child process.
 * This holds no locks to avoid potential deadlocks in post-fork mutexes;
 * by_docroot is therefore walked directly instead of via mog_svc_each().
 *
 * Applies svc_cloexec_off_i() to every registered svc, i.e. clears
 * close-on-exec on each svc's mgmt/http/httpget descriptors.
 */
void mog_svc_upgrade_prepare(void) /* vfork-safe */
{
	/* the return value of hash_do_for_each() is deliberately ignored */
	(void)hash_do_for_each(by_docroot, svc_cloexec_off_i, NULL);
}

/*
 * Resizes the svc queue thread pool for a new device count.
 * This is only called by the main (notify) thread.
 *
 * svc:      service whose queue->thrpool is resized
 * ndev_new: new number of devices
 */
void mog_svc_thrpool_rescale(struct mog_svc *svc, unsigned ndev_new)
{
	struct mog_thrpool *pool = &svc->queue->thrpool;
	unsigned want;

	/* a user-chosen aio_threads value is respected: warn, never resize */
	if (svc->user_set_aio_threads) {
		mog_svc_dev_user_rescale(svc, ndev_new);
		if (pool->n_threads < ndev_new) {
			syslog(LOG_WARNING,
				"server aio_threads=%u is less than devcount=%u",
				pool->n_threads, ndev_new);
		}
		return;
	}

	/* thr_per_dev threads per device, but at least thr_per_dev in total */
	want = ndev_new * svc->thr_per_dev;
	if (want < svc->thr_per_dev)
		want = svc->thr_per_dev;

	/* only log when there was a previous nonzero device count */
	if (svc->nmogdev) {
		syslog(LOG_INFO,
		       "devcount(%u->%u), updating server aio_threads=%u",
		       svc->nmogdev, ndev_new, want);
	}
	mog_thrpool_set_size(pool, want);
}

/*
 * Hash iterator function: creates the queue for one svc and starts its
 * worker thread pool plus one accept thread pool per configured listener.
 *
 * svc_ptr:  the struct mog_svc to start
 * main_ptr: the struct mog_main; worker_processes is read and have_mgmt
 *           is set when the svc has a mgmt listener
 *
 * Always returns true.
 */
bool mog_svc_start_each(void *svc_ptr, void *main_ptr)
{
	struct mog_svc *svc = svc_ptr;
	struct mog_main *mog_main = main_ptr;
	size_t ncpu = (size_t)num_processors(NPROC_CURRENT);
	struct mog_queue *q = mog_queue_new();
	size_t n_workers = svc->nmogdev * svc->thr_per_dev;
	size_t n_acceptors;
	struct mog_accept *ac;

	/* no devices known yet: start with one device's worth of threads */
	if (n_workers == 0)
		n_workers = svc->thr_per_dev;

	/*
	 * try to distribute accept() callers between workers more evenly
	 * with wake-one accept() behavior by trimming down on acceptors
	 * having too many acceptor threads does not make sense, these
	 * threads are only bounded by lock contention and local bus speeds.
	 * Increasing threads here just leads to lock contention inside the
	 * kernel (accept/accept4/EPOLL_CTL_ADD)
	 */
	if (mog_main->worker_processes > 1)
		n_acceptors = 1;
	else
		n_acceptors = MIN(2, ncpu);

	svc->queue = q;
	mog_thrpool_start(&q->thrpool, n_workers, mog_queue_loop, q);

	if (svc->mgmt_mfd) {
		mog_main->have_mgmt = true;
		ac = &svc->mgmt_mfd->as.accept;

		/*
		 * mgmt port is rarely used and always persistent, so it
		 * does not need multiple threads for blocking accept()
		 */
		mog_thrpool_start(&ac->thrpool, 1, mog_accept_loop, ac);
	}

	if (svc->http_mfd) {
		ac = &svc->http_mfd->as.accept;
		mog_thrpool_start(&ac->thrpool, n_acceptors,
				mog_accept_loop, ac);
	}

	if (svc->httpget_mfd) {
		ac = &svc->httpget_mfd->as.accept;
		mog_thrpool_start(&ac->thrpool, n_acceptors,
				mog_accept_loop, ac);
	}

	return true;
}

/*
 * Fire and forget, we must run the actual thread count manipulation
 * in the main notify thread because we may end up terminating the
 * thread which invoked this.
 *
 * Called by threads inside the thrpool to wake-up the main/notify thread.
 *
 * svc:  service whose pool size should change
 * size: requested thread count; stored in svc->user_req_aio_threads and
 *       replaces any request that has not been picked up yet
 */
void mog_svc_aio_threads_enqueue(struct mog_svc *svc, unsigned size)
{
	bool pending;

	CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock));

	/* a nonzero stored request means svc is already sitting in the queue */
	pending = (svc->user_req_aio_threads != 0);
	svc->user_req_aio_threads = size;
	if (!pending) {
		/* put into the queue so main thread can process it */
		SIMPLEQ_INSERT_TAIL(&aio_threads_qhead, svc, qentry);
	}

	CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock));

	/* wake up the main thread so it can process the queue */
	mog_notify(MOG_NOTIFY_AIO_THREADS);
}

/*
 * Drains the aio_threads request queue, applying each queued request to
 * its svc.  This runs in the main (notify) thread.
 */
void mog_svc_aio_threads_handler(void)
{
	/* guard against requests bundled in one wakeup by looping here */
	for (;;) {
		struct mog_svc *svc;
		unsigned want = 0;

		/* pop one request and clear its pending marker under the lock */
		CHECK(int, 0, pthread_mutex_lock(&aio_threads_lock));
		svc = SIMPLEQ_FIRST(&aio_threads_qhead);
		if (svc != NULL) {
			SIMPLEQ_REMOVE_HEAD(&aio_threads_qhead, qentry);
			want = svc->user_req_aio_threads;
			svc->user_req_aio_threads = 0;
		}
		CHECK(int, 0, pthread_mutex_unlock(&aio_threads_lock));

		/*
		 * spurious wakeup is possible since we loop here
		 * (and we must loop, see above comment)
		 */
		if (!svc || !want)
			return;

		syslog(LOG_INFO, "server aio_threads=%u", want);
		svc->user_set_aio_threads = want;
		if (svc->nmogdev)
			mog_svc_dev_user_rescale(svc, svc->nmogdev);
		mog_thrpool_set_size(&svc->queue->thrpool, want);
	}
}

git clone https://yhbt.net/cmogstored.git