cmogstored.git  about / heads / tags
alternative mogstored implementation for MogileFS
blob 98fa8b21d2132bb75f92527ffc44e1553354fbbb 7033 bytes (raw)
$ git show HEAD:ioq.c	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
 
/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"
/*
 * This is a semaphore-like API with explicit queueing and activation,
 * so contended scheduling/wakeups happen via epoll/kqueue and there
 * is never blocking of threads (other than the mutex which only protects
 * small memory-only operations)
 *
 * The main operations are mog_ioq_ready and mog_ioq_next
 *
 * Flow:
 *
 *     mog_ioq_ready ---> true --> normal dispatch --> mog_ioq_next
 *          |    ^                 (mog_ioq_unblock)         |
 *          |    |                                           |
 *          |     `-------<--------\                         |
 *        false                    |                         |
 *          |                      |                         V
 *          |                      |                         |
 *          V                      |                         |
 *   SIMPLEQ_INSERT_TAIL(push)     ^                         |
 *          ||                     |                         V
 *          VV                     |                         /
 *           \\                    |                        /
 *            \\                   |                       /
 *             \\                  |                      V
 *              `===(wait for)==========>===> SIMPLEQ_{FIRST,REMOVE_HEAD}(pop)
 *                                 |                   |
 *                                 |                   V
 *                                 |                   |
 *                                 ^       add to kqueue/epoll ready list
 *                                 |                   |
 *                                 |                   V
 *                                 |                   /
 *                                 `---------<---------'
 *
 * mog_ioq_next is automatically called when a thread releases a regular file.
 */
__thread struct mog_ioq *mog_ioq_current;

/*
 * prepare @ioq for use: @val is the initial capacity (free slot count),
 * @svc is the service whose active queue woken waiters are pushed onto
 */
void mog_ioq_init(struct mog_ioq *ioq, struct mog_svc *svc, unsigned val)
{
	ioq->svc = svc;
	ioq->cur = ioq->max = val;
	ioq->contended = false;
	SIMPLEQ_INIT(&ioq->ioq_head);
	CHECK(int, 0, pthread_mutex_init(&ioq->mtx, NULL));
}

/*
 * we do not need this, yet, but this will allow us to have multi-threaded
 * shutdown in the future (we currently drop into single-threaded mode)
 */
void mog_ioq_requeue_prepare(struct mog_ioq *ioq)
{
	/* cur >= max means no slots are held, so the wait queue is empty */
	assert(ioq->cur >= ioq->max &&
		"we should only get here when idle before mog_fdmap_requeue");

	/* reset the (empty) wait queue so it is safe to repopulate */
	SIMPLEQ_INIT(&ioq->ioq_head);
}

/*
 * this is only a hint, so no explicit memory barriers or atomics;
 * a lost update here merely delays a yield hint (readers clear the
 * flag via CAS in mog_ioq_contended)
 */
static inline void ioq_set_contended(struct mog_ioq *ioq)
{
	ioq->contended = true;
}

/*
 * This is like sem_trywait.  Each thread is only allowed to acquire
 * one ioq at once.
 *
 * If this returns false, the caller _must_ return MOG_NEXT_IGNORE to
 * prevent the mfd from being added to an epoll/kqueue watch list.
 * Adding the mfd to an epoll/kqueue watch list in the same thread/context
 * where this function returns false is a guaranteed bug.
 *
 * mfd is the client socket, not the open (regular) file
 */
bool mog_ioq_ready(struct mog_ioq *ioq, struct mog_fd *mfd)
{
	bool good;

	/* each thread may hold at most one ioq slot at a time */
	assert(mog_ioq_current == NULL && "already holding mog_ioq_current");
	CHECK(int, 0, pthread_mutex_lock(&ioq->mtx));

	good = ioq->cur > 0;
	if (good) {
		/* free slot available: take it and record ownership in TLS */
		--ioq->cur;
		mog_ioq_current = ioq;
	} else {
		/*
		 * no capacity: park the client on the wait queue; it will
		 * be reactivated later by mog_ioq_next or mog_ioq_adjust
		 */
		TRACE(CMOGSTORED_IOQ_BLOCKED(mfd->fd));
		mfd->ioq_blocked = 1;
		SIMPLEQ_INSERT_TAIL(&ioq->ioq_head, mfd, ioqent);
		ioq_set_contended(ioq);
	}

	CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx));

	return good;
}

/*
 * analogous to sem_post, this wakes up the next waiter
 * check_ioq may be NULL, if non-NULL, it validates against mog_ioq_current
 */
void mog_ioq_next(struct mog_ioq *check_ioq)
{
	struct mog_fd *mfd = NULL;

	/* no-op if this thread never acquired an ioq slot */
	if (mog_ioq_current == NULL)
		return;

	CHECK(int, 0, pthread_mutex_lock(&mog_ioq_current->mtx));

	assert((check_ioq == NULL) ||
	       (check_ioq == mog_ioq_current && "ioq mismatch (tls vs check)"));

	/* release our slot */
	mog_ioq_current->cur++;
	if (mog_ioq_current->cur <= mog_ioq_current->max) {
		/* wake up any waiters */
		mfd = SIMPLEQ_FIRST(&mog_ioq_current->ioq_head);
		if (mfd)
			SIMPLEQ_REMOVE_HEAD(&mog_ioq_current->ioq_head, ioqent);
	} else {
		/* mog_ioq_adjust was called and lowered our capacity */
		mog_ioq_current->cur--;
		ioq_set_contended(mog_ioq_current);
	}
	CHECK(int, 0, pthread_mutex_unlock(&mog_ioq_current->mtx));

	/* wake up the next sleeper on this queue */
	if (mfd) {
		TRACE(CMOGSTORED_IOQ_RESCHEDULE(mfd->fd));
		mog_activeq_push(mog_ioq_current->svc->queue, mfd);
	}
	/*
	 * We may not touch mfd after mog_activeq_push.  Another
	 * thread may already have it.  In the worst case, it's been
	 * closed due to epoll/kqueue running out-of-space and another
	 * system call (open/accept) may have already reused the FD
	 */

	mog_ioq_current = NULL;
}

/*
 * Returns true if the currently held ioq is contended.
 * This releases the contended flag if it is set, so the caller
 * is expected to yield the current thread shortly afterwards.
 * This is only a hint.
 */
bool mog_ioq_contended(void)
{
	struct mog_ioq *cur = mog_ioq_current;

	/* assume contended for non /devXXX* paths */
	if (!cur)
		return true;

	/*
	 * we only want to minimize the threads hitting true, so we use
	 * an atomic exchange and hope for the best.  This is only a hint.
	 * (the CAS succeeds — returning true — only for the first caller
	 * to observe contended==true, and clears the flag as it does so)
	 */
	return __sync_bool_compare_and_swap(&cur->contended, true, false);
}

/*
 * adjusts the capacity (->max) of @ioq to @value.
 *
 * called by the main/notify thread if the user has ever set
 * "server aio_threads = XX" via sidechannel.
 */
void mog_ioq_adjust(struct mog_ioq *ioq, unsigned value)
{
	struct mog_fd *mfd = NULL;
	unsigned prev;

	assert(value > 0 && "mog_ioq_adjust value must be non-zero");
	CHECK(int, 0, pthread_mutex_lock(&ioq->mtx));
	prev = ioq->max;
	ioq->max = value;

	if (value < prev || ioq->cur > ioq->max) {
		/*
		 * capacity reduced, get some threads to yield themselves.
		 * The "value < prev" test also prevents unsigned underflow
		 * of "value - prev" below when capacity shrinks while
		 * enough slots are in use that ioq->cur <= ioq->max;
		 * ->cur converges to the new ->max via the cur > max
		 * check in mog_ioq_next.
		 */
		ioq_set_contended(ioq);
	} else {
		/* capacity grew (or is unchanged): value >= prev here */
		unsigned diff = value - prev;

		ioq->cur += diff;

		/*
		 * wake up all sleepers we made capacity for.
		 * unlike mog_ioq_next, we do not release ioq->mtx here
		 * to avoid infinite looping
		 */
		while (diff--) {
			mfd = SIMPLEQ_FIRST(&ioq->ioq_head);
			if (!mfd)
				break;

			SIMPLEQ_REMOVE_HEAD(&ioq->ioq_head, ioqent);
			TRACE(CMOGSTORED_IOQ_RESCHEDULE(mfd->fd));
			mog_activeq_push(ioq->svc->queue, mfd);
		}
	}
	CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx));
}

/*
 * counterpart of mog_ioq_init: releases mutex resources.
 * NOTE(review): pthread_mutex_destroy on a locked mutex is undefined,
 * so callers presumably ensure the ioq is idle first — confirm at call sites
 */
void mog_ioq_destroy(struct mog_ioq *ioq)
{
	CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx));
}

/*
 * If this returns true, the caller must continue processing a request
 * without checking other state associated with the mfd.
 * If this returns false (the common case), the caller continues as
 * usual.
 */
bool mog_ioq_unblock(struct mog_fd *mfd)
{
	bool was_blocked = mfd->ioq_blocked != 0;

	if (was_blocked) {
		/* clear the flag set by mog_ioq_ready when it parked us */
		TRACE(CMOGSTORED_IOQ_UNBLOCKED(mfd->fd));
		mfd->ioq_blocked = 0;
	}

	return was_blocked;
}

git clone https://yhbt.net/cmogstored.git