cmogstored.git  about / heads / tags
alternative mogstored implementation for MogileFS
blob af755137a63a6a65be7362d182557b206f2bec27 3671 bytes (raw)
$ git show HEAD:trywrite.c	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
 
/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"

/*
 * Heap-allocated holder for bytes that still need to be written to a
 * descriptor.  The header and the payload live in one allocation
 * (see wbuf_newv).
 */
struct mog_wbuf {
	size_t len; /* total number of payload bytes stored in buf */
	size_t off; /* how many of those bytes were already written out */
	unsigned char buf[FLEXIBLE_ARRAY_MEMBER]; /* the payload itself */
};

/*
 * Allocates a mog_wbuf with room for `total' payload bytes and fills
 * it by concatenating the `iovcnt' buffers described by `iov', in
 * array order.  Callers in this file pass the sum of all iov_len
 * values as `total'.
 *
 * Returns the new buffer (len = total, off = 0), or MOG_WR_ERROR
 * when malloc fails.
 */
static void * wbuf_newv(size_t total, struct iovec *iov, int iovcnt)
{
	struct mog_wbuf *wbuf = malloc(sizeof(*wbuf) + total);
	const struct iovec *vec = iov;
	unsigned char *cursor;
	int left;

	if (wbuf == NULL)
		return MOG_WR_ERROR;

	wbuf->off = 0;
	wbuf->len = total;

	/* append each source buffer right behind the previous one */
	cursor = wbuf->buf;
	for (left = iovcnt; left > 0; left--, vec++) {
		memcpy(cursor, vec->iov_base, vec->iov_len);
		cursor += vec->iov_len;
	}

	return wbuf;
}

/*
 * Single-buffer convenience wrapper around wbuf_newv: copies `len'
 * bytes starting at `buf' into a freshly allocated mog_wbuf.
 * Return value is whatever wbuf_newv returns.
 */
static void * wbuf_new(void *buf, size_t len)
{
	struct iovec single = { .iov_base = buf, .iov_len = len };

	return wbuf_newv(len, &single, 1);
}

/*
 * Reports, at LOG_ERR priority, that the call named by `syscall_name'
 * returned the byte count `nr' unexpectedly.  Callers in this file use
 * it when a write-type call returned <= 0 although errno was zero.
 * The "%m" in the format makes syslog append the text for the current
 * errno.
 */
MOG_NOINLINE static void sysbug(const char *syscall_name, ssize_t nr)
{
	syslog(LOG_ERR, "%s returned %zd bytes written but: %m",
	       syscall_name, nr);
}

/*
 * Tries to write the not-yet-written tail of the buffer at *x to fd.
 *
 * returns
 * - MOG_WRSTATE_DONE once every remaining byte was written; the buffer
 *   is handed to mog_free_and_null before returning
 * - MOG_WRSTATE_BUSY when write(2) fails with one of the errno values
 *   covered by case_EAGAIN; the buffer is kept and its `off' field
 *   records the progress made so far
 * - MOG_WRSTATE_ERR on any other write(2) error; the buffer is handed
 *   to mog_free_and_null before returning
 *
 * EINTR is retried immediately.
 */
enum mog_write_state mog_tryflush(int fd, struct mog_wbuf **x)
{
	struct mog_wbuf *wbuf = *x;

	for (;;) {
		/* the unwritten tail always starts at buf + off */
		unsigned char *tail = wbuf->buf + wbuf->off;
		size_t remaining = wbuf->len - wbuf->off;
		ssize_t w = write(fd, tail, remaining);

		if (w < 0) {
			switch (errno) {
			case_EAGAIN:
				return MOG_WRSTATE_BUSY;
			case EINTR:
				continue; /* restarts the for loop */
			default:
				mog_free_and_null(x);
				return MOG_WRSTATE_ERR;
			}
		}

		if ((size_t)w == remaining) {
			mog_free_and_null(x);
			return MOG_WRSTATE_DONE;
		}

		/* short write: remember the progress and go around again */
		wbuf->off += w;
	}
}

/*
 * Writes the buffers described by iov[0..iovcnt-1] to fd with
 * writev(2), retrying after short writes and EINTR.
 *
 * The caller's iovec array may be modified: a partially written
 * element has its iov_base/iov_len adjusted to cover only the
 * unwritten remainder.
 *
 * returns
 * - NULL on full write (or when there was nothing to write)
 * - MOG_WR_ERROR on error (and sets errno)
 * - address to a new mog_wbuf with unbuffered contents on partial write
 */
void * mog_trywritev(int fd, struct iovec *iov, int iovcnt)
{
	ssize_t remaining = 0;
	int n;

	for (n = 0; n < iovcnt; n++)
		remaining += iov[n].iov_len;

	if (remaining == 0)
		return NULL;

	for (;;) {
		ssize_t w = writev(fd, iov, iovcnt);

		if (w == remaining)
			return NULL;

		if (w <= 0) {
			switch (errno) {
			case_EAGAIN:
				/* stash everything still unwritten */
				TRACE(CMOGSTORED_WRITE_BUFFERED(fd, remaining));
				return wbuf_newv(remaining, iov, iovcnt);
			case EINTR:
				continue; /* retry the writev */
			case 0:
				sysbug("writev", w);
			}
			return MOG_WR_ERROR;
		}

		/* short write: account for it, then trim the iovec array */
		remaining -= w;

		while (iovcnt > 0 && w != 0) {
			if (iov->iov_len > (size_t)w) {
				/*
				 * partially written iovec: shrink it in
				 * place and keep it at the front for the
				 * next attempt
				 */
				iov->iov_base = (unsigned char *)iov->iov_base + w;
				iov->iov_len -= w;
				break;
			}

			/* this iovec went out completely, drop it */
			w -= iov->iov_len;
			iov++;
			iovcnt--;
		}
	}
}

/*
 * Sends `len' bytes starting at `buf' to fd.  When MOG_MSG_MORE is
 * non-zero, send(2) is used and MOG_MSG_MORE is passed as the flag
 * whenever `more' is positive; otherwise the work is delegated to
 * mog_trywritev.
 *
 * returns
 * - NULL on full write
 * - MOG_WR_ERROR on error (and sets errno)
 * - address to a new mog_wbuf with unbuffered contents on partial write
 */
void * mog_trysend(int fd, void *buf, size_t len, off_t more)
{
	int flags;

	if (!MOG_MSG_MORE) {
		struct iovec single = { .iov_base = buf, .iov_len = len };

		return mog_trywritev(fd, &single, 1);
	}

	flags = (more > 0) ? MOG_MSG_MORE : 0;

	while (len > 0) {
		ssize_t w = send(fd, buf, len, flags);

		if (w == (ssize_t)len)
			return NULL; /* all done */

		if (w > 0) {
			/* short send: step past what went out, try again */
			len -= w;
			buf = (char *)buf + w;
			continue;
		}

		/*
		 * we bail on w == 0, too.  send should normally never
		 * return zero for a non-zero length, but in case there's
		 * a kernel bug we should not infinite loop
		 */
		switch (errno) {
		case_EAGAIN:
			TRACE(CMOGSTORED_WRITE_BUFFERED(fd, len));
			return wbuf_new(buf, len);
		case EINTR:
			continue; /* retry the send */
		case 0:
			sysbug("send", w);
		}
		return MOG_WR_ERROR;
	}

	return NULL;
}

git clone https://yhbt.net/cmogstored.git