/*
 * Rebuild the index from scratch, in place.
 */
#include "stdinc.h"
#include "dat.h"
#include "fns.h"

enum
{
	MinBufSize = 64*1024,
	MaxBufSize = 4*1024*1024,
};

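/* a batch of index entries on their way to an index section's write channel */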
typedef struct IEntryBuf IEntryBuf;
struct IEntryBuf
{
	IEntry	ie[100];
	int	nie;
};

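/* a batch of scores waiting to be marked in the bloom filter */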
typedef struct ScoreBuf ScoreBuf;
struct ScoreBuf
{
	uchar	score[100][VtScoreSize];
	int	nscore;
};

int	dumb;
int	errors;
char	**isect;
int	nisect;
int	bloom;
int	zero;

u32int	isectmem;
u64int	totalbuckets;
u64int	totalclumps;
Channel	*arenadonechan;
Channel	*isectdonechan;
Index	*ix;

u64int	arenaentries;
u64int	skipentries;
u64int	indexentries;

static int	shouldprocess(ISect*);
static void	isectproc(void*);
static void	arenapartproc(void*);

void
usage(void)
{
	fprint(2, "usage: buildindex [-bd] [-i isect]... [-M imem] venti.conf\n");
	threadexitsall("usage");
}

void
threadmain(int argc, char *argv[])
{
	int fd, i, napart, nfinish, maxdisks;
	u32int bcmem, imem;
	Config conf;
	Part *p;

	maxdisks = 100000;
	ventifmtinstall();
	imem = 256*1024*1024;
	ARGBEGIN{
	case 'b':
		bloom = 1;
		break;
	case 'd':	/* debugging - make sure to run all 3 passes */
		dumb = 1;
		break;
	case 'i':
		isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
		isect[nisect++] = EARGF(usage());
		break;
	case 'M':
		imem = unittoull(EARGF(usage()));
		break;
	case 'm':	/* temporary - might go away */
		maxdisks = atoi(EARGF(usage()));
		break;
	default:
		usage();
		break;
	}ARGEND

	if(argc != 1)
		usage();

	if(initventi(argv[0], &conf) < 0)
		sysfatal("can't init venti: %r");
	ix = mainindex;
	if(nisect == 0 && ix->bloom)
		bloom = 1;
	if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
		sysfatal("loadbloom: %r");
	if(bloom && !ix->bloom)
		sysfatal("-b specified but no bloom filter");
	if(!bloom)
		ix->bloom = nil;
	isectmem = imem/ix->nsects;

	/*
	 * safety first - only need read access to arenas
	 */
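	/* (reopen each arena partition read-only and dup it over the old fd, so nothing below can write the arenas) */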
	p = nil;
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			if((fd = open(p->filename, OREAD)) < 0)
				sysfatal("cannot reopen %s: %r", p->filename);
			dup(fd, p->fd);
			close(fd);
		}
	}

	/*
	 * need a block for every arena
	 */
	bcmem = maxblocksize * (mainindex->narenas + 16);
	if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
	initdcache(bcmem);

	totalclumps = 0;
	for(i=0; i<ix->narenas; i++)
		totalclumps += ix->arenas[i]->diskstats.clumps;

	totalbuckets = 0;
	for(i=0; i<ix->nsects; i++)
		totalbuckets += ix->sects[i]->blocks;
	fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);

	/* start index procs */
	fprint(2, "%T read index\n");
	isectdonechan = chancreate(sizeof(void*), 1);
	for(i=0; i<ix->nsects; i++){
		if(shouldprocess(ix->sects[i])){
			ix->sects[i]->writechan = chancreate(sizeof(IEntryBuf), 1);
			vtproc(isectproc, ix->sects[i]);
		}
	}

	for(i=0; i<nisect; i++)
		if(isect[i])
			fprint(2, "warning: did not find index section %s\n", isect[i]);

	/* start arena procs */
	p = nil;
	napart = 0;
	nfinish = 0;
	arenadonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			vtproc(arenapartproc, p);
			if(++napart >= maxdisks){
				recvp(arenadonechan);
				nfinish++;
			}
		}
	}

	/* wait for arena procs to finish */
	for(; nfinish<napart; nfinish++)
		recvp(arenadonechan);

	/* tell index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			send(ix->sects[i]->writechan, nil);

	/* wait for index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			recvp(isectdonechan);

	if(ix->bloom && writebloom(ix->bloom) < 0)
		fprint(2, "writing bloom filter: %r\n");

	fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n",
		arenaentries, indexentries, skipentries);
	threadexitsall(nil);
}

static int
shouldprocess(ISect *is)
{
	int i;

	if(nisect == 0)
		return 1;

	for(i=0; i<nisect; i++)
		if(isect[i] && strcmp(isect[i], is->name) == 0){
			isect[i] = nil;
			return 1;
		}
	return 0;
}

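/* atomically accumulate a per-proc count into a shared total */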
static void
add(u64int *a, u64int n)
{
	static Lock l;

	lock(&l);
	*a += n;
	unlock(&l);
}

/*
 * Read through an arena partition and send each of its IEntries
 * to the appropriate index section.  When finished, send on
 * arenadonechan.
 */
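/* number of clump directory entries to read from disk at a time */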
enum
{
	ClumpChunks = 32*1024,
};
static void
arenapartproc(void *v)
{
	int i, j, n, nskip, x;
	u32int clump;
	u64int addr, tot;
	Arena *a;
	ClumpInfo *ci, *cis;
	IEntry ie;
	Part *p;
	IEntryBuf *buf, *b;
	uchar *score;
	ScoreBuf sb;

	p = v;
	threadsetname("arenaproc %s", p->name);
	buf = MKNZ(IEntryBuf, ix->nsects);

	nskip = 0;
	tot = 0;
	sb.nscore = 0;
	cis = MKN(ClumpInfo, ClumpChunks);
	for(i=0; i<ix->narenas; i++){
		a = ix->arenas[i];
		if(a->part != p)
			continue;
		if(a->memstats.clumps)
			fprint(2, "%T arena %s: %d entries\n",
				a->name, a->memstats.clumps);
		/*
		 * Running the loop backwards accesses the
		 * clump info blocks forwards, since they are
		 * stored in reverse order at the end of the arena.
		 * This speeds things slightly.
		 */
		addr = ix->amap[i].start + a->memstats.used;
		for(clump=a->memstats.clumps; clump > 0; clump-=n){
			n = ClumpChunks;
			if(n > clump)
				n = clump;
			if(readclumpinfos(a, clump-n, cis, n) != n){
				fprint(2, "%T arena %s: directory read: %r\n", a->name);
				errors = 1;
				break;
			}
			for(j=n-1; j>=0; j--){
				ci = &cis[j];
				ie.ia.type = ci->type;
				ie.ia.size = ci->uncsize;
				addr -= ci->size + ClumpSize;
				ie.ia.addr = addr;
				ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
				scorecp(ie.score, ci->score);
				if(ci->type == VtCorruptType)
					nskip++;
				else{
					tot++;
					x = indexsect(ix, ie.score);
					assert(0 <= x && x < ix->nsects);
					if(ix->sects[x]->writechan) {
						b = &buf[x];
						b->ie[b->nie] = ie;
						b->nie++;
						if(b->nie == nelem(b->ie)) {
							send(ix->sects[x]->writechan, b);
							b->nie = 0;
						}
					}
					if(ix->bloom) {
						score = sb.score[sb.nscore++];
						scorecp(score, ie.score);
						if(sb.nscore == nelem(sb.score)) {
							markbloomfiltern(ix->bloom, sb.score, sb.nscore);
							sb.nscore = 0;
						}
					}
				}
			}
		}
		if(addr != ix->amap[i].start)
			fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n", a->name, addr, ix->amap[i].start);
	}
	add(&arenaentries, tot);
	add(&skipentries, nskip);

	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan && buf[i].nie > 0)
			send(ix->sects[i]->writechan, &buf[i]);
	free(buf);
	free(cis);
	if(ix->bloom && sb.nscore > 0)
		markbloomfiltern(ix->bloom, sb.score, sb.nscore);
	sendp(arenadonechan, p);
}

/*
 * Convert score into relative bucket number in isect.
 * Can pass a packed ientry instead of score - score is first.
 */
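/*
 * (ix->div is roughly 2^32 divided by the total bucket count, so
 * hashbits(score, 32)/ix->div spreads scores evenly across the whole
 * index; subtracting is->start makes the result relative to this section.)
 */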
static u32int
score2bucket(ISect *is, uchar *score)
{
	u32int b;

	b = hashbits(score, 32)/ix->div;
	if(b < is->start || b >= is->stop){
		fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n",
			score, ix->div, b, is->start, is->stop);
	}
	assert(is->start <= b && b < is->stop);
	return b - is->start;
}

/*
 * Convert offset in index section to bucket number.
 */
static u32int
offset2bucket(ISect *is, u64int offset)
{
	u32int b;

	assert(is->blockbase <= offset);
	offset -= is->blockbase;
	b = offset/is->blocksize;
	assert(b < is->stop-is->start);
	return b;
}

/*
 * Convert bucket number to offset.
 */
static u64int
bucket2offset(ISect *is, u32int b)
{
	assert(b <= is->stop-is->start);
	return is->blockbase + (u64int)b*is->blocksize;
}

/*
 * IEntry buffers to hold initial round of spraying.
 */
typedef struct Buf Buf;
struct Buf
{
	Part	*part;		/* partition being written */
	uchar	*bp;		/* current block */
	uchar	*ep;		/* end of block */
	uchar	*wp;		/* write position in block */
	u64int	boffset;	/* start offset */
	u64int	woffset;	/* next write offset */
	u64int	eoffset;	/* end offset */
	u32int	nentry;		/* number of entries written */
};

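/* write buf's current block at woffset and reset the block to empty */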
static void
bflush(Buf *buf)
{
	u32int bufsize;

	if(buf->woffset >= buf->eoffset)
		sysfatal("buf index chunk overflow - need bigger index");
	bufsize = buf->ep - buf->bp;
	if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){
		fprint(2, "write %s: %r\n", buf->part->name);
		errors = 1;
	}
	buf->woffset += bufsize;
	memset(buf->bp, 0, bufsize);
	buf->wp = buf->bp;
}

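/* append one packed index entry to buf, flushing the block first if it is full */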
static void
bwrite(Buf *buf, IEntry *ie)
{
	if(buf->wp+IEntrySize > buf->ep)
		bflush(buf);
	assert(buf->bp <= buf->wp && buf->wp < buf->ep);
	packientry(ie, buf->wp);
	buf->wp += IEntrySize;
	assert(buf->bp <= buf->wp && buf->wp <= buf->ep);
	buf->nentry++;
}

/*
 * Minibuffer.  In-memory data structure holds our place
 * in the buffer but has no block data.  We are writing and
 * reading the minibuffers at the same time.  (Careful!)
 */
typedef struct Minibuf Minibuf;
struct Minibuf
{
	u64int	boffset;	/* start offset */
	u64int	roffset;	/* read offset */
	u64int	woffset;	/* write offset */
	u64int	eoffset;	/* end offset */
	u32int	nentry;		/* # entries left to read */
	u32int	nwentry;	/* # entries written */
};

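/*
 * The read offset stays at or ahead of the write offset; when the
 * two meet, ipoolflush0 reloads the block at woffset into the pool
 * before overwriting it.
 */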
/*
 * Index entry pool.  Used when trying to shuffle around
 * the entries in a big buffer into the corresponding M minibuffers.
 * Sized to hold M*EntriesPerBlock entries, so that there will always
 * either be room in the pool for another block worth of entries
 * or there will be an entire block worth of sorted entries to
 * write out.
 */
typedef struct IEntryLink IEntryLink;
typedef struct IPool IPool;

struct IEntryLink
{
	uchar	ie[IEntrySize];	/* raw IEntry */
	IEntryLink	*next;	/* next in chain */
};

struct IPool
{
	ISect	*isect;
	u32int	buck0;		/* first bucket in pool */
	u32int	mbufbuckets;	/* buckets per minibuf */
	IEntryLink	*entry;	/* all IEntryLinks */
	u32int	nentry;		/* # of IEntryLinks */
	IEntryLink	*free;	/* free list */
	u32int	nfree;		/* # on free list */
	Minibuf	*mbuf;		/* all minibufs */
	u32int	nmbuf;		/* # of minibufs */
	IEntryLink	**mlist;	/* lists for each minibuf */
	u32int	*mcount;	/* # on each mlist[i] */
	u32int	bufsize;	/* block buffer size */
	uchar	*rbuf;		/* read buffer */
	uchar	*wbuf;		/* write buffer */
	u32int	epbuf;		/* entries per block buffer */
};

/*
static int
countsokay(IPool *p)
{
	int i;
	u64int n;

	n = 0;
	for(i=0; i<p->nmbuf; i++)
		n += p->mcount[i];
	n += p->nfree;
	if(n != p->nentry){
		print("free %ud:", p->nfree);
		for(i=0; i<p->nmbuf; i++)
			print(" %ud", p->mcount[i]);
		print(" = %lld nentry: %ud\n", n, p->nentry);
	}
	return n == p->nentry;
}
*/

static IPool*
mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf,
	u32int mbufbuckets, u32int bufsize)
{
	u32int i, nentry;
	uchar *data;
	IPool *p;
	IEntryLink *l;

	nentry = (nmbuf+1)*bufsize / IEntrySize;
	p = ezmalloc(sizeof(IPool)
		+nentry*sizeof(IEntry)
		+nmbuf*sizeof(IEntryLink*)
		+nmbuf*sizeof(u32int)
		+3*bufsize);

	p->isect = isect;
	p->mbufbuckets = mbufbuckets;
	p->bufsize = bufsize;
	p->entry = (IEntryLink*)(p+1);
	p->nentry = nentry;
	p->mlist = (IEntryLink**)(p->entry+nentry);
	p->mcount = (u32int*)(p->mlist+nmbuf);
	p->nmbuf = nmbuf;
	p->mbuf = mbuf;
	data = (uchar*)(p->mcount+nmbuf);
	data += bufsize - (uintptr)data%bufsize;
	p->rbuf = data;
	p->wbuf = data+bufsize;
	p->epbuf = bufsize/IEntrySize;

	for(i=0; i<p->nentry; i++){
		l = &p->entry[i];
		l->next = p->free;
		p->free = l;
		p->nfree++;
	}
	return p;
}

/*
 * Add the index entry ie to the pool p.
 * Caller must know there is room.
 */
static void
ipoolinsert(IPool *p, uchar *ie)
{
	u32int buck, x;
	IEntryLink *l;

	assert(p->free != nil);

	buck = score2bucket(p->isect, ie);
	x = (buck-p->buck0) / p->mbufbuckets;
	if(x >= p->nmbuf){
		fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n",
			buck, p->mbufbuckets, x);
	}
	assert(x < p->nmbuf);

	l = p->free;
	p->free = l->next;
	p->nfree--;
	memmove(l->ie, ie, IEntrySize);
	l->next = p->mlist[x];
	p->mlist[x] = l;
	p->mcount[x]++;
}

/*
 * Pull out a block containing as many
 * entries as possible for minibuffer x.
 */
static u32int
ipoolgetbuf(IPool *p, u32int x)
{
	uchar *bp, *ep, *wp;
	IEntryLink *l;
	u32int n;

	bp = p->wbuf;
	ep = p->wbuf + p->bufsize;
	n = 0;
	assert(x < p->nmbuf);
	for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){
		l = p->mlist[x];
		p->mlist[x] = l->next;
		p->mcount[x]--;
		memmove(wp, l->ie, IEntrySize);
		l->next = p->free;
		p->free = l;
		p->nfree++;
		n++;
	}
	memset(wp, 0, ep-wp);
	return n;
}

/*
 * Read a block worth of entries from the minibuf
 * into the pool.  Caller must know there is room.
 */
static void
ipoolloadblock(IPool *p, Minibuf *mb)
{
	u32int i, n;

	assert(mb->nentry > 0);
	assert(mb->roffset >= mb->woffset);
	assert(mb->roffset < mb->eoffset);

	n = p->bufsize/IEntrySize;
	if(n > mb->nentry)
		n = mb->nentry;
	if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0)
		fprint(2, "readpart %s: %r\n", p->isect->part->name);
	else{
		for(i=0; i<n; i++)
			ipoolinsert(p, p->rbuf+i*IEntrySize);
	}
	mb->nentry -= n;
	mb->roffset += p->bufsize;
}

/*
 * Write out a block worth of entries to minibuffer x.
 * If necessary, pick up the data there before overwriting it.
 */
static void
ipoolflush0(IPool *pool, u32int x)
{
	u32int bufsize;
	Minibuf *mb;

	mb = pool->mbuf+x;
	bufsize = pool->bufsize;
	mb->nwentry += ipoolgetbuf(pool, x);
	if(mb->nentry > 0 && mb->roffset == mb->woffset){
		assert(pool->nfree >= pool->bufsize/IEntrySize);
		/*
		 * There will be room in the pool -- we just
		 * removed a block worth.
		 */
		ipoolloadblock(pool, mb);
	}
	if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0)
		fprint(2, "writepart %s: %r\n", pool->isect->part->name);
	mb->woffset += bufsize;
}

/*
 * Write out some full block of entries.
 * (There must be one -- the pool is almost full!)
 */
static void
ipoolflush1(IPool *pool)
{
	u32int i;

	assert(pool->nfree <= pool->epbuf);

	for(i=0; i<pool->nmbuf; i++){
		if(pool->mcount[i] >= pool->epbuf){
			ipoolflush0(pool, i);
			return;
		}
	}
	/* can't be reached - someone must be full */
	sysfatal("ipoolflush1");
}

/*
 * Flush all the entries in the pool out to disk.
 * Nothing more to read from disk.
 */
static void
ipoolflush(IPool *pool)
{
	u32int i;

	for(i=0; i<pool->nmbuf; i++)
		while(pool->mlist[i])
			ipoolflush0(pool, i);
	assert(pool->nfree == pool->nentry);
}

/*
 * Third pass.  Pick up each minibuffer from disk into
 * memory and then write out the buckets.
 */

/*
 * Compare two packed index entries.
 * Usual ordering except break ties by putting higher
 * index addresses first (assumes have duplicates
 * due to corruption in the lower addresses).
 */
static int
ientrycmpaddr(const void *va, const void *vb)
{
	int i;
	uchar *a, *b;

	a = (uchar*)va;
	b = (uchar*)vb;
	i = ientrycmp(a, b);
	if(i)
		return i;
	return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8);
}

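/* zero the disk range [o, e) on partition p, one MaxIoSize buffer at a time */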
static void
zerorange(Part *p, u64int o, u64int e)
{
	static uchar zero[MaxIoSize];
	u32int n;

	for(; o<e; o+=n){
		n = sizeof zero;
		if(o+n > e)
			n = e-o;
		if(writepart(p, o, zero, n) < 0)
			fprint(2, "writepart %s: %r\n", p->name);
	}
}

/*
 * Load a minibuffer into memory and write out the
 * corresponding buckets.
 */
static void
sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize)
{
	uchar *buckdata, *p, *q, *ep;
	u32int b, lastb, memsize, n;
	u64int o;
	IBucket ib;
	Part *part;

	part = is->part;
	buckdata = emalloc(is->blocksize);

	if(mb->nwentry == 0)
		return;

	/*
	 * read entire buffer.
	 */
	assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset);
	assert(mb->woffset-mb->boffset <= nbuf);
	if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){
		fprint(2, "readpart %s: %r\n", part->name);
		errors = 1;
		return;
	}
	assert(*(uint*)buf != 0xa5a5a5a5);

	/*
	 * remove fragmentation due to IEntrySize
	 * not evenly dividing Bufsize
	 */
	memsize = (bufsize/IEntrySize)*IEntrySize;
	for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){
		memmove(p, q, memsize);
		p += memsize;
		q += bufsize;
	}
	ep = buf + mb->nwentry*IEntrySize;
	assert(ep <= buf+nbuf);

	/*
	 * sort entries
	 */
	qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr);

	/*
	 * write buckets out
	 */
	n = 0;
	lastb = offset2bucket(is, mb->boffset);
	for(p=buf; p<ep; p=q){
		b = score2bucket(is, p);
		for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize)
			;
		if(lastb+1 < b && zero)
			zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b));
		if(IBucketSize+(q-p) > is->blocksize)
			sysfatal("bucket overflow - make index bigger");
		memmove(buckdata+IBucketSize, p, q-p);
		ib.n = (q-p)/IEntrySize;
		n += ib.n;
		packibucket(&ib, buckdata, is->bucketmagic);
		if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0)
			fprint(2, "write %s: %r\n", part->name);
		lastb = b;
	}
	if(lastb+1 < is->stop-is->start && zero)
		zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start));

	if(n != mb->nwentry)
		fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%ld\n", n, mb->nwentry, (ep-buf)/IEntrySize);

	free(buckdata);
}

static void
isectproc(void *v)
{
	u32int buck, bufbuckets, bufsize, epbuf, i, j;
	u32int mbufbuckets, n, nbucket, nn, space;
	u32int nbuf, nminibuf, xminiclump, prod;
	u64int blocksize, offset, xclump;
	uchar *data, *p;
	Buf *buf;
	IEntry ie;
	IEntryBuf ieb;
	IPool *ipool;
	ISect *is;
	Minibuf *mbuf, *mb;

	is = v;
	blocksize = is->blocksize;
	nbucket = is->stop - is->start;

	/*
	 * Three passes:
	 *	pass 1 - write index entries from arenas into
	 *		large sequential sections on index disk.
	 *		requires nbuf * bufsize memory.
	 *
	 *	pass 2 - split each section into minibufs.
	 *		requires nminibuf * bufsize memory.
	 *
	 *	pass 3 - read each minibuf into memory and
	 *		write buckets out.
	 *		requires entries/minibuf * IEntrySize memory.
	 *
	 * The larger we set bufsize the less seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the less
	 * seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the
	 * more entries we end up with in each minibuf
	 * at the end.
	 *
	 * Shoot for using half our memory to hold each
	 * minibuf.  The chance of a random distribution
	 * getting off by 2x is quite low.
	 *
	 * Once that is decided, figure out the smallest
	 * nminibuf and nsection/biggest bufsize we can use
	 * and still fit in the memory constraints.
	 */
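	/*
	 * Rough illustration (numbers not from the code): with the
	 * default 256MB of memory for a single index section and
	 * ~40-byte index entries, xminiclump is about 3.3 million
	 * entries, so an index expecting 100 million entries needs
	 * roughly 30 minibufs; 30*MinBufSize easily fits in memory,
	 * so pass 2 is skipped (nbuf = 30, nminibuf = 1).
	 */
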
	/* expected number of clump index entries we'll see */
	xclump = nbucket * (double)totalclumps/totalbuckets;

	/* number of clumps we want to see in a minibuf */
	xminiclump = isectmem/2/IEntrySize;

	/* total number of minibufs we need */
	prod = (xclump+xminiclump-1) / xminiclump;

	/* if possible, skip second pass */
	if(!dumb && prod*MinBufSize < isectmem){
		nbuf = prod;
		nminibuf = 1;
	}else{
		/* otherwise use nsection = sqrt(nmini) */
		for(nbuf=1; nbuf*nbuf<prod; nbuf++)
			;
		if(nbuf*MinBufSize > isectmem)
			sysfatal("not enough memory");
		nminibuf = nbuf;
	}
	if (nbuf == 0) {
		fprint(2, "%s: brand-new index, no work to do\n", argv0);
		threadexitsall(nil);
	}

	/* size buffer to use extra memory */
	bufsize = MinBufSize;
	while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize)
		bufsize *= 2;
	data = emalloc(nbuf*bufsize);
	epbuf = bufsize/IEntrySize;
	fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n",
		is->part->name, nbucket, nbuf, nminibuf, bufsize);
	/*
	 * Accept index entries from arena procs.
	 */
	buf = MKNZ(Buf, nbuf);
	p = data;
	offset = is->blockbase;
	bufbuckets = (nbucket+nbuf-1)/nbuf;
	for(i=0; i<nbuf; i++){
		buf[i].part = is->part;
		buf[i].bp = p;
		buf[i].wp = p;
		p += bufsize;
		buf[i].ep = p;
		buf[i].boffset = offset;
		buf[i].woffset = offset;
		if(i < nbuf-1){
			offset += bufbuckets*blocksize;
			buf[i].eoffset = offset;
		}else{
			offset = is->blockbase + nbucket*blocksize;
			buf[i].eoffset = offset;
		}
	}
	assert(p == data+nbuf*bufsize);

	n = 0;
	while(recv(is->writechan, &ieb) == 1){
		if(ieb.nie == 0)
			break;
		for(j=0; j<ieb.nie; j++){
			ie = ieb.ie[j];
			buck = score2bucket(is, ie.score);
			i = buck/bufbuckets;
			assert(i < nbuf);
			bwrite(&buf[i], &ie);
			n++;
		}
	}
	add(&indexentries, n);

	nn = 0;
	for(i=0; i<nbuf; i++){
		bflush(&buf[i]);
		buf[i].bp = nil;
		buf[i].ep = nil;
		buf[i].wp = nil;
		nn += buf[i].nentry;
	}
	if(n != nn)
		fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn);

	free(data);

	fprint(2, "%T %s: reordering\n", is->part->name);

	/*
	 * Rearrange entries into minibuffers and then
	 * split each minibuffer into buckets.
	 * The minibuffer must be sized so that it is
	 * a multiple of blocksize -- ipoolloadblock assumes
	 * that each minibuf starts aligned on a blocksize
	 * boundary.
	 */
	mbuf = MKN(Minibuf, nminibuf);
	mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf;
	while(mbufbuckets*blocksize % bufsize)
		mbufbuckets++;
	for(i=0; i<nbuf; i++){
		/*
		 * Set up descriptors.
		 */
		n = buf[i].nentry;
		nn = 0;
		offset = buf[i].boffset;
		memset(mbuf, 0, nminibuf*sizeof(mbuf[0]));
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			mb->boffset = offset;
			offset += mbufbuckets*blocksize;
			if(offset > buf[i].eoffset)
				offset = buf[i].eoffset;
			mb->eoffset = offset;
			mb->roffset = mb->boffset;
			mb->woffset = mb->boffset;
			mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize;
			if(mb->nentry > buf[i].nentry)
				mb->nentry = buf[i].nentry;
			buf[i].nentry -= mb->nentry;
			nn += mb->nentry;
		}
		if(n != nn)
			fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);
		/*
		 * Rearrange.
		 */
		if(!dumb && nminibuf == 1){
			mbuf[0].nwentry = mbuf[0].nentry;
			mbuf[0].woffset = buf[i].woffset;
		}else{
			ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize);
			ipool->buck0 = bufbuckets*i;
			for(j=0; j<nminibuf; j++){
				mb = &mbuf[j];
				while(mb->nentry > 0){
					if(ipool->nfree < epbuf){
						ipoolflush1(ipool);
						/* ipoolflush1 might change mb->nentry */
						continue;
					}
					assert(ipool->nfree >= epbuf);
					ipoolloadblock(ipool, mb);
				}
			}
			ipoolflush(ipool);
			nn = 0;
			for(j=0; j<nminibuf; j++)
				nn += mbuf[j].nwentry;
			if(n != nn)
				fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i);
			free(ipool);
		}

		/*
		 * Make buckets.
		 */
		space = 0;
		for(j=0; j<nminibuf; j++)
			if(space < mbuf[j].woffset - mbuf[j].boffset)
				space = mbuf[j].woffset - mbuf[j].boffset;

		data = emalloc(space);
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			sortminibuffer(is, mb, data, space, bufsize);
		}
		free(data);
	}

	sendp(isectdonechan, is);
}