summaryrefslogtreecommitdiff
path: root/ext/raindrops/raindrops.c
blob: 837084c14be0be7a4a31e026fec5affbb58237e5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
#include <ruby.h>
#include <unistd.h>
#include <sys/mman.h>
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include "raindrops_atomic.h"

#ifndef SIZET2NUM
#  define SIZET2NUM(x) ULONG2NUM(x)
#endif
#ifndef NUM2SIZET
#  define NUM2SIZET(x) NUM2ULONG(x)
#endif

/*
 * Byte distance between consecutive counters ("slot size").
 * Most modern CPUs have a cache-line size of 64 or 128.
 * We choose the bigger one by default since our structure is not
 * heavily used.  Init_raindrops_ext() may lower this to one word on
 * single-CPU machines or set it to the detected L1 data cache line size.
 */
static size_t raindrop_size = 128;
/* system page size, filled in by Init_raindrops_ext() */
static size_t rd_page_size;

/*
 * PAGE_ALIGN rounds a byte count up to a multiple of rd_page_size.
 * The mask trick is only correct when rd_page_size is a power of two.
 */
#define PAGE_MASK               (~(rd_page_size - 1))
#define PAGE_ALIGN(addr)        (((addr) + rd_page_size - 1) & PAGE_MASK)

/*
 * Each raindrop is a single counter.  Slots are spaced raindrop_size
 * bytes apart inside the mapping (addr_of() and to_ary() step by
 * raindrop_size), so this struct only describes the counter stored at
 * the start of a slot.
 */
struct raindrop {
	unsigned long counter;
} __attribute__((packed));

/* per-object state: lets one mmap-ed region store more than one raindrop */
struct raindrops {
	size_t size;		/* counters in use; valid indices are 0...size */
	size_t capa;		/* slots in the mapping (its byte length / raindrop_size) */
	pid_t pid;		/* process that mmap()-ed the region; resize() refuses elsewhere */
	struct raindrop *drops;	/* base of the mapping, or MAP_FAILED if there is none */
};

/*
 * GC free callback: unmap the shared counter region if this object still
 * owns one, then release the wrapper struct.  A failing munmap() here is
 * treated as a fatal interpreter bug (rb_bug).
 */
static void rd_free(void *ptr)
{
	struct raindrops *rd = ptr;

	if (rd->drops != MAP_FAILED &&
	    munmap(rd->drops, raindrop_size * rd->capa) != 0)
		rb_bug("munmap failed in gc: %s", strerror(errno));

	xfree(rd);
}

/* GC memsize callback: bytes of mmap()-ed counter storage, 0 if unmapped */
static size_t rd_memsize(const void *ptr)
{
	const struct raindrops *rd = ptr;

	if (rd->drops == MAP_FAILED)
		return 0;

	return raindrop_size * rd->capa;
}

/*
 * TypedData descriptor for Raindrops objects.  No mark function is
 * needed because struct raindrops holds no Ruby VALUEs; rd_free unmaps
 * and frees, and rd_memsize reports the mapped size to the GC.
 */
static const rb_data_type_t rd_type = {
	"raindrops",
	{ NULL, rd_free, rd_memsize, /* reserved */ },
	/* parent, data, [ flags ] */
};

/*
 * Allocator hook, run before #initialize: builds an empty wrapper whose
 * drops pointer is MAP_FAILED, so get() rejects the object until init()
 * has mapped memory for it.
 */
static VALUE alloc(VALUE klass)
{
	struct raindrops *rd;
	VALUE obj;

	obj = TypedData_Make_Struct(klass, struct raindrops, &rd_type, rd);
	rd->drops = MAP_FAILED;

	return obj;
}

/*
 * Unwrap +self+ into its struct raindrops.  TypedData_Get_Struct rejects
 * objects of the wrong type; an object that was never initialized or has
 * been evaporated raises StandardError.
 */
static struct raindrops *get(VALUE self)
{
	struct raindrops *rd;

	TypedData_Get_Struct(self, struct raindrops, &rd_type, rd);
	if (rd->drops != MAP_FAILED)
		return rd;

	rb_raise(rb_eStandardError, "invalid or freed Raindrops");
}

/*
 * call-seq:
 *	Raindrops.new(size)	-> raindrops object
 *
 * Initializes a Raindrops object to hold +size+ counters.  The shared
 * backing store is rounded up to a whole number of pages, so the number
 * of slots actually allocated (Raindrops#capa) depends on the CPU model,
 * number of cores, and page size of the machine.  It is always equal to
 * or greater than +size+.  Raindrops#size reports +size+ itself.
 *
 * Raises ArgumentError if +size+ is less than 1.  Raises RangeError if
 * +size+ is so large that the byte length of the mapping would overflow.
 * Raises a SystemCallError if mmap(2) fails even after one GC-and-retry.
 */
static VALUE init(VALUE self, VALUE size)
{
	struct raindrops *r = DATA_PTR(self);
	int tries = 1;
	size_t nr;
	size_t tmp;

	if (r->drops != MAP_FAILED)
		rb_raise(rb_eRuntimeError, "already initialized");

	nr = NUM2SIZET(size);
	if (nr < 1)
		rb_raise(rb_eArgError, "size must be >= 1");

	/*
	 * raindrop_size * nr plus page rounding must not wrap around a
	 * size_t.  A wrapped length would map fewer bytes than r->size
	 * claims, and later accesses would run off the end of the mapping.
	 * (NUM2SIZET may also turn a negative argument into a huge value.)
	 */
	if (nr > ((size_t)-1 - rd_page_size) / raindrop_size)
		rb_raise(rb_eRangeError, "size too large");

	r->size = nr;
	tmp = PAGE_ALIGN(raindrop_size * r->size);
	r->capa = tmp / raindrop_size;
	assert(PAGE_ALIGN(raindrop_size * r->capa) == tmp && "not aligned");

retry:
	/* anonymous MAP_SHARED memory stays shared with fork()-ed children */
	r->drops = mmap(NULL, tmp,
	                PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0);
	if (r->drops == MAP_FAILED) {
		int err = errno;

		/* memory pressure: let the GC release other mappings, retry once */
		if ((err == EAGAIN || err == ENOMEM) && tries-- > 0) {
			rb_gc();
			goto retry;
		}
		rb_sys_fail("mmap");
	}
	/* remember the creator so resize() can refuse to mremap() in a child */
	r->pid = getpid();

	return self;
}

/*
 * mremap() is currently broken with MAP_SHARED
 * https://bugzilla.kernel.org/show_bug.cgi?id=8691
 * so it is only used when the build explicitly declares it safe by
 * defining MREMAP_WORKS_WITH_MAP_SHARED.
 */
#if defined(HAVE_MREMAP) && !defined(MREMAP_WORKS_WITH_MAP_SHARED)
#  undef HAVE_MREMAP
#endif

#ifdef HAVE_MREMAP
#ifndef MREMAP_MAYMOVE
#  warning MREMAP_MAYMOVE undefined
#  define MREMAP_MAYMOVE 0
#endif
/*
 * Resize the mapping behind +r+ to hold +new_rd_size+ counters, rounded
 * up to whole pages, and let the kernel move it if needed.  Only the
 * process that created the mapping may do this.
 *
 * Raises RuntimeError in any other process.  Raises RangeError if the
 * byte length would overflow.  Raises a SystemCallError if mremap(2)
 * fails even after a GC-and-retry.
 */
static void resize(struct raindrops *r, size_t new_rd_size)
{
	size_t old_size = raindrop_size * r->capa;
	size_t new_size;
	void *old_address = r->drops;
	void *rv;

	if (r->pid != getpid())
		rb_raise(rb_eRuntimeError, "cannot mremap() from child");

	/* the page-aligned byte length must not wrap around a size_t */
	if (new_rd_size > ((size_t)-1 - rd_page_size) / raindrop_size)
		rb_raise(rb_eRangeError, "size too large");
	new_size = PAGE_ALIGN(raindrop_size * new_rd_size);

	rv = mremap(old_address, old_size, new_size, MREMAP_MAYMOVE);
	if (rv == MAP_FAILED) {
		int err = errno;

		if (err == EAGAIN || err == ENOMEM) {
			/*
			 * Let the GC release other mappings, then retry with the
			 * same flags.  MREMAP_MAYMOVE is strictly more permissive
			 * than flags 0, so dropping it here would only lower the
			 * chance of success.
			 */
			rb_gc();
			rv = mremap(old_address, old_size, new_size,
			            MREMAP_MAYMOVE);
		}
		if (rv == MAP_FAILED)
			rb_sys_fail("mremap");
	}
	r->drops = rv;
	r->size = new_rd_size;
	r->capa = new_size / raindrop_size;
	assert(r->capa >= r->size && "bad sizing");
}
#else /* ! HAVE_MREMAP */
/*
 * we cannot use munmap + mmap to reallocate the buffer since it may
 * already be shared by other processes, so we just fail
 */
static void resize(struct raindrops *r, size_t new_rd_size)
{
	(void)r;		/* unused: resizing is unsupported here */
	(void)new_rd_size;
	rb_raise(rb_eRangeError, "mremap(2) is not available");
}
#endif /* ! HAVE_MREMAP */

/*
 * call-seq:
 *	rd.size = new_size
 *
 * Sets the number of counters in use.  Within the current capacity
 * (Raindrops#capa) this only changes the count and never releases
 * memory.  Growing beyond the capacity goes through resize(), which
 * needs a usable mremap(2).  Builds without one raise RangeError.
 */
static VALUE setsize(VALUE self, VALUE new_size)
{
	size_t want = NUM2SIZET(new_size);
	struct raindrops *rd = get(self);

	if (want > rd->capa) {
		/* outgrows the mapping: resize() remaps and updates size/capa */
		resize(rd, want);
	} else {
		/* already fits: just change how many slots are exposed */
		rd->size = want;
	}

	return new_size;
}

/*
 * call-seq:
 *	rd.capa		-> Integer
 *
 * Returns how many counter slots the current mapping can hold.  This is
 * at least Raindrops#size; the extra slots are allocated but unused.
 */
static VALUE capa(VALUE self)
{
	const struct raindrops *rd = get(self);

	return SIZET2NUM(rd->capa);
}

/*
 * call-seq:
 *	rd.dup		-> rd_copy
 *
 * Makes +dest+ an independent snapshot of +source+.  Fresh memory sized
 * for source's counter count is mapped, and the current counter values
 * are copied into it.  The copy does not share memory with the original.
 */
static VALUE init_copy(VALUE dest, VALUE source)
{
	struct raindrops *src = get(source);
	struct raindrops *dst = DATA_PTR(dest);
	size_t nbytes;

	/* maps a new region into dst (dst points at dest's own struct) */
	init(dest, SIZET2NUM(src->size));

	nbytes = raindrop_size * src->size;
	memcpy(dst->drops, src->drops, nbytes);

	return dest;
}

/*
 * Return the address of the counter at slot +index+ of +self+.
 *
 * +index+ is converted with NUM2ULONG, which raises TypeError for
 * non-numeric objects.  (The previous FIX2ULONG did no type check, so nil
 * or true silently mapped onto real slots.)  The conversion happens
 * before the mapping is looked up, because it may run Ruby code (#to_int)
 * that could evaporate! or resize this object.
 *
 * Raises ArgumentError ("offset overrun") unless index < rd.size.  The
 * bound is checked on the slot index rather than on
 * index * raindrop_size, which can wrap around for large indices.
 */
static unsigned long *addr_of(VALUE self, VALUE index)
{
	unsigned long idx = NUM2ULONG(index);
	struct raindrops *r = get(self);

	if (idx >= r->size)
		rb_raise(rb_eArgError, "offset overrun");

	/* idx < size <= capa, so the product stays inside the mapping */
	return (unsigned long *)((char *)r->drops + idx * raindrop_size);
}

/*
 * Validate the argument list shared by #incr and #decr (an index plus an
 * optional amount) and return the amount, which defaults to 1.
 * Raises ArgumentError unless exactly 1 or 2 arguments were given.
 */
static unsigned long incr_decr_arg(int argc, const VALUE *argv)
{
	if (argc < 1 || argc > 2)
		rb_raise(rb_eArgError,
		         "wrong number of arguments (%d for 1..2)", argc);

	return argc == 2 ? NUM2ULONG(argv[1]) : 1;
}

/*
 * call-seq:
 *	rd.incr(index[, number])	-> result
 *
 * Atomically adds +number+ (default 1) to the counter at +index+ and
 * returns the counter's new value.
 */
static VALUE incr(int argc, VALUE *argv, VALUE self)
{
	/* validate argc before touching argv[0] */
	unsigned long delta = incr_decr_arg(argc, argv);
	unsigned long *slot = addr_of(self, argv[0]);
	unsigned long now = __sync_add_and_fetch(slot, delta);

	return ULONG2NUM(now);
}

/*
 * call-seq:
 *	rd.decr(index[, number])	-> result
 *
 * Atomically subtracts +number+ (default 1) from the counter at +index+
 * and returns the counter's new value.
 */
static VALUE decr(int argc, VALUE *argv, VALUE self)
{
	/* validate argc before touching argv[0] */
	unsigned long delta = incr_decr_arg(argc, argv);
	unsigned long *slot = addr_of(self, argv[0]);
	unsigned long now = __sync_sub_and_fetch(slot, delta);

	return ULONG2NUM(now);
}

/*
 * call-seq:
 *	rd.to_ary	-> Array
 *
 * Returns a new Array with the value of every counter in use
 * (rd.size elements), in index order.  Values are read one at a time.
 */
static VALUE to_ary(VALUE self)
{
	struct raindrops *r = get(self);
	VALUE ary = rb_ary_new2(r->size);
	const char *slot = (const char *)r->drops;
	size_t i;

	for (i = 0; i < r->size; i++, slot += raindrop_size) {
		unsigned long val = *(const unsigned long *)slot;

		rb_ary_push(ary, ULONG2NUM(val));
	}

	return ary;
}

/*
 * call-seq:
 *	rd.size		-> Integer
 *
 * Returns the number of counters in use: the +size+ passed to
 * Raindrops.new, or the value last assigned with Raindrops#size=.
 * Raindrops#capa reports the number of slots actually allocated, which
 * page alignment makes equal to or greater than this.
 */
static VALUE size(VALUE self)
{
	const struct raindrops *rd = get(self);

	return SIZET2NUM(rd->size);
}

/*
 * call-seq:
 *	rd[index] = value
 *
 * Assigns +value+ to the slot designated by +index+
 */
static VALUE aset(VALUE self, VALUE index, VALUE value)
{
	/*
	 * Convert +value+ before looking up the slot address.  NUM2ULONG may
	 * call value.to_int, and Ruby code run there could evaporate! or
	 * resize this object, which would leave an already-computed address
	 * pointing at unmapped or moved memory.  This matches incr/decr,
	 * which convert their amount before calling addr_of().
	 */
	unsigned long val = NUM2ULONG(value);
	unsigned long *addr = addr_of(self, index);

	*addr = val;

	return value;
}

/*
 * call-seq:
 *	rd[index]	-> value
 *
 * Returns the current value of the counter at +index+.
 * An index past the counters in use raises ArgumentError.
 */
static VALUE aref(VALUE self, VALUE index)
{
	const unsigned long *slot = addr_of(self, index);

	return ULONG2NUM(*slot);
}

/*
 * Initializers for optional sub-features.  They are declared here and
 * presumably defined in other source files of this extension;
 * Init_raindrops_ext() calls them last.
 */
#ifdef __linux__
void Init_raindrops_linux_inet_diag(void);
#endif
#ifdef HAVE_TYPE_STRUCT_TCP_INFO
void Init_raindrops_tcp_info(void);
#endif

/*
 * Some platforms spell the "number of CPUs" sysconf() name differently.
 * Map the first available alternative onto _SC_NPROCESSORS_CONF, so the
 * CPU-count lookup below only has to test one name.  If none exists it
 * stays undefined, and the CPU count is not queried.
 */
#ifndef _SC_NPROCESSORS_CONF
#  if defined _SC_NPROCESSORS_ONLN
#    define _SC_NPROCESSORS_CONF _SC_NPROCESSORS_ONLN
#  elif defined _SC_NPROC_ONLN
#    define _SC_NPROCESSORS_CONF _SC_NPROC_ONLN
#  elif defined _SC_CRAY_NCPU
#    define _SC_NPROCESSORS_CONF _SC_CRAY_NCPU
#  endif
#endif

/*
 * call-seq:
 *	rd.evaporate!	-> nil
 *
 * Unmaps the counters' memory right away instead of waiting for the
 * garbage collector.  This is useful under high memory pressure.  The
 * object is unusable afterwards; any further access raises.
 */
static VALUE evaporate_bang(VALUE self)
{
	struct raindrops *rd = get(self);
	void *base = rd->drops;
	size_t len = raindrop_size * rd->capa;
	int rc;

	/* invalidate first so the object cannot be used even if munmap fails */
	rd->drops = MAP_FAILED;
	rc = munmap(base, len);
	if (rc != 0)
		rb_sys_fail("munmap");

	return Qnil;
}

void Init_raindrops_ext(void)
{
	VALUE cRaindrops = rb_define_class("Raindrops", rb_cObject);
	long tmp = 2;

#ifdef _SC_NPROCESSORS_CONF
	tmp = sysconf(_SC_NPROCESSORS_CONF);
#endif
	/* no point in padding on single CPU machines */
	if (tmp == 1)
		raindrop_size = sizeof(unsigned long);
#ifdef _SC_LEVEL1_DCACHE_LINESIZE
	if (tmp != 1) {
		tmp = sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
		if (tmp > 0)
			raindrop_size = (size_t)tmp;
	}
#endif
#if defined(_SC_PAGE_SIZE)
	rd_page_size = (size_t)sysconf(_SC_PAGE_SIZE);
#elif defined(_SC_PAGESIZE)
	rd_page_size = (size_t)sysconf(_SC_PAGESIZE);
#elif defined(HAVE_GETPAGESIZE)
	rd_page_size = (size_t)getpagesize();
#elif defined(PAGE_SIZE)
	rd_page_size = (size_t)PAGE_SIZE;
#elif defined(PAGESIZE)
	rd_page_size = (size_t)PAGESIZE;
#else
#  error unable to detect page size for mmap()
#endif
	if ((rd_page_size == (size_t)-1) || (rd_page_size < raindrop_size))
		rb_raise(rb_eRuntimeError,
			 "system page size invalid: %llu",
			 (unsigned long long)rd_page_size);

	/*
	 * The size of one page of memory for a mmap()-ed Raindrops region.
	 * Typically 4096 bytes under Linux.
	 */
	rb_define_const(cRaindrops, "PAGE_SIZE", SIZET2NUM(rd_page_size));

	/*
	 * The size (in bytes) of a slot in a Raindrops object.
	 * This is the size of a word on single CPU systems and
	 * the size of the L1 cache line size if detectable.
	 *
	 * Defaults to 128 bytes if undetectable.
	 */
	rb_define_const(cRaindrops, "SIZE", SIZET2NUM(raindrop_size));

	/*
	 * The maximum value a raindrop counter can hold
	 */
	rb_define_const(cRaindrops, "MAX", ULONG2NUM((unsigned long)-1));

	rb_define_alloc_func(cRaindrops, alloc);

	rb_define_method(cRaindrops, "initialize", init, 1);
	rb_define_method(cRaindrops, "incr", incr, -1);
	rb_define_method(cRaindrops, "decr", decr, -1);
	rb_define_method(cRaindrops, "to_ary", to_ary, 0);
	rb_define_method(cRaindrops, "[]", aref, 1);
	rb_define_method(cRaindrops, "[]=", aset, 2);
	rb_define_method(cRaindrops, "size", size, 0);
	rb_define_method(cRaindrops, "size=", setsize, 1);
	rb_define_method(cRaindrops, "capa", capa, 0);
	rb_define_method(cRaindrops, "initialize_copy", init_copy, 1);
	rb_define_method(cRaindrops, "evaporate!", evaporate_bang, 0);

#ifdef __linux__
	Init_raindrops_linux_inet_diag();
#endif
#ifdef HAVE_TYPE_STRUCT_TCP_INFO
	Init_raindrops_tcp_info();
#endif
}