Rename struct slab_alloc to struct slab_allocator.
[barrelfish] / usr / mem_serv_dist / mem_serv.c
1 /**
2  * \file
3  * \brief Distributed (percore) memory server
4  */
5
6 /*
7  * Copyright (c) 2007-2011, ETH Zurich.
8  * All rights reserved.
9  *
10  * This file is distributed under the terms in the attached LICENSE file.
11  * If you do not find this file, copies can be found by writing to:
12  * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
13  */
14
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <string.h>
18 #include <getopt.h>
19
20 #include <inttypes.h>
21 #include <barrelfish/barrelfish.h>
22 #include <barrelfish/dispatch.h>
23 #include <mm/mm.h>
24 #include <trace/trace.h>
25 #include <trace_definitions/trace_defs.h>
26 #include <barrelfish/morecore.h>
27 #include <barrelfish/monitor_client.h>
28 #include <barrelfish/spawn_client.h>
29 #include <skb/skb.h>
30 #include <dist/barrier.h>
31
32 #include <if/mem_defs.h>
33 #include <if/mem_rpcclient_defs.h>
34 #include <if/monitor_defs.h>
35 #include <if/spawn_rpcclient_defs.h>
36
37 #ifdef __scc__
38 #include <barrelfish_kpi/shared_mem_arch.h>
39 #endif
40
41 #include "skb.h"
42 #include "args.h"
43
44 // #include "barrier.h"
45
46 #include "mem_serv.h"
47 #include "steal.h"
48
49 /*
50  * TODO:
51  * - currently requests too much memory from initial mem_serv and other 
52  *   (non_dist) mem_serv clients may suffer
53  */
54
55 /// Globally track the total memory available
56 memsize_t mem_total = 0;
57 /// Globally track the actual memory available to allocate
58 memsize_t mem_avail = 0;
59 /// Globally track the local reserve memory available to allocate
60 memsize_t mem_local = 0;
61
62 /// MM per-core allocator instance data: B-tree to manage mem regions
63 struct mm mm_percore;
64 // static storage for MM allocator to get it started 
65 static char percore_nodebuf[SLAB_STATIC_SIZE(MINSPARENODES,
66                                              MM_NODE_SIZE(MAXCHILDBITS))];
67
68 /// MM allocator for reserve of emergency memory used within the mem_serv only
69 struct mm mm_local;
70 // static storage for MM allocator to get it started 
71 static char local_nodebuf[SLAB_STATIC_SIZE(MINSPARENODES,
72                                            MM_NODE_SIZE(MAXCHILDBITS))];
73
74 /// simple slot allocator used by MM
75 static struct slot_prealloc percore_slot_alloc;
76
77 #ifndef __scc__
78 static struct mm *mm_slots = &mm_percore;
79 #else
80 static struct mm *mm_slots = &mm_local;
81 #endif
82
83 #if 0
84 static void dump_ram_region(int index, struct mem_region* m)
85 {
86
87     uintptr_t start, limit;
88
89     start = (uintptr_t)m->mr_base;
90     limit = start + (1UL << m->mr_bits);
91
92     char prefix = ' ';
93     size_t quantity = 1UL << m->mr_bits;
94
95     if (m->mr_bits >= 30) {
96         prefix = 'G';
97         quantity >>= 30;
98     }
99     else if (m->mr_bits >= 20) {
100         prefix = 'M';
101         quantity >>= 20;
102     }
103     else if (m->mr_bits >= 10) {
104         prefix = 'K';
105         quantity >>= 10;
106     }
107
108     printf("RAM region %d: 0x%" PRIxPTR
109            " - 0x%" PRIxPTR " (%lu %cB, %u bits)\n",
110            index, start, limit, quantity, prefix, m->mr_bits);
111 }
112 #endif // 0
113
114 errval_t slab_refill(struct slab_allocator *slabs)
115 {
116     errval_t err;
117
118     // refill slab allocator if needed 
119     while (slab_freecount(slabs) <= MINSPARENODES) {
120         // debug_printf("running low on free slabs: slabs=%ld\n", 
121         //             slab_freecount(&mm_percore.slabs));
122         struct capref frame;
123         err = slot_alloc(&frame);
124         if (err_is_fail(err)) {
125             return err_push(err, LIB_ERR_SLOT_ALLOC);
126         }
127         err = frame_create(frame, BASE_PAGE_SIZE * 8, NULL);
128         if (err_is_fail(err)) {
129             slot_free(frame);
130             return err_push(err, LIB_ERR_FRAME_CREATE);
131         }
132         void *buf;
133         err = vspace_map_one_frame(&buf, BASE_PAGE_SIZE * 8, frame,
134                                    NULL, NULL);
135         if (err_is_fail(err)) {
136             cap_destroy(frame);
137             return err_push(err, LIB_ERR_VSPACE_MAP);
138         }
139         slab_grow(slabs, buf, BASE_PAGE_SIZE * 8);
140     }
141
142     return SYS_ERR_OK;
143 }
144
145 static errval_t do_free(struct mm *mm, struct capref ramcap,
146                         genpaddr_t base, uint8_t bits,
147                         memsize_t *mem_available)
148 {
149     errval_t ret;
150     memsize_t mem_to_add;
151
152     mem_to_add = (memsize_t)1 << bits;
153
154     ret = mm_free(mm, ramcap, base, bits);
155     if (err_is_fail(ret)) {
156         if (err_no(ret) == MM_ERR_NOT_FOUND) {
157             // memory wasn't there initially, add it
158             ret = mm_add(mm, ramcap, bits, base);
159             if (err_is_fail(ret)) {
160                 return err_push(ret, MM_ERR_MM_ADD);
161             }
162             mem_total += mem_to_add;
163         } else {
164             return err_push(ret, MM_ERR_MM_FREE);
165         }
166     }
167
168     *mem_available += mem_to_add;
169
170     return SYS_ERR_OK;
171 }
172
173 static errval_t percore_free(struct capref ramcap) 
174 {
175     struct capability info;
176     errval_t ret;
177
178     ret = debug_cap_identify(ramcap, &info);
179     if (err_is_fail(ret)) {
180         return err_push(ret, MON_ERR_CAP_IDENTIFY);
181     }
182
183     if (info.type != ObjType_RAM) {
184         return SYS_ERR_INVALID_SOURCE_TYPE;
185     }
186
187 #if 0
188     printf("%d: Cap is type %d Ram base 0x%"PRIxGENPADDR
189            " (%"PRIuGENPADDR") Bits %d\n", disp_get_core_id(),
190            info.type, info.u.ram.base, info.u.ram.base, 
191            info.u.ram.bits);
192 #endif
193
194     return do_free(&mm_percore, ramcap, info.u.ram.base,
195                    info.u.ram.bits, &mem_avail);
196 }
197
198 #ifdef __scc__
199 static errval_t local_free(struct capref ramcap) 
200 {
201     struct capability info;
202     errval_t ret;
203
204     ret = debug_cap_identify(ramcap, &info);
205     if (err_is_fail(ret)) {
206         return err_push(ret, MON_ERR_CAP_IDENTIFY);
207     }
208
209     if (info.type != ObjType_RAM) {
210         return SYS_ERR_INVALID_SOURCE_TYPE;
211     }
212
213     return do_free(&mm_local, ramcap, info.u.ram.base,
214                    info.u.ram.bits, &mem_local);
215 }
216 #endif
217
218 errval_t percore_free_handler_common(struct capref ramcap, genpaddr_t base,
219                                      uint8_t bits)
220 {
221 #ifndef __scc__
222     return do_free(&mm_percore, ramcap, base, bits, &mem_avail);
223 #else
224     if (base < SHARED_MEM_MIN) {
225         return do_free(&mm_local, ramcap, base, bits, &mem_local);
226     } else {
227         return do_free(&mm_percore, ramcap, base, bits, &mem_avail);
228     }
229 #endif
230 }
231
232 memsize_t mem_available_handler_common(void) 
233 {
234     return mem_avail;
235 }
236
237
238 static errval_t do_alloc(struct mm *mm, struct capref *ret, uint8_t bits,
239                          genpaddr_t minbase, genpaddr_t maxlimit,
240                          memsize_t *mem_available)
241 {
242     errval_t err;
243
244     assert(bits >= MINSIZEBITS);
245
246     if (((memsize_t)1 << bits) > *mem_available) {
247         return MM_ERR_NOT_FOUND;
248     }
249
250     if(maxlimit == 0) {
251         err = mm_alloc(mm, bits, ret, NULL);
252     } else {
253         err = mm_alloc_range(mm, bits, minbase, maxlimit, ret, NULL);
254     }
255
256     if (err_is_ok(err)) {
257         *mem_available -= (memsize_t)1 << bits;
258     }
259
260     return err;
261 }
262
263
264 errval_t percore_alloc(struct capref *ret, uint8_t bits,
265                               genpaddr_t minbase, genpaddr_t maxlimit)
266 {
267     return do_alloc(&mm_percore, ret, bits, minbase, maxlimit, &mem_avail);
268 }
269
270
271 static errval_t local_alloc(struct capref *ret, uint8_t bits,
272                             genpaddr_t minbase, genpaddr_t maxlimit)
273 {
274     errval_t err;
275
276 #ifdef __scc__
277     // first try local memory
278     err = do_alloc(&mm_local, ret, bits, minbase, maxlimit, &mem_local);
279   
280     // then try the general percore memory
281     if (err_is_fail(err)) {
282         err = percore_alloc(ret, bits, minbase, maxlimit);
283     }
284 #else
285     // first try the general percore memory
286     err = percore_alloc(ret, bits, minbase, maxlimit);
287
288     // then try the local reserve
289     if (err_is_fail(err)) {
290         err = do_alloc(&mm_local, ret, bits, minbase, maxlimit, &mem_local);
291     }
292 #endif
293
294     return err;
295 }
296
297 static errval_t get_more_ram(uint8_t bits, genpaddr_t minbase, 
298                              genpaddr_t maxlimit)
299 {
300     errval_t err;
301     struct capref cap;
302     
303     // try to steal a RAM cap
304     try_steal(&err, &cap, bits, minbase, maxlimit);
305     if (err_is_fail(err)) {
306         // try to get a local reserve RAM cap
307         err = local_alloc(&cap, bits, minbase, maxlimit);
308         if (err_is_fail(err)) {
309             return err;
310         }
311     }
312     // make the cap available for a subsequent alloc
313 #ifndef __scc__
314     percore_free(cap);
315 #else
316     local_free(cap);
317 #endif
318
319     return SYS_ERR_OK;    
320 }
321
322 static errval_t do_slot_prealloc_refill(struct slot_prealloc *slot_alloc_inst)
323 {
324     errval_t err;
325
326     assert(slot_alloc_inst != NULL);
327
328     err = slot_prealloc_refill(slot_alloc_inst);
329     if (err_is_fail(err)) {
330         err = get_more_ram(slot_alloc_inst->cnode_size_bits + OBJBITS_CTE, 0,0);
331         if (err_is_fail(err)) {
332             // debug_printf("get_more_ram failed\n");
333         }
334         err = slot_prealloc_refill(slot_alloc_inst);
335         if (err_is_ok(err)) {
336             // debug_printf("second refill succeeded\n");
337         }
338     }
339     return err;
340 }
341
342 errval_t percore_allocate_handler_common(uint8_t bits,
343                                          genpaddr_t minbase, 
344                                          genpaddr_t maxlimit,
345                                          struct capref *retcap)
346 {
347     struct capref cap;
348     errval_t err, ret;
349
350     // debug_printf("percore alloc request: bits: %d\n", bits);
351
352     trace_event(TRACE_SUBSYS_MEMSERV, TRACE_EVENT_MEMSERV_PERCORE_ALLOC, bits);
353
354     // refill slot allocator if needed 
355     err = do_slot_prealloc_refill(mm_slots->slot_alloc_inst);
356     if (err_is_fail(err)) {
357         DEBUG_ERR(err, "Warning: failure in slot_prealloc_refill");
358     }
359
360     // refill slab allocators if needed 
361     err = slab_refill(&mm_percore.slabs);
362     if (err_is_fail(err)) {
363         DEBUG_ERR(err, "Warning: failure when refilling mm_percore slab");
364     }
365
366     err = slab_refill(&mm_local.slabs);
367     if (err_is_fail(err)) {
368         DEBUG_ERR(err, "Warning: failure when refilling mm_local slab");
369     }
370
371     // do the actual allocation
372 #ifndef __scc__
373     ret = percore_alloc(&cap, bits, minbase, maxlimit);
374 #else
375     ret = local_alloc(&cap, bits, minbase, maxlimit);
376 #endif
377
378     if (err_is_fail(ret)) {
379         // debug_printf("percore_alloc(%d (%lu)) failed\n", bits, 1UL << bits);
380                 printf("[%d][%"PRIuDOMAINID"] percore_alloc failed, going to steal\n",
381                                         disp_get_core_id(), disp_get_domain_id());
382         try_steal(&ret, &cap, bits, minbase, maxlimit);
383     }
384
385     trace_event(TRACE_SUBSYS_MEMSERV, TRACE_EVENT_MEMSERV_PERCORE_ALLOC_COMPLETE, bits);
386
387     *retcap = cap;
388     return ret;
389 }
390
391
392 // this is a candidate for smarter calculation. possibly by the skb
393 static memsize_t get_percore_size(int num_cores)
394 {
395 #ifdef MEMSERV_PERCORE_DYNAMIC
396     errval_t err;
397     memsize_t all_mem_avail, mem_percore, tot_mem;
398
399     // send message to mem_serv
400     struct mem_rpc_client *b = get_mem_client();
401     err = b->vtbl.available(b, &all_mem_avail, &tot_mem);
402
403     if (err_is_fail(err)) {
404         DEBUG_ERR(err, "Warning: failure in call to mem_serv.available");
405         // default to predetermined amount of memory per core
406         return PERCORE_MEM;
407     }
408
409     debug_printf("available memory: %"PRIuMEMSIZE" bytes over %d cores\n", 
410                   all_mem_avail, num_cores);
411
412     mem_percore = all_mem_avail / num_cores;
413
414     debug_printf("available memory per core: %"PRIuMEMSIZE" bytes\n", 
415                   mem_percore);
416
417     return mem_percore;
418 #else
419     // Use predetermined amount of memory per core
420
421     debug_printf("available memory per core: %"PRIuMEMSIZE" bytes\n", 
422                  PERCORE_MEM);
423
424     return PERCORE_MEM;
425 #endif
426 }
427
428 #ifdef MEMSERV_AFFINITY
429 static void set_affinity(coreid_t core)
430 {
431     // get core affinity range and set it as the default for ram_alloc
432     errval_t err;
433     genpaddr_t base;
434     genpaddr_t limit;
435     err = get_percore_affinity(core, &base, &limit);
436     if (err_is_fail(err)) {
437         DEBUG_ERR(err, "Warning: failure in get_percore_affinity");
438         base = 0;
439         limit = 0;
440     }
441     ram_set_affinity(base, limit);
442
443     debug_printf("affinity range is base: %"PRIuGENPADDR", limit: %"
444                  PRIuGENPADDR"\n", base, limit);
445 }
446 #endif
447
448
449 static memsize_t fill_mm(struct mm *mm, memsize_t mem_requested, uint8_t bits, 
450                       memsize_t *mem_tot)
451 {
452     errval_t err;
453
454     memsize_t mem_added = 0;
455     memsize_t mem_to_add = 0;
456     struct capref ramcap;
457     struct capability info;
458
459     // get as much of the requested memory as we can, requesting ever 
460     // smaller RAM caps until we hit the smallest RAM cap size
461
462     while (bits >= MINALLOCBITS) {
463
464         // debug_printf("adding memory %"PRIuMEMSIZE" (%d bits)\n", 
465         //             (memsize_t)1<<bits, bits);
466
467         err = ram_alloc(&ramcap, bits);
468         if (err_is_fail(err)) {
469             // skip this size and try the next size down
470             bits--;
471             continue;
472         } 
473
474         // XXX: Hack until we have cross-core cap management
475         // Forget about remote relations of this cap. This will ensure
476         // that the monitor will hand it back to us in case anyone on
477         // this core deletes it.
478         err = monitor_cap_set_remote(ramcap, false);
479         if(err_is_fail(err)) {
480             DEBUG_ERR(err, "Warning: failed to set cap non-remote. Trying next one.");
481             continue;
482         }
483
484         err = debug_cap_identify(ramcap, &info);
485         if (err_is_fail(err)) {
486             DEBUG_ERR(err, "Warning: failed to identify cap. Trying next one.");
487             continue;
488         }
489 #if 0
490         debug_printf("Cap is type %d Ram base 0x%"PRIxGENPADDR
491                      " (%"PRIuGENPADDR") Bits %d\n",
492                      info.type, info.u.ram.base, info.u.ram.base, 
493                      info.u.ram.bits);
494 #endif
495         assert(bits == info.u.ram.bits);
496         
497         mem_to_add = (memsize_t)1 << bits;
498
499         *mem_tot += mem_to_add;
500
501         err = mm_add(mm, ramcap, bits, info.u.ram.base);
502         if (err_is_ok(err)) {
503             mem_added += mem_to_add;
504
505             mem_requested -= mem_to_add;
506             uint8_t new_bits = log2floor(mem_requested);
507             bits = MIN(bits, new_bits);
508         } else {
509             DEBUG_ERR(err, "Warning: adding RAM region (%p/%d) FAILED", 
510                       info.u.ram.base, info.u.ram.bits);
511         }
512     }
513
514     return mem_added;
515 }
516
517
518 static errval_t init_mm(struct mm *mm, char nodebuf[], memsize_t nodebuf_size,
519                         struct slot_prealloc *slot_alloc_inst,
520                         memsize_t *mem_added, memsize_t *mem_tot)
521 {
522     errval_t err;
523
524     struct capref ramcap;
525     struct capability info;
526
527     /* XXX Base shouldn't need to be 0 ? */
528     err = mm_init(mm, ObjType_RAM,
529                   0, MAXSIZEBITS, MAXCHILDBITS, NULL,
530                   slot_alloc_prealloc, slot_alloc_inst, true);
531     if (err_is_fail(err)) {
532         return err_push(err, MM_ERR_MM_INIT);
533     }
534
535     slab_grow(&mm->slabs, nodebuf, nodebuf_size);
536
537     // Need to bootstrap with a small cap first!
538     err = ram_alloc(&ramcap, SMALLCAP_BITS);
539     if (err_is_fail(err)) {
540         DEBUG_ERR(err, "failed to get small RAM from mem_serv");
541         return err_push(err, LIB_ERR_RAM_ALLOC);
542     }
543
544     err = debug_cap_identify(ramcap, &info);
545     if (err_is_fail(err)) {
546         percore_free(ramcap);
547         return err_push(err, MON_ERR_CAP_IDENTIFY);
548     }
549
550 #if 0
551     printf("Cap is type %d Ram base 0x%"PRIxGENPADDR" Bits %d\n",
552            info.type, info.u.ram.base, info.u.ram.bits);
553 #endif
554     assert(SMALLCAP_BITS == info.u.ram.bits);
555
556     *mem_tot += (memsize_t)1<<SMALLCAP_BITS;
557
558     err = mm_add(mm, ramcap, SMALLCAP_BITS, info.u.ram.base);
559     if (err_is_ok(err)) {
560         *mem_added += (memsize_t)1<<SMALLCAP_BITS;
561     } else {
562         percore_free(ramcap);
563         return err_push(err, MM_ERR_MM_ADD);
564     }
565
566     // try to refill slot allocator (may fail or do nothing) 
567     slot_prealloc_refill(mm->slot_alloc_inst);
568
569     return SYS_ERR_OK;
570 }
571
572 static errval_t init_slot_allocator(struct slot_prealloc *slot_alloc_inst, 
573                                 struct mm *mm)
574 {
575     errval_t err;
576
577     // Initialize slot allocator by passing a cnode cap for it to start with 
578     struct capref cnode_cap;
579     err = slot_alloc(&cnode_cap);
580     if (err_is_fail(err)) {
581         return err_push(err, LIB_ERR_SLOT_ALLOC);
582     }
583
584     struct capref cnode_start_cap = { .slot  = 0 };
585     struct capref ram;
586
587     err = ram_alloc(&ram, BASE_PAGE_BITS);
588     if (err_is_fail(err)) {
589         DEBUG_ERR(err, "failed to allocate RAM cap for cnode");
590         return err_push(err, LIB_ERR_RAM_ALLOC);
591     }
592
593     err = cnode_create_from_mem(cnode_cap, ram, &cnode_start_cap.cnode,
594                                 DEFAULT_CNODE_BITS);
595     if (err_is_fail(err)) {
596         return err_push(err, LIB_ERR_CNODE_CREATE_FROM_MEM);
597     }
598
599     // location where slot allocator will place its top-level cnode 
600     struct capref top_slot_cap = {
601         .cnode = cnode_root,
602         .slot = ROOTCN_SLOT_MODULECN, // XXX: we don't have the module CNode
603     };
604
605     // init slot allocator 
606     err = slot_prealloc_init(slot_alloc_inst, top_slot_cap,
607                              MAXCHILDBITS,
608                              CNODE_BITS, cnode_start_cap,
609                              1UL << DEFAULT_CNODE_BITS, mm);
610     if (err_is_fail(err)) {
611         return err_push(err, MM_ERR_SLOT_ALLOC_INIT);
612     } 
613       
614     return SYS_ERR_OK;
615 }
616
617 errval_t initialize_percore_mem_serv(coreid_t core, coreid_t *cores, 
618                                      int len_cores, memsize_t percore_mem)
619 {
620     errval_t err;
621
622     mem_avail = 0;
623     mem_total = 0;
624
625     trace_event(TRACE_SUBSYS_MEMSERV, TRACE_EVENT_MEMSERV_PERCORE_INIT, 0);
626
627     err = init_slot_allocator(&percore_slot_alloc, mm_slots);
628     if (err_is_fail(err)) {
629         return err_push(err, MM_ERR_SLOT_ALLOC_INIT);
630     }
631
632 #ifdef __scc__
633     ram_set_affinity(0, EXTRA_SHARED_MEM_MIN);
634     err = init_mm(&mm_local, local_nodebuf, sizeof(local_nodebuf),
635                   &percore_slot_alloc, &mem_local, &mem_total);
636     if (err_is_fail(err)) {
637         return err;
638     }
639     ram_set_affinity(SHARED_MEM_MIN + (PERCORE_MEM_SIZE * disp_get_core_id()),
640                      SHARED_MEM_MIN + (PERCORE_MEM_SIZE * (disp_get_core_id() + 1)));
641 #endif
642     err = init_mm(&mm_percore, percore_nodebuf, sizeof(percore_nodebuf),
643                   &percore_slot_alloc, &mem_avail, &mem_total);
644     if (err_is_fail(err)) {
645         return err;
646     }
647 #ifndef __scc__
648     err = init_mm(&mm_local, local_nodebuf, sizeof(local_nodebuf),
649                   &percore_slot_alloc, &mem_local, &mem_total);
650     if (err_is_fail(err)) {
651         return err;
652     }
653 #endif
654
655 #ifdef MEMSERV_AFFINITY
656     set_affinity(core);
657 #endif
658
659 #ifdef __scc__
660     // Suck up private RAM
661     ram_set_affinity(0, EXTRA_SHARED_MEM_MIN);
662 #endif
663
664     // determine how much memory we need to get to fill up the percore mm
665     percore_mem -= mem_total; // memory we've already taken
666     percore_mem -= LOCAL_MEM; // memory we'll take for mm_local
667
668 #ifdef __scc__
669     // Take all of private RAM we can get
670     percore_mem = PERCORE_MEM_SIZE;
671 #endif
672
673     uint8_t percore_bits = log2floor(percore_mem);
674     if (percore_bits > MAXSIZEBITS) {
675         percore_bits = MAXSIZEBITS;
676     }
677     // debug_printf("memory to use: %"PRIuMEMSIZE"\n", percore_mem);
678
679     mem_local += fill_mm(&mm_local, LOCAL_MEM, LOCAL_MEMBITS, &mem_total);
680
681 #ifdef __scc__
682     ram_set_affinity(SHARED_MEM_MIN + (PERCORE_MEM_SIZE * disp_get_core_id()),
683                      SHARED_MEM_MIN + (PERCORE_MEM_SIZE * (disp_get_core_id() + 1)));
684 #endif
685
686     mem_avail += fill_mm(&mm_percore, percore_mem, percore_bits, &mem_total);
687
688     // from now on we don't care where memory comes from anymore
689     ram_set_affinity(0,0);
690     // also use our own memory, rather than the remote central mem_serv
691     ram_alloc_set(local_alloc);
692
693     // try to refill slot allocator (may fail or do nothing)
694     // TODO: is this necessary?
695     slot_prealloc_refill(mm_slots->slot_alloc_inst);
696
697     // refill slab allocator if needed and possible 
698     if (slab_freecount(&mm_percore.slabs) <= MINSPARENODES
699         && mem_avail > (1UL << (CNODE_BITS + OBJBITS_CTE)) * 2
700         + 10 * BASE_PAGE_SIZE) {
701         slab_default_refill(&mm_percore.slabs); // may fail
702     }
703
704     if (slab_freecount(&mm_local.slabs) <= MINSPARENODES
705         && mem_avail > (1UL << (CNODE_BITS + OBJBITS_CTE)) * 2
706         + 10 * BASE_PAGE_SIZE) {
707         slab_default_refill(&mm_local.slabs); // may fail
708     }
709
710     // try to refill slot allocator - now it shouldn't fail!
711     err = slot_prealloc_refill(mm_slots->slot_alloc_inst);
712     if (err_is_fail(err)) {
713         DEBUG_ERR(err, "Fatal internal error in RAM allocator: "
714                   "failed to init slot allocator");
715         return err;
716     }
717
718     // init peer data structures so we know who to contact to steal memory
719     err = init_peers(core, len_cores, cores);
720     if (err_is_fail(err)) {
721         return err_push(err, MS_ERR_INIT_PEERS);
722     }
723
724     // done.
725     debug_printf("Percore RAM allocator initialised, %"PRIuMEMSIZE
726                  " MB (of %"PRIuMEMSIZE" MB) available\n",
727                  mem_avail / 1024 / 1024, mem_total / 1024 / 1024);
728
729
730     trace_event(TRACE_SUBSYS_MEMSERV, TRACE_EVENT_MEMSERV_PERCORE_INIT, 9);
731
732     return SYS_ERR_OK;
733 }
734
735 /**
736  * \brief Request a spawnd to reconnect to a local memserv
737  *
738  * \param coreid The core that the spawnd is running on
739  */
740 errval_t set_local_spawnd_memserv(coreid_t coreid)
741 {
742     struct spawn_rpc_client *cl;
743     errval_t err = spawn_rpc_client(coreid, &cl);
744     if (err_is_fail(err)) {
745         return err;
746     }
747
748     return cl->vtbl.use_local_memserv(cl);
749 }
750
751
752 static int run_worker(coreid_t core, struct args *args)
753 {
754     assert(args != NULL);
755
756     // debug_printf("Distributed mem_serv. percore server on core %d\n", core);
757
758     // this should never return
759     percore_mem_serv(core, args->cores, args->cores_len, args->ram); 
760     return EXIT_FAILURE; // so we should never reach here
761 }
762
763
764 static int run_master(coreid_t core, struct args *args)
765 {
766     assert(args != NULL);
767
768     errval_t err;
769
770     debug_printf("Distributed mem_serv. master on core %d\n", core);
771
772     memsize_t percore_mem;
773     if (args->ram > 0) {
774         percore_mem = args->ram;
775     } else {
776         percore_mem = get_percore_size(args->cores_len); 
777     }
778
779     // debug_printf("spawning on %d cores\n", args->cores_len);
780
781     // set up args for the spawn
782     // -w
783     // -c <core list>
784     // -r <percore_mem>
785     char *new_argv[7];
786     new_argv[0] = args->path;
787     new_argv[1] = "-w"; 
788     new_argv[2] = "-c";
789     new_argv[3] = list_to_string(args->cores, args->cores_len);
790     assert(new_argv[3] != NULL);
791     if (new_argv[3] == NULL) {
792         DEBUG_ERR(LIB_ERR_MALLOC_FAIL, "out of memory");
793         return EXIT_FAILURE;
794     }
795     new_argv[4] = "-r";
796     new_argv[5] = malloc(20); // enough to fit a 64 bit number
797     assert(new_argv[5] != NULL);
798     if (new_argv[5] == NULL) {
799         DEBUG_ERR(LIB_ERR_MALLOC_FAIL, "out of memory");
800         return EXIT_FAILURE;
801     }
802     sprintf(new_argv[5], "%"PRIuMEMSIZE, percore_mem);
803     new_argv[6] = NULL;
804
805     for (int i = 0; i < args->cores_len; i++) {
806         err = spawn_program(args->cores[i], new_argv[0], new_argv, 
807                             NULL, 0, NULL);
808         if (err_is_fail(err)) {
809             DEBUG_ERR(err, "spawning percore mem_serv on core %d", i);
810             return EXIT_FAILURE;
811         }
812     }
813     
814     // wait for all the spawned mem_servs to start up
815     // err = ns_barrier_master_l(args->cores, args->cores_len, MEMSERV_DIST);
816     err = nsb_master_l(args->cores, args->cores_len, MEMSERV_DIST);
817     if (err_is_fail(err)) {
818         DEBUG_ERR(err, "barrier_master failed");
819         return EXIT_FAILURE;
820     }
821
822     return EXIT_SUCCESS;
823 }
824
825 int common_main(int argc, char ** argv)
826 {
827     coreid_t core = disp_get_core_id();
828
829     struct args my_args;
830     my_args = process_args(argc, argv);
831
832     if (my_args.master) {
833         return run_master(core, &my_args);
834     } else {
835         return run_worker(core, &my_args);
836     } 
837
838     return EXIT_SUCCESS;
839 }