cmogstored.git  about / heads / tags
alternative mogstored implementation for MogileFS
blob e183cc303a3918c64e299cd8c878b36ba0b10a97 2899 bytes (raw)
$ git show socket_alive:mgmt_parser.rl	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
 
/*
 * Copyright (C) 2012-2013, Eric Wong <normalperson@yhbt.net>
 * License: GPLv3 or later (see COPYING for details)
 */
#include "cmogstored.h"
#include "mgmt.h"

/*
 * Promote this client to fsck priority, but only while we are still
 * accepting connections (svc->mgmt_fd >= 0).  During graceful shutdown
 * in single-threaded mode the client stays at normal (fair) priority.
 */
static void set_prio_fsck(struct mog_mgmt *mgmt)
{
	const int still_accepting = mgmt->svc->mgmt_fd >= 0;

	if (!still_accepting)
		return;

	mgmt->prio = MOG_PRIO_FSCK;
}

%%{
	machine mgmt_parser;

	# Grammar for one management-protocol request line.  Actions record
	# byte offsets (relative to buf) in mgmt->mark[0] and mgmt->mark[1]
	# and then hand the buffer to the matching mog_mgmt_fn_* handler.
	# Every handler action ends with fbreak, so the parser stops after a
	# single request.

	# line terminator: optional CR followed by LF
	eor = '\r'?'\n';

	# a '/' followed by any number of alphanumerics, '/', '.' or '-'
	path = "/"[a-zA-Z0-9/\.\-]*;

	# optional trailer of a digest request: a space, then either "fsck"
	# (calls set_prio_fsck() once its last character is seen) or any
	# other word made of [a-zA-Z0-9_]
	reason = ' '("fsck" @ { set_prio_fsck(mgmt); } | [a-zA-Z0-9_]+);

	# Separate entry point, reached through fgoto from the error action
	# of main.  Skips leading blanks, records where the first run of
	# non-blank, non-CR characters starts (mark[0]) and where the rest of
	# the line starts (mark[1]), and on the terminating '\n' calls
	# mog_mgmt_fn_unknown().  really_done is set here and checked by
	# mog_mgmt_parse() after the machine returns.
	invalid_line := (
		[ \t]*
		([^ \t\r]+) > { mgmt->mark[0] = fpc - buf; }
		(any-'\n')* > { mgmt->mark[1] = fpc - buf; }
		'\n'
	) @ {
		mog_mgmt_fn_unknown(mgmt, buf);
		really_done = 1;
		fbreak;
	};

	# "size <path>": mark[0] is the offset where the path starts,
	# mark[1] the offset where the line terminator starts
	size = (
		"size "(path) > { mgmt->mark[0] = fpc - buf; }
		eor > { mgmt->mark[1] = fpc - buf; }
		@ { mog_mgmt_fn_size(mgmt, buf); fbreak; }
	);

	# "MD5 <path>[ <reason>]" or "SHA-1 <path>[ <reason>]": mgmt->alg is
	# set from the keyword, mark[0] is the offset where the path starts,
	# mark[1] the offset where the optional reason (or the line
	# terminator) starts
	digest = (
		(
			"MD5" @ { mgmt->alg = GC_MD5; }
			|
			"SHA-1" @ { mgmt->alg = GC_SHA1; }
		)
		" "
		(path) > { mgmt->mark[0] = fpc - buf; }
		( reason? eor) > { mgmt->mark[1] = fpc - buf; }
		@ { mog_mgmt_fn_digest(mgmt, buf); fbreak; }
	);

	# "watch": sets mgmt->forward to MOG_IOSTAT.  have_iostat is
	# initialised to 1 and never changed by the code visible in this
	# action, so the mog_mgmt_fn_watch_err() branch is not taken here.
	watch = "watch" eor @ {
		static int have_iostat = 1;

		if (have_iostat)
			mgmt->forward = MOG_IOSTAT;
		else
			mog_mgmt_fn_watch_err(mgmt);
		fbreak;
	};

	# "server aio_threads = <digits>": the keyword is matched
	# case-insensitively (trailing i) and the space on either side of '='
	# is optional.  mark[0] is the offset where the digits start, mark[1]
	# the offset where the line terminator starts.
	aio_threads = (
		"server aio_threads"i ' '?'='(' ')?
		(digit+) > { mgmt->mark[0] = fpc - buf; }
		eor > { mgmt->mark[1] = fpc - buf; }
		@ { mog_mgmt_fn_aio_threads(mgmt, buf); fbreak; }
	);

	# a line holding only spaces/tabs (possibly none) before the terminator
	blank = [ \t]* eor @ { mog_mgmt_fn_blank(mgmt); fbreak; };

	# "shutdown", optionally followed by one or more spaces and "graceful";
	# both spellings call cmogstored_quit()
	shutdown = "shutdown" (" "+"graceful")? eor @ {
		cmogstored_quit();
		fbreak;
	};

	command = (digest|size|watch|aio_threads|shutdown|blank);

	# An error action is embedded in every state of command: when the
	# input matches no known command, p is rewound to the start of buf
	# and parsing resumes in invalid_line.  fhold backs p up by one
	# character -- presumably so that the advance following the action
	# lands on buf[0]; confirm against the Ragel guide.
	main := command $! {
		p = buf;
		fhold;
		fgoto invalid_line;
	};
}%%

/*
 * Ragel emits the generated state machine data here; this file relies on
 * the resulting mgmt_parser_first_final and mgmt_parser_error constants.
 */
%% write data;

/*
 * Return the parser stored in *mgmt to its initial condition: both marks
 * are zeroed and mgmt->cs takes the value Ragel's generated initialiser
 * assigns to the local cs.
 */
void mog_mgmt_reset_parser(struct mog_mgmt *mgmt)
{
	int cs;

	mgmt->mark[0] = 0;
	mgmt->mark[1] = 0;

	/* the generated init code assigns to the local named "cs" */
	%% write init;
	mgmt->cs = cs;
}

/*
 * Prepare *mgmt for first use: zero-fill the whole structure, attach it
 * to svc and reset its parser state.
 */
void mog_mgmt_init(struct mog_mgmt *mgmt, struct mog_svc *svc)
{
	memset(mgmt, 0, sizeof(*mgmt));
	mgmt->svc = svc;
	mog_mgmt_reset_parser(mgmt);
}

/*
 * Run the mgmt_parser machine over buf, starting at mgmt->offset and
 * stopping at buf + len.  The resulting machine state and the new offset
 * are stored back into *mgmt before returning.
 *
 * Returns MOG_PARSER_ERROR if the machine ended in its error state,
 * MOG_PARSER_DONE if it ended in mgmt_parser_first_final (or an action
 * set really_done), and MOG_PARSER_CONTINUE otherwise.
 *
 * The locals p, pe, eof and cs are the names Ragel's generated code
 * expects to find; buf, mgmt and really_done are used by the actions
 * of the machine definition.
 */
enum mog_parser_state
mog_mgmt_parse(struct mog_mgmt *mgmt, char *buf, size_t len)
{
	size_t off = mgmt->offset;
	int really_done = 0;
	int cs = mgmt->cs;
	char *eof = NULL;
	char *pe;
	char *p;

	assert(mgmt->wbuf == NULL && "unwritten data in buffer");
	assert(off <= len && "mgmt offset past end of buffer");

	pe = buf + len;
	p = buf + off;

	assert((void *)(pe - p) == (void *)(len - off) &&
	       "pointers aren't same distance");

	%% write exec;

	/* an action that set really_done forces the "finished" state */
	mgmt->cs = really_done ? mgmt_parser_first_final : cs;
	mgmt->offset = p - buf;

	if (mgmt->cs == mgmt_parser_error)
		return MOG_PARSER_ERROR;

	assert(p <= pe && "buffer overflow after mgmt parse");
	assert(mgmt->offset <= len && "offset longer than len");

	return (mgmt->cs == mgmt_parser_first_final)
		? MOG_PARSER_DONE
		: MOG_PARSER_CONTINUE;
}

git clone https://yhbt.net/cmogstored.git