/*
 * Rebuild the index from scratch, in place.
 */
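/*
 * Example invocation (assuming venti.conf describes the arenas,
 * the index sections, and a bloom filter for -b):
 *
 *	buildindex -b venti.conf
 */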
#include "stdinc.h"
#include "dat.h"
#include "fns.h"

enum
{
	MinBufSize = 64*1024,
	MaxBufSize = 4*1024*1024,
};

typedef struct IEntryBuf IEntryBuf;
struct IEntryBuf
{
	IEntry ie[100];
	int nie;
};

typedef struct ScoreBuf ScoreBuf;
struct ScoreBuf
{
	uchar score[100][VtScoreSize];
	int nscore;
};
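
/* command-line options and state shared by the arena and index-section procs */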
int dumb;
int errors;
char **isect;
int nisect;
int bloom;
int zero;

u32int isectmem;
u64int totalbuckets;
u64int totalclumps;
Channel *arenadonechan;
Channel *isectdonechan;
Index *ix;

u64int arenaentries;
u64int skipentries;
u64int indexentries;

static int shouldprocess(ISect*);
static void isectproc(void*);
static void arenapartproc(void*);

void
usage(void)
{
	fprint(2, "usage: buildindex [-bd] [-i isect]... [-M imem] venti.conf\n");
	threadexitsall("usage");
}

void
threadmain(int argc, char *argv[])
{
	int fd, i, napart;
	u32int bcmem, imem;
	Config conf;
	Part *p;

	ventifmtinstall();
	imem = 256*1024*1024;
	ARGBEGIN{
	case 'b':
		bloom = 1;
		break;
	case 'd':	/* debugging - make sure to run all 3 passes */
		dumb = 1;
		break;
	case 'i':
		isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
		isect[nisect++] = EARGF(usage());
		break;
	case 'M':
		imem = unittoull(EARGF(usage()));
		break;
	default:
		usage();
		break;
	}ARGEND

	if(argc != 1)
		usage();

	if(initventi(argv[0], &conf) < 0)
		sysfatal("can't init venti: %r");
	ix = mainindex;
	if(nisect == 0 && ix->bloom)
		bloom = 1;
	if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
		sysfatal("resetbloom: %r");
	if(bloom && !ix->bloom)
		sysfatal("-b specified but no bloom filter");
	if(!bloom)
		ix->bloom = nil;
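	/* divide the index-memory budget evenly among the index sections */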
	isectmem = imem/ix->nsects;

	/*
	 * safety first - only need read access to arenas
	 */
	p = nil;
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			if((fd = open(p->filename, OREAD)) < 0)
				sysfatal("cannot reopen %s: %r", p->filename);
			dup(fd, p->fd);
			close(fd);
		}
	}

	/*
	 * need a block for every arena
	 */
	bcmem = maxblocksize * (mainindex->narenas + 16);
	if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
	initdcache(bcmem);

	totalclumps = 0;
	for(i=0; i<ix->narenas; i++)
		totalclumps += ix->arenas[i]->diskstats.clumps;

	totalbuckets = 0;
	for(i=0; i<ix->nsects; i++)
		totalbuckets += ix->sects[i]->blocks;
	fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);

	/* start index procs */
	fprint(2, "%T read index\n");
	isectdonechan = chancreate(sizeof(void*), 1);
	for(i=0; i<ix->nsects; i++){
		if(shouldprocess(ix->sects[i])){
			ix->sects[i]->writechan = chancreate(sizeof(IEntryBuf), 1);
			vtproc(isectproc, ix->sects[i]);
		}
	}

	for(i=0; i<nisect; i++)
		if(isect[i])
			fprint(2, "warning: did not find index section %s\n", isect[i]);

	/* start arena procs */
	p = nil;
	napart = 0;
	arenadonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			vtproc(arenapartproc, p);
			napart++;
		}
	}

	/* wait for arena procs to finish */
	for(i=0; i<napart; i++)
		recvp(arenadonechan);

	/* tell index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			send(ix->sects[i]->writechan, nil);

	/* wait for index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			recvp(isectdonechan);

	if(ix->bloom && writebloom(ix->bloom) < 0)
		fprint(2, "writing bloom filter: %r\n");

	fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n",
		arenaentries, indexentries, skipentries);
	threadexitsall(nil);
}

static int
shouldprocess(ISect *is)
{
	int i;

	if(nisect == 0)
		return 1;

	for(i=0; i<nisect; i++)
		if(isect[i] && strcmp(isect[i], is->name) == 0){
			isect[i] = nil;
			return 1;
		}
	return 0;
}
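
/*
 * Locked accumulator: the arena and index-section procs
 * all update the shared entry counters through here.
 */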
static void
add(u64int *a, u64int n)
{
	static Lock l;

	lock(&l);
	*a += n;
	unlock(&l);
}

/*
 * Read through an arena partition and send each of its IEntries
 * to the appropriate index section.  When finished, send on
 * arenadonechan.
 */
enum
{
	ClumpChunks = 32*1024,
};
static void
arenapartproc(void *v)
{
	int i, j, n, nskip, x;
	u32int clump;
	u64int addr, tot;
	Arena *a;
	ClumpInfo *ci, *cis;
	IEntry ie;
	Part *p;
	IEntryBuf *buf, *b;
	uchar *score;
	ScoreBuf sb;

	p = v;
	threadsetname("arenaproc %s", p->name);
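	/* one accumulation buffer per index section */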
	buf = MKNZ(IEntryBuf, ix->nsects);

	nskip = 0;
	tot = 0;
	sb.nscore = 0;
	cis = MKN(ClumpInfo, ClumpChunks);
	for(i=0; i<ix->narenas; i++){
		a = ix->arenas[i];
		if(a->part != p)
			continue;
		if(a->memstats.clumps)
			fprint(2, "%T arena %s: %d entries\n",
				a->name, a->memstats.clumps);
		/*
		 * Running the loop backwards accesses the
		 * clump info blocks forwards, since they are
		 * stored in reverse order at the end of the arena.
		 * This speeds things slightly.
		 */
		addr = ix->amap[i].start + a->memstats.used;
		for(clump=a->memstats.clumps; clump > 0; clump-=n){
			n = ClumpChunks;
			if(n > clump)
				n = clump;
			if(readclumpinfos(a, clump-n, cis, n) != n){
				fprint(2, "%T arena %s: directory read: %r\n", a->name);
				errors = 1;
				break;
			}
			for(j=n-1; j>=0; j--){
				ci = &cis[j];
				ie.ia.type = ci->type;
				ie.ia.size = ci->uncsize;
				addr -= ci->size + ClumpSize;
				ie.ia.addr = addr;
				ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
				scorecp(ie.score, ci->score);
				if(ci->type == VtCorruptType)
					nskip++;
				else{
					tot++;
					x = indexsect(ix, ie.score);
					assert(0 <= x && x < ix->nsects);
					if(ix->sects[x]->writechan) {
						b = &buf[x];
						b->ie[b->nie] = ie;
						b->nie++;
						if(b->nie == nelem(b->ie)) {
							send(ix->sects[x]->writechan, b);
							b->nie = 0;
						}
					}
					if(ix->bloom) {
						score = sb.score[sb.nscore++];
						scorecp(score, ie.score);
						if(sb.nscore == nelem(sb.score)) {
							markbloomfiltern(ix->bloom, sb.score, sb.nscore);
							sb.nscore = 0;
						}
					}
				}
			}
		}
		if(addr != ix->amap[i].start)
			fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n",
				a->name, addr, ix->amap[i].start);
	}
	add(&arenaentries, tot);
	add(&skipentries, nskip);

	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan && buf[i].nie > 0)
			send(ix->sects[i]->writechan, &buf[i]);
	free(buf);
	free(cis);
	if(ix->bloom && sb.nscore > 0)
		markbloomfiltern(ix->bloom, sb.score, sb.nscore);
	sendp(arenadonechan, p);
}

/*
 * Convert score into relative bucket number in isect.
 * Can pass a packed ientry instead of score - score is first.
 */
static u32int
score2bucket(ISect *is, uchar *score)
{
	u32int b;

	b = hashbits(score, 32)/ix->div;
	if(b < is->start || b >= is->stop){
		fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n",
			score, ix->div, b, is->start, is->stop);
	}
	assert(is->start <= b && b < is->stop);
	return b - is->start;
}

/*
 * Convert offset in index section to bucket number.
 */
static u32int
offset2bucket(ISect *is, u64int offset)
{
	u32int b;

	assert(is->blockbase <= offset);
	offset -= is->blockbase;
	b = offset/is->blocksize;
	assert(b < is->stop-is->start);
	return b;
}

/*
 * Convert bucket number to offset.
 */
static u64int
bucket2offset(ISect *is, u32int b)
{
	assert(b <= is->stop-is->start);
	return is->blockbase + (u64int)b*is->blocksize;
}

/*
 * IEntry buffers to hold initial round of spraying.
 */
typedef struct Buf Buf;
struct Buf
{
	Part *part;		/* partition being written */
	uchar *bp;		/* current block */
	uchar *ep;		/* end of block */
	uchar *wp;		/* write position in block */
	u64int boffset;		/* start offset */
	u64int woffset;		/* next write offset */
	u64int eoffset;		/* end offset */
	u32int nentry;		/* number of entries written */
};
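
/*
 * Write the buffer's current block to disk and
 * reset the write pointer to the start of the block.
 */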
static void
bflush(Buf *buf)
{
	u32int bufsize;

	if(buf->woffset >= buf->eoffset)
		sysfatal("buf index chunk overflow - need bigger index");
	bufsize = buf->ep - buf->bp;
	if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){
		fprint(2, "write %s: %r\n", buf->part->name);
		errors = 1;
	}
	buf->woffset += bufsize;
	memset(buf->bp, 0, bufsize);
	buf->wp = buf->bp;
}
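
/*
 * Pack one index entry into the buffer,
 * flushing the block first if it is full.
 */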
static void
bwrite(Buf *buf, IEntry *ie)
{
	if(buf->wp+IEntrySize > buf->ep)
		bflush(buf);
	assert(buf->bp <= buf->wp && buf->wp < buf->ep);
	packientry(ie, buf->wp);
	buf->wp += IEntrySize;
	assert(buf->bp <= buf->wp && buf->wp <= buf->ep);
	buf->nentry++;
}

/*
 * Minibuffer.  In-memory data structure holds our place
 * in the buffer but has no block data.  We are writing and
 * reading the minibuffers at the same time.  (Careful!)
 */
typedef struct Minibuf Minibuf;
struct Minibuf
{
	u64int boffset;		/* start offset */
	u64int roffset;		/* read offset */
	u64int woffset;		/* write offset */
	u64int eoffset;		/* end offset */
	u32int nentry;		/* # entries left to read */
	u32int nwentry;		/* # entries written */
};

/*
 * Index entry pool.  Used when trying to shuffle around
 * the entries in a big buffer into the corresponding M minibuffers.
 * Sized to hold M*EntriesPerBlock entries, so that there will always
 * either be room in the pool for another block worth of entries
 * or there will be an entire block worth of sorted entries to
 * write out.
 */
typedef struct IEntryLink IEntryLink;
typedef struct IPool IPool;

struct IEntryLink
{
	uchar ie[IEntrySize];	/* raw IEntry */
	IEntryLink *next;	/* next in chain */
};

struct IPool
{
	ISect *isect;
	u32int buck0;		/* first bucket in pool */
	u32int mbufbuckets;	/* buckets per minibuf */
	IEntryLink *entry;	/* all IEntryLinks */
	u32int nentry;		/* # of IEntryLinks */
	IEntryLink *free;	/* free list */
	u32int nfree;		/* # on free list */
	Minibuf *mbuf;		/* all minibufs */
	u32int nmbuf;		/* # of minibufs */
	IEntryLink **mlist;	/* lists for each minibuf */
	u32int *mcount;		/* # on each mlist[i] */
	u32int bufsize;		/* block buffer size */
	uchar *rbuf;		/* read buffer */
	uchar *wbuf;		/* write buffer */
	u32int epbuf;		/* entries per block buffer */
};

/*
static int
countsokay(IPool *p)
{
	int i;
	u64int n;

	n = 0;
	for(i=0; i<p->nmbuf; i++)
		n += p->mcount[i];
	n += p->nfree;
	if(n != p->nentry){
		print("free %ud:", p->nfree);
		for(i=0; i<p->nmbuf; i++)
			print(" %ud", p->mcount[i]);
		print(" = %lld nentry: %ud\n", n, p->nentry);
	}
	return n == p->nentry;
}
*/
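
/*
 * Allocate the pool, its entry links, the per-minibuf lists
 * and counts, and the bufsize-aligned read/write buffers,
 * all in a single allocation.
 */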
static IPool*
mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf,
	u32int mbufbuckets, u32int bufsize)
{
	u32int i, nentry;
	uchar *data;
	IPool *p;
	IEntryLink *l;

	nentry = (nmbuf+1)*bufsize / IEntrySize;
	p = ezmalloc(sizeof(IPool)
		+nentry*sizeof(IEntry)
		+nmbuf*sizeof(IEntryLink*)
		+nmbuf*sizeof(u32int)
		+3*bufsize);

	p->isect = isect;
	p->mbufbuckets = mbufbuckets;
	p->bufsize = bufsize;
	p->entry = (IEntryLink*)(p+1);
	p->nentry = nentry;
	p->mlist = (IEntryLink**)(p->entry+nentry);
	p->mcount = (u32int*)(p->mlist+nmbuf);
	p->nmbuf = nmbuf;
	p->mbuf = mbuf;
	data = (uchar*)(p->mcount+nmbuf);
	data += bufsize - (uintptr)data%bufsize;
	p->rbuf = data;
	p->wbuf = data+bufsize;
	p->epbuf = bufsize/IEntrySize;

	for(i=0; i<p->nentry; i++){
		l = &p->entry[i];
		l->next = p->free;
		p->free = l;
		p->nfree++;
	}
	return p;
}

/*
 * Add the index entry ie to the pool p.
 * Caller must know there is room.
 */
static void
ipoolinsert(IPool *p, uchar *ie)
{
	u32int buck, x;
	IEntryLink *l;

	assert(p->free != nil);

	buck = score2bucket(p->isect, ie);
	x = (buck-p->buck0) / p->mbufbuckets;
	if(x >= p->nmbuf){
		fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n",
			buck, p->mbufbuckets, x);
	}
	assert(x < p->nmbuf);

	l = p->free;
	p->free = l->next;
	p->nfree--;
	memmove(l->ie, ie, IEntrySize);
	l->next = p->mlist[x];
	p->mlist[x] = l;
	p->mcount[x]++;
}

/*
 * Pull out a block containing as many
 * entries as possible for minibuffer x.
 */
static u32int
ipoolgetbuf(IPool *p, u32int x)
{
	uchar *bp, *ep, *wp;
	IEntryLink *l;
	u32int n;

	bp = p->wbuf;
	ep = p->wbuf + p->bufsize;
	n = 0;
	assert(x < p->nmbuf);
	for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){
		l = p->mlist[x];
		p->mlist[x] = l->next;
		p->mcount[x]--;
		memmove(wp, l->ie, IEntrySize);
		l->next = p->free;
		p->free = l;
		p->nfree++;
		n++;
	}
	memset(wp, 0, ep-wp);
	return n;
}

/*
 * Read a block worth of entries from the minibuf
 * into the pool.  Caller must know there is room.
 */
static void
ipoolloadblock(IPool *p, Minibuf *mb)
{
	u32int i, n;

	assert(mb->nentry > 0);
	assert(mb->roffset >= mb->woffset);
	assert(mb->roffset < mb->eoffset);

	n = p->bufsize/IEntrySize;
	if(n > mb->nentry)
		n = mb->nentry;
	if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0)
		fprint(2, "readpart %s: %r\n", p->isect->part->name);
	else{
		for(i=0; i<n; i++)
			ipoolinsert(p, p->rbuf+i*IEntrySize);
	}
	mb->nentry -= n;
	mb->roffset += p->bufsize;
}

/*
 * Write out a block worth of entries to minibuffer x.
 * If necessary, pick up the data there before overwriting it.
 */
static void
ipoolflush0(IPool *pool, u32int x)
{
	u32int bufsize;
	Minibuf *mb;

	mb = pool->mbuf+x;
	bufsize = pool->bufsize;
	mb->nwentry += ipoolgetbuf(pool, x);
	if(mb->nentry > 0 && mb->roffset == mb->woffset){
		assert(pool->nfree >= pool->bufsize/IEntrySize);
		/*
		 * There will be room in the pool -- we just
		 * removed a block worth.
		 */
		ipoolloadblock(pool, mb);
	}
	if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0)
		fprint(2, "writepart %s: %r\n", pool->isect->part->name);
	mb->woffset += bufsize;
}

/*
 * Write out some full block of entries.
 * (There must be one -- the pool is almost full!)
 */
static void
ipoolflush1(IPool *pool)
{
	u32int i;

	assert(pool->nfree <= pool->epbuf);

	for(i=0; i<pool->nmbuf; i++){
		if(pool->mcount[i] >= pool->epbuf){
			ipoolflush0(pool, i);
			return;
		}
	}
	/* can't be reached - someone must be full */
	sysfatal("ipoolflush1");
}

/*
 * Flush all the entries in the pool out to disk.
 * Nothing more to read from disk.
 */
static void
ipoolflush(IPool *pool)
{
	u32int i;

	for(i=0; i<pool->nmbuf; i++)
		while(pool->mlist[i])
			ipoolflush0(pool, i);
	assert(pool->nfree == pool->nentry);
}

/*
 * Third pass.  Pick up each minibuffer from disk into
 * memory and then write out the buckets.
 */

/*
 * Compare two packed index entries.
 * Usual ordering, except ties are broken by putting higher
 * index addresses first (assumes duplicates are due to
 * corruption at the lower addresses).
 */
static int
ientrycmpaddr(const void *va, const void *vb)
{
	int i;
	uchar *a, *b;

	a = (uchar*)va;
	b = (uchar*)vb;
	i = ientrycmp(a, b);
	if(i)
		return i;
	return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8);
}
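
/*
 * Zero the partition bytes in the range [o, e).
 */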
static void
zerorange(Part *p, u64int o, u64int e)
{
	static uchar zero[MaxIoSize];
	u32int n;

	for(; o<e; o+=n){
		n = sizeof zero;
		if(o+n > e)
			n = e-o;
		if(writepart(p, o, zero, n) < 0)
			fprint(2, "writepart %s: %r\n", p->name);
	}
}

/*
 * Load a minibuffer into memory and write out the
 * corresponding buckets.
 */
static void
sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize)
{
	uchar *buckdata, *p, *q, *ep;
	u32int b, lastb, memsize, n;
	u64int o;
	IBucket ib;
	Part *part;

	if(mb->nwentry == 0)
		return;

	part = is->part;
	buckdata = emalloc(is->blocksize);

	/*
	 * read entire buffer.
	 */
	assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset);
	assert(mb->woffset-mb->boffset <= nbuf);
	if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){
		fprint(2, "readpart %s: %r\n", part->name);
		errors = 1;
		free(buckdata);
		return;
	}
	assert(*(uint*)buf != 0xa5a5a5a5);

	/*
	 * remove fragmentation due to IEntrySize
	 * not evenly dividing Bufsize
	 */
	memsize = (bufsize/IEntrySize)*IEntrySize;
	for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){
		memmove(p, q, memsize);
		p += memsize;
		q += bufsize;
	}
	ep = buf + mb->nwentry*IEntrySize;
	assert(ep <= buf+nbuf);

	/*
	 * sort entries
	 */
	qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr);

	/*
	 * write buckets out
	 */
	n = 0;
	lastb = offset2bucket(is, mb->boffset);
	for(p=buf; p<ep; p=q){
		b = score2bucket(is, p);
		for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize)
			;
		if(lastb+1 < b && zero)
			zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b));
		if(IBucketSize+(q-p) > is->blocksize)
			sysfatal("bucket overflow - make index bigger");
		memmove(buckdata+IBucketSize, p, q-p);
		ib.n = (q-p)/IEntrySize;
		n += ib.n;
		packibucket(&ib, buckdata, is->bucketmagic);
		if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0)
			fprint(2, "write %s: %r\n", part->name);
		lastb = b;
	}
	if(lastb+1 < is->stop-is->start && zero)
		zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start));

	if(n != mb->nwentry)
		fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%ld\n", n, mb->nwentry, (ep-buf)/IEntrySize);

	free(buckdata);
}

static void
isectproc(void *v)
{
	u32int buck, bufbuckets, bufsize, epbuf, i, j;
	u32int mbufbuckets, n, nbucket, nn, space;
	u32int nbuf, nminibuf, xminiclump, prod;
	u64int blocksize, offset, xclump;
	uchar *data, *p;
	Buf *buf;
	IEntry ie;
	IEntryBuf ieb;
	IPool *ipool;
	ISect *is;
	Minibuf *mbuf, *mb;

	is = v;
	blocksize = is->blocksize;
	nbucket = is->stop - is->start;

	/*
	 * Three passes:
	 *	pass 1 - write index entries from arenas into
	 *		large sequential sections on index disk.
	 *		requires nbuf * bufsize memory.
	 *
	 *	pass 2 - split each section into minibufs.
	 *		requires nminibuf * bufsize memory.
	 *
	 *	pass 3 - read each minibuf into memory and
	 *		write buckets out.
	 *		requires entries/minibuf * IEntrySize memory.
	 *
	 * The larger we set bufsize, the less seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the less
	 * seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the
	 * more entries we end up with in each minibuf
	 * at the end.
	 *
	 * Shoot for using half our memory to hold each
	 * minibuf.  The chance of a random distribution
	 * getting off by 2x is quite low.
	 *
	 * Once that is decided, figure out the smallest
	 * nminibuf and nsection/biggest bufsize we can use
	 * and still fit in the memory constraints.
	 */

	/* expected number of clump index entries we'll see */
	xclump = nbucket * (double)totalclumps/totalbuckets;

	/* number of clumps we want to see in a minibuf */
	xminiclump = isectmem/2/IEntrySize;

	/* total number of minibufs we need */
	prod = (xclump+xminiclump-1) / xminiclump;

	/* if possible, skip second pass */
	if(!dumb && prod*MinBufSize < isectmem){
		nbuf = prod;
		nminibuf = 1;
	}else{
		/* otherwise use nsection = sqrt(nmini) */
		for(nbuf=1; nbuf*nbuf<prod; nbuf++)
			;
		if(nbuf*MinBufSize > isectmem)
			sysfatal("not enough memory");
		nminibuf = nbuf;
	}

	/* size buffer to use extra memory */
	bufsize = MinBufSize;
	while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize)
		bufsize *= 2;
	data = emalloc(nbuf*bufsize);
	epbuf = bufsize/IEntrySize;
	fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n",
		is->part->name, nbucket, nbuf, nminibuf, bufsize);
	/*
	 * Accept index entries from arena procs.
	 */
	buf = MKNZ(Buf, nbuf);
	p = data;
	offset = is->blockbase;
	bufbuckets = (nbucket+nbuf-1)/nbuf;
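	/* buf[i] covers buckets [i*bufbuckets, (i+1)*bufbuckets); the last takes the remainder */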
	for(i=0; i<nbuf; i++){
		buf[i].part = is->part;
		buf[i].bp = p;
		buf[i].wp = p;
		p += bufsize;
		buf[i].ep = p;
		buf[i].boffset = offset;
		buf[i].woffset = offset;
		if(i < nbuf-1){
			offset += bufbuckets*blocksize;
			buf[i].eoffset = offset;
		}else{
			offset = is->blockbase + nbucket*blocksize;
			buf[i].eoffset = offset;
		}
	}
	assert(p == data+nbuf*bufsize);

	n = 0;
	while(recv(is->writechan, &ieb) == 1){
		if(ieb.nie == 0)
			break;
		for(j=0; j<ieb.nie; j++){
			ie = ieb.ie[j];
			buck = score2bucket(is, ie.score);
			i = buck/bufbuckets;
			assert(i < nbuf);
			bwrite(&buf[i], &ie);
			n++;
		}
	}
	add(&indexentries, n);
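
	/* pass 1 done: flush partial blocks and release the spray buffers */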
	nn = 0;
	for(i=0; i<nbuf; i++){
		bflush(&buf[i]);
		buf[i].bp = nil;
		buf[i].ep = nil;
		buf[i].wp = nil;
		nn += buf[i].nentry;
	}
	if(n != nn)
		fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn);

	free(data);

	fprint(2, "%T %s: reordering\n", is->part->name);

	/*
	 * Rearrange entries into minibuffers and then
	 * split each minibuffer into buckets.
	 * The minibuffer must be sized so that it is
	 * a multiple of blocksize -- ipoolloadblock assumes
	 * that each minibuf starts aligned on a blocksize
	 * boundary.
	 */
	mbuf = MKN(Minibuf, nminibuf);
	mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf;
	while(mbufbuckets*blocksize % bufsize)
		mbufbuckets++;
	for(i=0; i<nbuf; i++){
		/*
		 * Set up descriptors.
		 */
		n = buf[i].nentry;
		nn = 0;
		offset = buf[i].boffset;
		memset(mbuf, 0, nminibuf*sizeof(mbuf[0]));
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			mb->boffset = offset;
			offset += mbufbuckets*blocksize;
			if(offset > buf[i].eoffset)
				offset = buf[i].eoffset;
			mb->eoffset = offset;
			mb->roffset = mb->boffset;
			mb->woffset = mb->boffset;
			mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize;
			if(mb->nentry > buf[i].nentry)
				mb->nentry = buf[i].nentry;
			buf[i].nentry -= mb->nentry;
			nn += mb->nentry;
		}
		if(n != nn)
			fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);
		/*
		 * Rearrange.
		 */
		if(!dumb && nminibuf == 1){
			mbuf[0].nwentry = mbuf[0].nentry;
			mbuf[0].woffset = buf[i].woffset;
		}else{
			ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize);
			ipool->buck0 = bufbuckets*i;
			for(j=0; j<nminibuf; j++){
				mb = &mbuf[j];
				while(mb->nentry > 0){
					if(ipool->nfree < epbuf){
						ipoolflush1(ipool);
						/* ipoolflush1 might change mb->nentry */
						continue;
					}
					assert(ipool->nfree >= epbuf);
					ipoolloadblock(ipool, mb);
				}
			}
			ipoolflush(ipool);
			nn = 0;
			for(j=0; j<nminibuf; j++)
				nn += mbuf[j].nwentry;
			if(n != nn)
				fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i);
			free(ipool);
		}

		/*
		 * Make buckets.
		 */
		space = 0;
		for(j=0; j<nminibuf; j++)
			if(space < mbuf[j].woffset - mbuf[j].boffset)
				space = mbuf[j].woffset - mbuf[j].boffset;

		data = emalloc(space);
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			sortminibuffer(is, mb, data, space, bufsize);
		}
		free(data);
	}

	sendp(isectdonechan, is);
}