cmogstored.git  about / heads / tags
alternative mogstored implementation for MogileFS
blob 16f0d7a44423305b3fd5800ecc167a7b72ac0683 2223 bytes (raw)
$ git show HEAD:queue_loop.c	# shows this blob on the CLI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
 
/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"

static struct mog_fd *queue_xchg_maybe(struct mog_queue *q, struct mog_fd *mfd)
{
	/*
	 * idle, just-ready clients are the most important
	 * We use a zero timeout here since epoll_wait() is
	 * optimizes for the non-blocking case.
	 */
	struct mog_fd *recent_mfd = mog_idleq_wait(q, 0);

	if (recent_mfd) {
		/*
		 * We got a more important client, push
		 * active_mfd into the active queue for another
		 * thread to service while we service a more
		 * recently-active client.
		 */
		mog_activeq_push(q, mfd);
		return recent_mfd;
	}

	/*
	 * keep processing the currently-active mfd in this thread
	 * if no new work came up
	 */
	return mfd;
}

/* passed as a start_routine to pthread_create */
void * mog_queue_loop(void *arg)
{
	struct mog_queue *q = arg;
	struct mog_fd *mfd = NULL;

	syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread ready",
	       (unsigned long)pthread_self());

	for (;;) {
		while (mfd == NULL)
			mfd = mog_idleq_wait(q, -1);
		switch (mog_queue_step(mfd)) {
		case MOG_NEXT_ACTIVE:
			mfd = queue_xchg_maybe(q, mfd);
			break;
		case MOG_NEXT_WAIT_RD:
			mfd = mog_queue_xchg(q, mfd, MOG_QEV_RD);
			break;
		case MOG_NEXT_WAIT_WR:
			mfd = mog_queue_xchg(q, mfd, MOG_QEV_WR);
			break;
		case MOG_NEXT_IGNORE:
		case MOG_NEXT_CLOSE:
			/* already hanndled */
			mfd = mog_idleq_wait(q, -1);
		}
	}

	return NULL;
}

static void queue_quit_step(struct mog_fd *mfd)
{
	switch (mfd->fd_type) {
	case MOG_FD_TYPE_MGMT: mog_mgmt_quit_step(mfd); return;
	case MOG_FD_TYPE_HTTP:
	case MOG_FD_TYPE_HTTPGET:
		mog_http_quit_step(mfd); return;
	case MOG_FD_TYPE_FILE:
	case MOG_FD_TYPE_QUEUE:
	case MOG_FD_TYPE_SVC:
		assert(0 && "invalid fd_type in queue_quit_step");
	default:
		break;
	}
}

/* called at shutdown when only one thread is active */
void mog_queue_quit_loop(struct mog_queue *queue)
{
	struct mog_fd *mfd;

	while (mog_nr_active_at_quit) {
		assert(mog_nr_active_at_quit <= (size_t)INT_MAX
		       && "mog_nr_active_at_quit underflow");

		if ((mfd = mog_idleq_wait_intr(queue, -1)))
			queue_quit_step(mfd);
	}
}

git clone https://yhbt.net/cmogstored.git