Commit Diff


commit - 9ffbb5adcaeec878d3b6db0f8b1f654e839b4689
commit + 333c1dccc2f9af67b9c3d8513cca492d022fab4f
blob - 2cfd1fcf2e5b27dba0dd7d9150cc9c8e91a578ef
blob + 318f9ad24a32ab28101e2ec27f252f9c66625e15
--- src/cmd/venti/arena.c
+++ src/cmd/venti/arena.c
@@ -468,18 +468,16 @@ ReadErr:
 int
 wbarena(Arena *arena)
 {
-	ZBlock *b;
+	DBlock *b;
 	int bad;
 
-	b = alloczblock(arena->blocksize, 1);
-	if(b == nil){
+	if((b = getdblock(arena->part, arena->base + arena->size, 0)) == nil){
 		logerr(EAdmin, "can't write arena trailer: %r");
-///ZZZ add error message?
 		return -1;
 	}
-	bad = okarena(arena)<0 || packarena(arena, b->data)<0 || 
-		writepart(arena->part, arena->base + arena->size, b->data, arena->blocksize)<0;
-	freezblock(b);
+	dirtydblock(b, DirtyArenaTrailer);
+	bad = okarena(arena)<0 || packarena(arena, b->data)<0;
+	putdblock(b);
 	if(bad)
 		return -1;
 	return 0;
@@ -502,6 +500,10 @@ wbarenahead(Arena *arena)
 ///ZZZ add error message?
 		return -1;
 	}
+	/*
+	 * this writepart is okay because it only happens
+	 * during initialization.
+	 */
 	bad = packarenahead(&head, b->data)<0 ||
 	      writepart(arena->part, arena->base - arena->blocksize, b->data, arena->blocksize)<0;
 	freezblock(b);
@@ -582,6 +584,7 @@ okarena(Arena *arena)
 static CIBlock*
 getcib(Arena *arena, int clump, int writing, CIBlock *rock)
 {
+	int read;
 	CIBlock *cib;
 	u32int block, off;
 
@@ -613,7 +616,12 @@ getcib(Arena *arena, int clump, int writing, CIBlock *
 
 	cib->block = block;
 	cib->offset = off;
-	cib->data = getdblock(arena->part, arena->base + arena->size - (block + 1) * arena->blocksize, arena->blocksize);
+
+	read = 1;
+	if(writing && off == 0 && clump == arena->clumps-1)
+		read = 0;
+
+	cib->data = getdblock(arena->part, arena->base + arena->size - (block + 1) * arena->blocksize, read);
 	if(cib->data == nil)
 		return nil;
 	return cib;
blob - 33f5950aa75a29e2aaf39d94ca07427e22ba53c2
blob + 1aa81bb1b019fce7913841c1c6828e1fca0a6278
--- src/cmd/venti/clump.c
+++ src/cmd/venti/clump.c
@@ -46,7 +46,7 @@ if(0)print("storeclump %08x %p\n", mainindex->arenas[0
 	scorecp(cl.info.score, sc);
 
 if(0)print("whackblock %08x %p\n", mainindex->arenas[0], &cl);
-	dsize = whackblock(&cb->data[ClumpSize], zb->data, size);
+	dsize=0; // dsize = whackblock(&cb->data[ClumpSize], zb->data, size);
 if(0)print("whackedblock %08x %p\n", mainindex->arenas[0], &cl);
 	if(dsize > 0 && dsize < size){
 		cl.encoding = ClumpECompress;
blob - 49c8b38beb047e2e788b670b275705e187ed3529
blob + 98c935ed16e8c8f1d572d47aec932d7c9091d60b
--- src/cmd/venti/dat.h
+++ src/cmd/venti/dat.h
@@ -124,6 +124,7 @@ enum
 	DirtyIndex,
 	DirtyIndexBitmap,
 	DirtyArenaCib,
+	DirtyArenaTrailer,
 	DirtyMax,
 
 	VentiZZZZZZZZ
@@ -355,6 +356,10 @@ struct Clump
  *	<field name="tabsize" val="s->tabsize" type="U32int"/>
  *	<field name="buckets" val="s->buckets" type="U32int"/>
  *	<field name="buckdiv" val="s->div" type="U32int"/>
+ *	<field name="bitblocks" val="s->div" type="U32int"/>
+ *	<field name="maxdepth" val="s->div" type="U32int"/>
+ *	<field name="bitkeylog" val="s->div" type="U32int"/>
+ *	<field name="bitkeymask" val="s->div" type="U32int"/>
  *	<array name="sect" val="&s->smap[i]" elems="s->nsects" type="Amap"/>
  *	<array name="amap" val="&s->amap[i]" elems="s->narenas" type="Amap"/>
  *	<array name="arena" val="s->arenas[i]" elems="s->narenas" type="Arena"/>
@@ -495,6 +500,7 @@ struct Stats
 	long		indexreads;		/* index from disk */
 	long		indexwreads;		/* for writing a new entry */
 	long		indexareads;		/* for allocating an overflow block */
+	long		indexsplits;		/* index block splits */
 	long		diskwrites;		/* total disk writes */
 	long		diskreads;		/* total disk reads */
 	vlong		diskbwrites;		/* total disk bytes written */
blob - dcb47bcf62c756f40ac18b5c5c0c18bcaea0ab9a
blob + 7e63323249a624bbbc2cc2aca15111c001bb6f07
--- src/cmd/venti/dcache.c
+++ src/cmd/venti/dcache.c
@@ -230,10 +230,38 @@ dirtydblock(DBlock *b, int dirty)
 	int odirty;
 	Part *p;
 
-	rlock(&dcache.dirtylock);
+
 	assert(b->ref != 0);
-	assert(b->dirtying == 0);
-	b->dirtying = 1;
+
+	/*
+ 	 * Because we use an rlock to keep track of how
+	 * many blocks are being dirtied (and a wlock to
+	 * stop dirtying), we cannot try to dirty two blocks
+	 * at the same time in the same thread -- if the 
+	 * wlock happens between them, we get a deadlock.
+	 * 
+	 * The only place in the code where we need to
+	 * dirty multiple blocks at once is in splitiblock, when
+	 * we split an index block.  The new block has a dirty
+	 * flag of DirtyIndexSplit already, because it has to get
+	 * written to disk before the old block (DirtyIndex).
+	 * So when we see the DirtyIndexSplit block, we don't
+	 * acquire the rlock (and we don't set dirtying, so putdblock
+	 * won't drop the rlock).  This kludginess means that
+	 * the caller will have to be sure to putdblock on the 
+	 * new block before putdblock on the old block.
+	 */
+	if(dirty == DirtyIndexSplit){
+		/* caller should have another dirty block */
+		assert(!canwlock(&dcache.dirtylock));
+		/* this block should be clean */
+		assert(b->dirtying == 0);
+		assert(b->dirty == 0);
+	}else{
+		rlock(&dcache.dirtylock);
+		assert(b->dirtying == 0);
+		b->dirtying = 1;
+	}
 
 	qlock(&stats.lock);
 	if(b->dirty)
@@ -247,12 +275,13 @@ dirtydblock(DBlock *b, int dirty)
 	 * blocks.  Only clean blocks ever get marked DirtyIndexSplit,
 	 * though, so we don't need the opposite conjunction here.
 	 */
+	odirty = b->dirty;
 	if(b->dirty)
 		assert(b->dirty == dirty
 			|| (b->dirty==DirtyIndexSplit && dirty==DirtyIndex));
+	else
+		b->dirty = dirty;
 
-	odirty = b->dirty;
-	b->dirty = dirty;
 	p = b->part;
 	if(p->writechan == nil){
 fprint(2, "allocate write proc for part %s\n", p->name);
@@ -304,6 +333,8 @@ bumpdblock(void)
 			break;
 	}
 
+	fprint(2, "bump %s at %llud\n", b->part->name, b->addr);
+
 	/*
 	 * unchain the block
 	 */
@@ -541,6 +572,7 @@ static void
 flushproc(void *v)
 {
 	int i, j, n;
+	ulong t0;
 	DBlock *b, **write;
 
 	USED(v);
@@ -551,7 +583,8 @@ flushproc(void *v)
 		rsleep(&dcache.flush);
 		qunlock(&dcache.lock);
 
-		fprint(2, "flushing dcache\n");
+		fprint(2, "flushing dcache: t=0 ms\n");
+		t0 = nsec()/1000;
 
 		/*
 		 * Because we don't record any dependencies at all, we must write out
@@ -568,10 +601,10 @@ flushproc(void *v)
 		 * finishes with them.
 		 */ 
 	
-		fprint(2, "flushproc: wlock\n");
+		fprint(2, "flushproc: wlock at t=%lud\n", (ulong)(nsec()/1000) - t0);
 		wlock(&dcache.dirtylock);
 
-		fprint(2, "flushproc: build list\n");
+		fprint(2, "flushproc: build list at t=%lud\n", (ulong)(nsec()/1000) - t0);
 		write = dcache.write;
 		n = 0;
 		for(i=0; i<dcache.nblocks; i++){
@@ -583,12 +616,15 @@ flushproc(void *v)
 		qsort(write, n, sizeof(write[0]), writeblockcmp);
 
 		/* Write each stage of blocks out. */
+		fprint(2, "flushproc: write blocks at t=%lud\n", (ulong)(nsec()/1000) - t0);
 		i = 0;
-		for(j=1; j<DirtyMax; j++)
+		for(j=1; j<DirtyMax; j++){
+			fprint(2, "flushproc: flush dirty %d at t=%lud\n", j, (ulong)(nsec()/1000) - t0);
 			i += parallelwrites(write+i, write+n, j);
+		}
 		assert(i == n);
 
-		fprint(2, "flushproc: update dirty bits\n");
+		fprint(2, "flushproc: update dirty bits at t=%lud\n", (ulong)(nsec()/1000) - t0);
 		qlock(&dcache.lock);
 		for(i=0; i<n; i++){
 			b = write[i];
@@ -602,6 +638,8 @@ flushproc(void *v)
 		qunlock(&dcache.lock);
 		wunlock(&dcache.dirtylock);
 
+		fprint(2, "flushproc: done at t=%lud\n", (ulong)(nsec()/1000) - t0);
+
 		qlock(&stats.lock);
 		stats.dcacheflushes++;
 		stats.dcacheflushwrites += n;
blob - 19daa8e1dec0daa099656080dbc067382c3707d7
blob + 6934e3d5da2d90f2fa24861ae959b5d6b1aa311d
--- src/cmd/venti/fmtindex.c
+++ src/cmd/venti/fmtindex.c
@@ -66,9 +66,6 @@ threadmain(int argc, char *argv[])
 		for(i = 0; i < ix->nsects; i++)
 			n += ix->sects[i]->blocks;
 
-		if(ix->div < 100)
-			sysfatal("index divisor too coarse: use bigger block size");
-
 		fprint(2, "using %ud buckets of %ud; div=%d\n", ix->buckets, n, ix->div);
 	}
 	amap = MKNZ(AMap, narenas);
@@ -105,6 +102,8 @@ threadmain(int argc, char *argv[])
 	}
 	fprint(2, "configured index=%s with arenas=%d and storage=%lld\n",
 		ix->name, n, addr - IndexBase);
+	fprint(2, "\tbitblocks=%d maxdepth=%d buckets=%d\n",
+		ix->bitblocks, ix->maxdepth, ix->buckets);
 
 	ix->amap = amap;
 	ix->arenas = arenas;
blob - 859c106e564fd70e7521aa4d1cf7f122b7b15ab7
blob + 6bd637188235a79ee484e771669b747680e6cebf
--- src/cmd/venti/httpd.c
+++ src/cmd/venti/httpd.c
@@ -258,6 +258,7 @@ estats(HConnect *c)
 	hprint(hout, "index disk reads=%,ld\n", stats.indexreads);
 	hprint(hout, "index disk reads for modify=%,ld\n", stats.indexwreads);
 	hprint(hout, "index disk reads for allocation=%,ld\n", stats.indexareads);
+	hprint(hout, "index block splits=%,ld\n", stats.indexsplits);
 
 	hprint(hout, "index cache lookups=%,ld\n", stats.iclookups);
 	hprint(hout, "index cache hits=%,ld %d%%\n", stats.ichits,
@@ -277,7 +278,8 @@ estats(HConnect *c)
 
 	hprint(hout, "disk cache flushes=%,ld\n", stats.dcacheflushes);
 	hprint(hout, "disk cache flush writes=%,ld (%,ld per flush)\n", 
-		stats.dcacheflushwrites, stats.dcacheflushwrites/stats.dcacheflushes);
+		stats.dcacheflushwrites,
+		stats.dcacheflushwrites/(stats.dcacheflushes ? stats.dcacheflushes : 1));
 
 	hprint(hout, "disk writes=%,ld\n", stats.diskwrites);
 	hprint(hout, "disk bytes written=%,lld\n", stats.diskbwrites);
@@ -368,7 +370,7 @@ dindex(HConnect *c)
 		ix->name, ix->version, ix->blocksize, ix->tabsize);
 	hprint(hout, "\tbuckets=%d div=%d\n", ix->buckets, ix->div);
 	for(i = 0; i < ix->nsects; i++)
-		hprint(hout, "\tsect=%s for buckets [%lld,%lld)\n", ix->smap[i].name, ix->smap[i].start, ix->smap[i].stop);
+		hprint(hout, "\tsect=%s for buckets [%lld,%lld) buckmax=%d\n", ix->smap[i].name, ix->smap[i].start, ix->smap[i].stop, ix->sects[i]->buckmax);
 	for(i = 0; i < ix->narenas; i++){
 		if(ix->arenas[i] != nil && ix->arenas[i]->clumps != 0){
 			hprint(hout, "arena=%s at index [%lld,%lld)\n\t", ix->amap[i].name, ix->amap[i].start, ix->amap[i].stop);
blob - 168b13f6b146d41ed017d5785241838ec1d0870f
blob + c1fc141888576f502526ea8201a4fac56b99cd95
--- src/cmd/venti/index.c
+++ src/cmd/venti/index.c
@@ -116,6 +116,9 @@ static int	writebucket(ISect *is, u32int buck, IBucket
 static int	okibucket(IBucket *ib, ISect *is);
 static int	initindex1(Index*);
 static ISect	*initisect1(ISect *is);
+static int	splitiblock(Index *ix, DBlock *b, ISect *is, u32int buck, IBucket *ib);
+
+#define KEY(k,d)	((d) ? (k)>>(32-(d)) : 0)
 
 //static QLock	indexlock;	//ZZZ
 
@@ -763,6 +766,7 @@ storeientry(Index *ix, IEntry *ie)
 	stats.indexwreads++;
 	qunlock(&stats.lock);
 
+top:
 	b = loadibucket(ix, ie->score, &is, &buck, &ib);
 	if(b == nil)
 		return -1;
@@ -786,8 +790,20 @@ storeientry(Index *ix, IEntry *ie)
 
 		packientry(ie, &ib.data[h]);
 		ok = writebucket(is, buck, &ib, b);
+		goto out;
+	}
+
+	/* block is full -- not supposed to happen in V1 */
+	if(ix->version == IndexVersion1){
+		seterr(EAdmin, "index bucket %ud on %s is full\n", buck, is->part->name);
+		ok = -1;
 		goto out;
 	}
+
+	/* in V2, split the block and try again; splitiblock puts b */
+	if(splitiblock(ix, b, is, buck, &ib) < 0)
+		return -1;
+	goto top;
 
 out:
 	putdblock(b);
@@ -797,7 +813,7 @@ out:
 static int
 writebucket(ISect *is, u32int buck, IBucket *ib, DBlock *b)
 {
-	assert(b->dirty == DirtyIndex);
+	assert(b->dirty == DirtyIndex || b->dirty == DirtyIndexSplit);
 
 	if(buck >= is->blocks){
 		seterr(EAdmin, "index write out of bounds: %d >= %d\n",
@@ -919,7 +935,7 @@ indexsect0(Index *ix, u32int buck)
  * load the index block at bucket #buck
  */
 static DBlock*
-loadibucket0(Index *ix, u32int buck, ISect **pis, u32int *pbuck, IBucket *ib)
+loadibucket0(Index *ix, u32int buck, ISect **pis, u32int *pbuck, IBucket *ib, int read)
 {
 	ISect *is;
 	DBlock *b;
@@ -931,12 +947,15 @@ loadibucket0(Index *ix, u32int buck, ISect **pis, u32i
 	}
 
 	buck -= is->start;
-	if((b = getdblock(is->part, is->blockbase + ((u64int)buck << is->blocklog), 1)) == nil)
+	if((b = getdblock(is->part, is->blockbase + ((u64int)buck << is->blocklog), read)) == nil)
 		return nil;
 
-	*pis = is;
-	*pbuck = buck;
-	unpackibucket(ib, b->data);
+	if(pis)
+		*pis = is;
+	if(pbuck)
+		*pbuck = buck;
+	if(ib)
+		unpackibucket(ib, b->data);
 	return b;
 }
 
@@ -955,14 +974,16 @@ indexsect1(Index *ix, u8int *score)
 static DBlock*
 loadibucket1(Index *ix, u8int *score, ISect **pis, u32int *pbuck, IBucket *ib)
 {
-	return loadibucket0(ix, hashbits(score, 32)/ix->div, pis, pbuck, ib);
+	return loadibucket0(ix, hashbits(score, 32)/ix->div, pis, pbuck, ib, 1);
 }
 
 static u32int
 keytobuck(Index *ix, u32int key, int d)
 {
-	/* clear all but top d bits */
-	if(d != 32)
+	/* clear all but top d bits; can't depend on boundary case shifts */
+	if(d == 0)
+		key = 0;
+	else if(d != 32)
 		key &= ~((1<<(32-d))-1);
 
 	/* truncate to maxdepth bits */
@@ -981,27 +1002,33 @@ static int
 bitmapop(Index *ix, u32int key, int d, int set)
 {
 	DBlock *b;
-	ISect *is;
-	IBucket ib;
-	u32int buck;
 	int inuse;
+	u32int key1, buck1;
 
 	if(d >= ix->maxdepth)
 		return 0;
 
-	/* construct .xxx1 in bucket number format */
-	key = keytobuck(ix, key, d) | (1<<(ix->maxdepth-d-1));
 
-	/* check whether key (now the bucket number for .xxx1) is in use */
+	/* construct .xxx1 in bucket number format */
+	key1 = key | (1<<(32-d-1));
+	buck1 = keytobuck(ix, key1, d+1);
 
-	if((b = loadibucket0(ix, key >> ix->bitkeylog, &is, &buck, &ib)) == nil){
+if(0) fprint(2, "key %d/%0*ub key1 %d/%0*ub buck1 %08ux\n",
+	d, d, KEY(key, d), d+1, d+1, KEY(key1, d+1), buck1);
+
+	/* check whether buck1 is in use */
+
+	if((b = loadibucket0(ix, buck1 >> ix->bitkeylog, nil, nil, nil, 1)) == nil){
 		seterr(ECorrupt, "cannot load in-use bitmap block");
+fprint(2, "loadibucket: %r\n");
 		return -1;
 	}
-	inuse = ((u32int*)b->data)[(key & ix->bitkeymask)>>5] & (1<<(key&31));
+if(0) fprint(2, "buck1 %08ux bitkeymask %08ux bitkeylog %d\n", buck1, ix->bitkeymask, ix->bitkeylog);
+	buck1 &= ix->bitkeymask;
+	inuse = ((u32int*)b->data)[buck1>>5] & (1<<(buck1&31));
 	if(set && !inuse){
 		dirtydblock(b, DirtyIndexBitmap);
-		((u32int*)b->data)[(key & ix->bitkeymask)>>5] |= (1<<(key&31));
+		((u32int*)b->data)[buck1>>5] |= (1<<(buck1&31));
 	}
 	putdblock(b);
 	return inuse;
@@ -1073,13 +1100,15 @@ loadibucket2(Index *ix, u8int *score, ISect **pis, u32
 			return nil;
 		}
 
-		if((b = loadibucket0(ix, keytobuck(ix, key, d), pis, pbuck, ib)) == nil)
+		if((b = loadibucket0(ix, keytobuck(ix, key, d), pis, pbuck, ib, 1)) == nil)
 			return nil;
 
 		if(ib->depth == d)
 			return b;
 
 		if(ib->depth < d){
+			fprint(2, "loaded block %ud for %d/%0*ub got %d/%0*ub\n",
+				*pbuck, d, d, KEY(key,d), ib->depth, ib->depth, KEY(key, ib->depth));
 			seterr(EBug, "index block has smaller depth than expected -- cannot happen");
 			putdblock(b);
 			return nil;
@@ -1087,8 +1116,12 @@ loadibucket2(Index *ix, u8int *score, ISect **pis, u32
 
 		/*
 		 * ib->depth > d, meaning the bitmap was out of date.
-		 * fix the bitmap and try again.
+		 * fix the bitmap and try again.  if we actually updated
+		 * the bitmap in splitiblock, this would only happen if
+		 * venti crashed at an inopportune moment.  but this way
+		 * the code gets tested more.
 		 */
+if(0) fprint(2, "update bitmap: %d/%0*ub is split\n", d, d, KEY(key,d));
 		putdblock(b);
 		if(marksplit(ix, key, d) < 0)
 			return nil;
@@ -1097,6 +1130,110 @@ loadibucket2(Index *ix, u8int *score, ISect **pis, u32
 	return nil;
 }
 
+static int
+splitiblock(Index *ix, DBlock *b, ISect *is, u32int buck, IBucket *ib)
+{
+	int i, d;
+	u8int *score;
+	u32int buck0, buck1, key0, key1, key, dmask;
+	DBlock *b0, *b1;
+	IBucket ib0, ib1;
+
+	if(ib->depth == ix->maxdepth){
+if(0) fprint(2, "depth=%d == maxdepth\n", ib->depth);
+		seterr(EAdmin, "index bucket %ud on %s is full\n", buck, is->part->name);
+		putdblock(b);
+		return -1;
+	}
+
+	buck = is->start+buck - ix->bitblocks;
+	d = ib->depth+1;
+	buck0 = buck;
+	buck1 = buck0 | (1<<(ix->maxdepth-d));
+	if(ix->maxdepth == 32){
+		key0 = buck0;
+		key1 = buck1;
+	}else{
+		key0 = buck0 << (32-ix->maxdepth);
+		key1 = buck1 << (32-ix->maxdepth);
+	}
+	buck0 += ix->bitblocks;
+	buck1 += ix->bitblocks;
+	USED(buck0);
+	USED(key1);
+
+	if(d == 32)
+		dmask = TWID32;
+	else
+		dmask = TWID32 ^ ((1<<(32-d))-1);
+
+	/*
+	 * Since we hold the lock for b, the bitmap
+	 * thinks buck1 doesn't exist, and the bit
+	 * for buck1 can't be updated without first
+	 * locking and splitting b, it's safe to try to
+	 * acquire the lock on buck1 without dropping b.
+	 * No one else will go after it too.
+	 *
+	 * Also, none of the rest of the code ever locks
+	 * more than one block at a time, so it's okay if
+	 * we do.
+	 */
+	if((b1 = loadibucket0(ix, buck1, nil, nil, &ib1, 0)) == nil){
+		putdblock(b);
+		return -1;
+	}
+	b0 = b;
+	ib0 = *ib;
+
+	/*
+	 * Funny locking going on here -- see dirtydblock.
+	 * We must putdblock(b1) before putdblock(b0).
+	 */
+	dirtydblock(b0, DirtyIndex);
+	dirtydblock(b1, DirtyIndexSplit);
+
+	/*
+	 * Split the block contents.
+	 * The block is already sorted, so it's pretty easy:
+	 * the first part stays, the second part goes to b1.
+	 */
+	ib0.n = 0;
+	ib0.depth = d;
+	ib1.n = 0;
+	ib1.depth = d;
+	for(i=0; i<ib->n; i++){
+		score = ib->data+i*IEntrySize;
+		key = hashbits(score, 32);
+		if((key&dmask) != key0)
+			break;
+	}
+	ib0.n = i;
+	ib1.n = ib->n - ib0.n;
+	memmove(ib1.data, ib0.data+ib0.n*IEntrySize, ib1.n*IEntrySize);
+if(0) fprint(2, "splitiblock %d in %d/%0*ub => %d in %d/%0*ub + %d in %d/%0*ub\n",
+	ib->n, d-1, d-1, key0>>(32-d+1), 
+	ib0.n, d, d, key0>>(32-d), 
+	ib1.n, d, d, key1>>(32-d));
+
+	packibucket(&ib0, b0->data);
+	packibucket(&ib1, b1->data);
+
+	/* order matters!  see comment above. */
+	putdblock(b1);
+	putdblock(b0);
+
+	/*
+	 * let the recovery code take care of updating the bitmap.
+	 */
+
+	qlock(&stats.lock);
+	stats.indexsplits++;
+	qunlock(&stats.lock);
+
+	return 0;
+}
+
 int
 indexsect(Index *ix, u8int *score)
 {
blob - 27f91a61bbafab1ced644c3f8a62f9be262f2a17
blob + 1bb8841ee2111ce867bb925d555fa547b74dd49c
--- src/cmd/venti/lumpqueue.c
+++ src/cmd/venti/lumpqueue.c
@@ -102,6 +102,9 @@ flushqueue(void)
 	int i;
 	LumpQueue *q;
 
+	if(!lumpqs)
+		return;
+
 	qlock(&glk);
 	gen++;
 	qunlock(&glk);
blob - 8a162958233602283af5460d0d4aeeb5f263f2e5
blob + 66e6c79a3b17b293bdb9258e27c22203426859ba
--- src/cmd/venti/part.c
+++ src/cmd/venti/part.c
@@ -2,7 +2,7 @@
 #include "dat.h"
 #include "fns.h"
 
-#define trace 0
+#define trace 1
 
 u32int	maxblocksize;
 int	readonly;
blob - a4d73248173957259886d6c2ac835e18462a96f2
blob + 079fd97b624b1749de8afb58d489cefbebe7e8fb
--- src/cmd/venti/stats.c
+++ src/cmd/venti/stats.c
@@ -44,6 +44,7 @@ printstats(void)
 	fprint(2, "index disk reads=%,ld\n", stats.indexreads);
 	fprint(2, "index disk reads for modify=%,ld\n", stats.indexwreads);
 	fprint(2, "index disk reads for allocation=%,ld\n", stats.indexareads);
+	fprint(2, "index block splits=%,ld\n", stats.indexsplits);
 
 	fprint(2, "index cache lookups=%,ld\n", stats.iclookups);
 	fprint(2, "index cache hits=%,ld %d%%\n", stats.ichits,
blob - 78a6bd0fa3ea845c7144800b729b438e1e648e0d
blob + 7ba83aa1c880aacbc77510dec28bba2bd462c2a4
--- src/cmd/venti/syncarena.c
+++ src/cmd/venti/syncarena.c
@@ -128,6 +128,7 @@ syncarena(Arena *arena, u32int n, int zok, int fix)
 			fprint(2, "can't flush arena directory cache: %r");
 			err |= SyncFixErr;
 		}
+		flushdcache();
 	}
 
 	if(used != arena->used