/*
 * Copyright (C) 2012-2020 all contributors
 * License: GPL-3.0+
 */
#include "cmogstored.h"

/*
 * This is a semaphore-like API with explicit queueing and activation,
 * so contended scheduling/wakeups happen via epoll/kqueue and threads
 * never block (other than on the mutex, which only protects small,
 * memory-only operations).
 *
 * The main operations are mog_ioq_ready and mog_ioq_next.
 *
 * Flow:
 *
 * mog_ioq_ready ---> true --> normal dispatch --> mog_ioq_next
 *       |               ^ (mog_ioq_unblock)            |
 *       |               |                              |
 *       |               `-------<--------\             |
 *     false                              |             |
 *       |                                |             V
 *       |                                |             |
 *       V                                |             |
 * SIMPLEQ_INSERT_TAIL(push)              ^             |
 *      ||                                |             V
 *      VV                                |            /
 *       \\                               |           /
 *        \\                              |          /
 *         \\                             |         V
 *          `===(wait for)==========>===> SIMPLEQ_{FIRST,REMOVE_HEAD}(pop)
 *                                        |                     |
 *                                        |                     V
 *                                        |                     |
 *                                        ^      add to kqueue/epoll ready list
 *                                        |                     |
 *                                        |                     V
 *                                        |                    /
 *                                        `---------<---------'
 *
 * mog_ioq_next is automatically called when a thread releases a
 * regular file.
 */

__thread struct mog_ioq *mog_ioq_current;

void mog_ioq_init(struct mog_ioq *ioq, struct mog_svc *svc, unsigned val)
{
	ioq->cur = val;
	ioq->max = val;
	ioq->svc = svc;
	ioq->contended = false;
	SIMPLEQ_INIT(&ioq->ioq_head);
	CHECK(int, 0, pthread_mutex_init(&ioq->mtx, NULL));
}

/*
 * we do not need this, yet, but this will allow us to have multi-threaded
 * shutdown in the future (we currently drop into single-threaded mode)
 */
void mog_ioq_requeue_prepare(struct mog_ioq *ioq)
{
	assert(ioq->cur >= ioq->max &&
	       "we should only get here when idle before mog_fdmap_requeue");
	SIMPLEQ_INIT(&ioq->ioq_head);
}

/*
 * this is only a hint, so no explicit memory barriers or atomics
 */
static inline void ioq_set_contended(struct mog_ioq *ioq)
{
	ioq->contended = true;
}

/*
 * This is like sem_trywait.  Each thread is only allowed to acquire
 * one ioq at once.
 *
 * If this returns false, the caller _must_ return MOG_NEXT_IGNORE to
 * prevent the mfd from being added to an epoll/kqueue watch list.
 * Adding the mfd to an epoll/kqueue watch list in the same thread/context
 * where this function returns false is a guaranteed bug.
 *
 * mfd is the client socket, not the open (regular) file
 */
bool mog_ioq_ready(struct mog_ioq *ioq, struct mog_fd *mfd)
{
	bool good;

	assert(mog_ioq_current == NULL && "already holding mog_ioq_current");
	CHECK(int, 0, pthread_mutex_lock(&ioq->mtx));

	good = ioq->cur > 0;
	if (good) {
		--ioq->cur;
		mog_ioq_current = ioq;
	} else {
		TRACE(CMOGSTORED_IOQ_BLOCKED(mfd->fd));
		mfd->ioq_blocked = 1;
		SIMPLEQ_INSERT_TAIL(&ioq->ioq_head, mfd, ioqent);
		ioq_set_contended(ioq);
	}

	CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx));

	return good;
}
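
/*
 * Hedged usage sketch (not part of the real dispatch code): this
 * illustrates how a caller is expected to pair mog_ioq_ready with
 * mog_ioq_next.  The function name ioq_usage_sketch and the "serve
 * the regular file" placeholder are hypothetical; only mog_ioq_ready,
 * mog_ioq_next, and the queueing behavior described above are real.
 */
#if 0 /* illustrative only, never compiled */
static void ioq_usage_sketch(struct mog_ioq *ioq, struct mog_fd *client_mfd)
{
	if (!mog_ioq_ready(ioq, client_mfd)) {
		/*
		 * client_mfd is now parked on ioq->ioq_head; the real
		 * caller returns MOG_NEXT_IGNORE here and must not add
		 * client_mfd to an epoll/kqueue watch list, since a
		 * future mog_ioq_next will push it to the active queue.
		 */
		return;
	}

	/* ... serve the regular file on behalf of client_mfd ... */

	/*
	 * releasing the regular file calls this automatically in the
	 * real code; shown explicitly here for clarity
	 */
	mog_ioq_next(ioq);
}
#endif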
/*
 * analogous to sem_post, this wakes up the next waiter.
 * check_ioq may be NULL; if non-NULL, it is validated against
 * mog_ioq_current.
 */
void mog_ioq_next(struct mog_ioq *check_ioq)
{
	struct mog_fd *mfd = NULL;

	if (mog_ioq_current == NULL)
		return;

	CHECK(int, 0, pthread_mutex_lock(&mog_ioq_current->mtx));
	assert((check_ioq == NULL) ||
	       (check_ioq == mog_ioq_current &&
	        "ioq mismatch (tls vs check)"));

	mog_ioq_current->cur++;
	if (mog_ioq_current->cur <= mog_ioq_current->max) {
		/* wake up any waiters */
		mfd = SIMPLEQ_FIRST(&mog_ioq_current->ioq_head);
		if (mfd)
			SIMPLEQ_REMOVE_HEAD(&mog_ioq_current->ioq_head, ioqent);
	} else {
		/* mog_ioq_adjust was called and lowered our capacity */
		mog_ioq_current->cur--;
		ioq_set_contended(mog_ioq_current);
	}
	CHECK(int, 0, pthread_mutex_unlock(&mog_ioq_current->mtx));

	/* wake up the next sleeper on this queue */
	if (mfd) {
		TRACE(CMOGSTORED_IOQ_RESCHEDULE(mfd->fd));
		mog_activeq_push(mog_ioq_current->svc->queue, mfd);
	}
	/*
	 * We may not touch mfd after mog_activeq_push.  Another
	 * thread may already have it.  In the worst case, it has been
	 * closed due to epoll/kqueue running out of space and another
	 * system call (open/accept) may have already reused the FD.
	 */
	mog_ioq_current = NULL;
}

/*
 * Returns true if the currently held ioq is contended.
 * This clears the contended flag if it is set, so the caller is
 * expected to yield the current thread shortly afterwards.
 * This is only a hint.
 */
bool mog_ioq_contended(void)
{
	struct mog_ioq *cur = mog_ioq_current;

	/* assume contended for non-/devXXX* paths */
	if (!cur)
		return true;

	/*
	 * we only want to minimize the threads hitting true, so we use
	 * an atomic compare-and-swap and hope for the best.
	 * This is only a hint.
	 */
	return __sync_bool_compare_and_swap(&cur->contended, true, false);
}

/*
 * called by the main/notify thread if the user has ever set
 * "server aio_threads = XX" via sidechannel.
 */
void mog_ioq_adjust(struct mog_ioq *ioq, unsigned value)
{
	struct mog_fd *mfd = NULL;
	unsigned prev;

	assert(value > 0 && "mog_ioq_adjust value must be non-zero");
	CHECK(int, 0, pthread_mutex_lock(&ioq->mtx));
	prev = ioq->max;
	ioq->max = value;

	if (ioq->cur > ioq->max) {
		/* capacity reduced, get some threads to yield themselves */
		ioq_set_contended(ioq);
	} else {
		unsigned diff = value - prev;

		ioq->cur += diff;

		/*
		 * wake up all sleepers we made capacity for.
		 * unlike mog_ioq_next, we do not release ioq->mtx here
		 * to avoid infinite looping
		 */
		while (diff--) {
			mfd = SIMPLEQ_FIRST(&ioq->ioq_head);
			if (!mfd)
				break;

			SIMPLEQ_REMOVE_HEAD(&ioq->ioq_head, ioqent);
			TRACE(CMOGSTORED_IOQ_RESCHEDULE(mfd->fd));
			mog_activeq_push(ioq->svc->queue, mfd);
		}
	}
	CHECK(int, 0, pthread_mutex_unlock(&ioq->mtx));
}

void mog_ioq_destroy(struct mog_ioq *ioq)
{
	CHECK(int, 0, pthread_mutex_destroy(&ioq->mtx));
}

/*
 * If this returns true, the caller must continue processing a request
 * without checking other state associated with the mfd.
 * If this returns false (the common case), the caller continues as
 * usual.
 */
bool mog_ioq_unblock(struct mog_fd *mfd)
{
	if (mfd->ioq_blocked == 0)
		return false;

	TRACE(CMOGSTORED_IOQ_UNBLOCKED(mfd->fd));
	mfd->ioq_blocked = 0;
	return true;
}
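
/*
 * Hedged sketch of the wakeup side (not part of the real code): once
 * mog_ioq_next or mog_ioq_adjust pushes a parked mfd onto the active
 * queue, the event loop re-runs it.  mog_ioq_unblock tells that caller
 * whether the mfd got here because mog_ioq_ready previously returned
 * false, in which case the in-flight request must be resumed rather
 * than reading new input from the client socket.  The names below
 * (dispatch_sketch, resume_parked_request, read_client_request) are
 * hypothetical placeholders.
 */
#if 0 /* illustrative only, never compiled */
static void dispatch_sketch(struct mog_fd *mfd)
{
	if (mog_ioq_unblock(mfd)) {
		/*
		 * mfd was parked by mog_ioq_ready returning false;
		 * continue the request it was blocked on without
		 * consulting other state on the mfd
		 */
		resume_parked_request(mfd);
		return;
	}

	/* common case: nothing was parked, read a new request */
	read_client_request(mfd);
}
#endif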