cmogstored.git  about / heads / tags
alternative mogstored implementation for MogileFS
blob 23bbe96110f23c857719eedb88f8fa3d173d33e8 2909 bytes (raw)
$ git show gl-env:notify.c	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
 
/*
 * Copyright (C) 2012-2016 all contributors <cmogstored-public@bogomips.org>
 * License: GPLv3 or later (see COPYING for details)
 */
#include "cmogstored.h"
#include <errno.h>
#include <limits.h>
#include <stdlib.h>

/*
 * Pending-notification flags, indexed by enum mog_notification.
 * mog_notify() flips a slot 0 -> 1, note_run() flips it back 1 -> 0
 * before running the matching handler.
 */
static int notes[MOG_NOTIFY_MAX];
/* self-wake descriptor created in mog_notify_init(), poked by mog_notify() */
static struct mog_fd *notify_mfd;
/* time() value recorded right after the last mog_mkusage_all() run */
static time_t usage_file_updated_at;
/*
 * seconds between usage refreshes; MOG_DISK_USAGE_INTERVAL in the
 * environment may override the default in mog_notify_init()
 */
static time_t usage_file_interval = 10;
/* queue that mog_notify_wait() polls; created in mog_notify_init() */
struct mog_queue *mog_notify_queue;

/*
 * Sets up the notification machinery: reads the optional
 * MOG_DISK_USAGE_INTERVAL override (seconds, must be positive), creates
 * mog_notify_queue and the self-wake descriptor, and registers the
 * descriptor on the queue for read events.
 *
 * Must only be called while both mog_notify_queue and notify_mfd are
 * still NULL (asserted below).
 */
void mog_notify_init(void)
{
	const char *interval = getenv("MOG_DISK_USAGE_INTERVAL");

	if (interval) {
		long secs;

		/*
		 * strtol() rather than atoi(): atoi() has undefined behavior
		 * when the value does not fit in an int.  For every input
		 * where atoi() was well-defined the result is the same
		 * (leading digits are used, trailing junk is ignored);
		 * out-of-range and non-positive values leave the default
		 * interval untouched.
		 */
		errno = 0;
		secs = strtol(interval, NULL, 10);
		if (errno != ERANGE && secs > 0 && secs <= INT_MAX)
			usage_file_interval = (time_t)secs;
	}

	assert(mog_notify_queue == NULL && "notify queue already initialized");
	assert(notify_mfd == NULL && "notify_mfd already initialized");

	mog_notify_queue = mog_queue_new();
	notify_mfd = mog_selfwake_new();

	/* mog_selfwake_new() may yield NULL; nothing is registered then */
	if (notify_mfd) {
		struct mog_selfwake *notify = &notify_mfd->as.selfwake;

		assert(notify->writer && "notify writer not initialized");
		notify->queue = mog_notify_queue;
		mog_idleq_add(notify->queue, notify_mfd, MOG_QEV_RD);
	}
}

/*
 * Runs mog_mkusage_all() and then stamps usage_file_updated_at with the
 * current time so mog_notify_wait() can schedule the next refresh.
 */
static void global_mkusage(void)
{
	mog_mkusage_all();

	/* time() stores the same value it returns through its argument */
	(void)time(&usage_file_updated_at);
}

/*
 * Atomically replaces notes[note] with `to' if it currently holds `from'.
 * Returns true when the swap happened, false when the slot held
 * something else.
 */
static inline bool note_xchg(enum mog_notification note, int from, int to)
{
	int *slot = &notes[note];
	bool swapped = __sync_bool_compare_and_swap(slot, from, to);

	return swapped;
}

/*
 * Services pending notifications.  Each flag is cleared (1 -> 0) before
 * its handler runs, and a handler only runs if this call was the one
 * that cleared the flag.
 */
static void note_run(void)
{
	bool pending;

	pending = note_xchg(MOG_NOTIFY_DEVICE_REFRESH, 1, 0);
	if (pending) {
		global_mkusage();
	}

	pending = note_xchg(MOG_NOTIFY_AIO_THREADS, 1, 0);
	if (pending) {
		mog_svc_aio_threads_handler();
	}
}

/* drain the pipe and process notifications */
static void note_queue_step(struct mog_fd *mfd)
{
	mog_selfwake_drain(mfd);
	note_run();
	mog_idleq_push(mfd->as.selfwake.queue, mfd, MOG_QEV_RD);
}

/*
 * Dispatches a descriptor taken off the notify queue to the handler for
 * its fd_type.  Only self-wake and iostat descriptors are expected here;
 * anything else trips the assertion.
 */
static void notify_queue_step(struct mog_fd *mfd)
{
	if (mfd->fd_type == MOG_FD_TYPE_SELFWAKE) {
		note_queue_step(mfd);
	} else if (mfd->fd_type == MOG_FD_TYPE_IOSTAT) {
		mog_iostat_queue_step(mfd);
	} else {
		assert(0 && mfd->fd_type && "bad fd_type in queue");
	}
}

/*
 * Main loop body of cmogstored: each call performs one pass.
 *
 * 1. Refreshes usage data if the refresh interval has elapsed.
 * 2. Handles every descriptor that is already ready (zero timeout).
 * 3. Waits once more, interruptibly, for the next event.
 *
 * need_usage_file: when false the final wait is passed -1 instead of a
 * deadline; when true it is passed the time left until the next usage
 * refresh, scaled from seconds by 1000 (or 0 if that time is not
 * positive).
 */
void mog_notify_wait(bool need_usage_file)
{
	const time_t deadline = usage_file_updated_at + usage_file_interval;
	const time_t now = time(NULL);
	/* computed before any refresh below, so it reflects the old stamp */
	const time_t remaining = deadline - now;
	time_t wait_for;
	struct mog_fd *ready;

	if (deadline <= now) {
		global_mkusage();
	}

	/*
	 * epoll_wait() with timeout==0 can avoid some slow paths,
	 * so consume everything that is already ready before sleeping
	 */
	for (;;) {
		ready = mog_idleq_wait(mog_notify_queue, 0);
		if (!ready) {
			break;
		}
		notify_queue_step(ready);
	}

	if (!need_usage_file) {
		wait_for = -1;
	} else {
		wait_for = (remaining > 0) ? remaining * 1000 : 0;
	}

	ready = mog_idleq_wait_intr(mog_notify_queue, wait_for);
	if (ready) {
		notify_queue_step(ready);
		return;
	}

	/*
	 * NOTE(review): this relies on errno being meaningful after a NULL
	 * return from mog_idleq_wait_intr() -- confirm against that function.
	 */
	if (errno == EINTR) {
		note_run();
	}
}

/*
 * Posts a notification; intended to be async-signal safe.
 *
 * DEVICE_REFRESH and AIO_THREADS mark their flag pending (0 -> 1) and
 * call mog_selfwake_interrupt().  SIGNAL sets no flag.  Any other value
 * trips the assertion.  In every case the self-wake descriptor is
 * triggered afterwards.
 */
void mog_notify(enum mog_notification note)
{
	if (note == MOG_NOTIFY_DEVICE_REFRESH ||
	    note == MOG_NOTIFY_AIO_THREADS) {
		note_xchg(note, 0, 1);
		mog_selfwake_interrupt();
	} else if (note != MOG_NOTIFY_SIGNAL) {
		assert(0 && "bad note passed");
	}

	mog_selfwake_trigger(notify_mfd);
}

git clone https://yhbt.net/cmogstored.git