Blob


/*
 * Write the dirty icache entries to disk.  Random seeks are
 * so expensive that it makes sense to wait until we have
 * a lot and then just make a sequential pass over the disk.
 */
6 #include "stdinc.h"
7 #include "dat.h"
8 #include "fns.h"
/* File-local process bodies and the dirty-list sort, defined later in this file. */
static void icachewriteproc(void*);
static void icachewritecoord(void*);
static IEntry *iesort(IEntry*);
/*
 * Pacing for icachewritesect, which consults these before every chunk
 * it writes.  While icachesleeptime is SleepForever, writing stalls
 * (polling once a second).  Otherwise the writer sleeps
 * max(icachesleeptime, minicachesleeptime) milliseconds, or not at all
 * if that is <= 0.
 */
int icachesleeptime = 1000;	/* milliseconds */
int minicachesleeptime = 0;
/*
 * Bufsize bounds one sequential chunk.  nextchunk only groups entries
 * whose bucket blocks start less than Bufsize bytes after the chunk's
 * first block.  Each section writer allocates Bufsize plus one block
 * of buffer.
 */
enum
{
	Bufsize = 8*1024*1024
};
/*
 * State shared by the flush coordinator (icachewritecoord) and the
 * per-section writers (icachewritesect).
 */
typedef struct IWrite IWrite;
struct IWrite
{
	Round round;	/* kicked by flushicache/kickicache/delaykickicache; waited on by icachewritecoord */
	AState as;	/* icache state of the current/last flush; as.aa bounds which dirty entries are written */
};

static IWrite iwrite;
31 void
32 initicachewrite(void)
33 {
34 int i;
35 Index *ix;
37 initround(&iwrite.round, "icache", 120*60*1000);
38 ix = mainindex;
39 for(i=0; i<ix->nsects; i++){
40 ix->sects[i]->writechan = chancreate(sizeof(ulong), 1);
41 ix->sects[i]->writedonechan = chancreate(sizeof(ulong), 1);
42 vtproc(icachewriteproc, ix->sects[i]);
43 }
44 vtproc(icachewritecoord, nil);
45 vtproc(delaykickroundproc, &iwrite.round);
46 }
48 static u64int
49 ie2diskaddr(Index *ix, ISect *is, IEntry *ie)
50 {
51 u64int bucket, addr;
53 bucket = hashbits(ie->score, 32)/ix->div;
54 addr = is->blockbase + ((bucket - is->start) << is->blocklog);
55 return addr;
56 }
58 static IEntry*
59 nextchunk(Index *ix, ISect *is, IEntry **pie, u64int *paddr, uint *pnbuf)
60 {
61 u64int addr, naddr;
62 uint nbuf;
63 int bsize;
64 IEntry *iefirst, *ie, **l;
66 bsize = 1<<is->blocklog;
67 iefirst = *pie;
68 addr = ie2diskaddr(ix, is, iefirst);
69 nbuf = 0;
70 for(l = &iefirst->nextdirty; (ie = *l) != nil; l = &(*l)->nextdirty){
71 naddr = ie2diskaddr(ix, is, ie);
72 if(naddr - addr >= Bufsize)
73 break;
74 nbuf = naddr - addr;
75 }
76 nbuf += bsize;
78 *l = nil;
79 *pie = ie;
80 *paddr = addr;
81 *pnbuf = nbuf;
82 return iefirst;
83 }
85 static int
86 icachewritesect(Index *ix, ISect *is, u8int *buf)
87 {
88 int err, i, werr, h, bsize, t;
89 u32int lo, hi;
90 u64int addr, naddr;
91 uint nbuf, off;
92 DBlock *b;
93 IBucket ib;
94 IEntry *ie, *iedirty, **l, *chunk;
96 lo = is->start * ix->div;
97 if(TWID32/ix->div < is->stop)
98 hi = TWID32;
99 else
100 hi = is->stop * ix->div - 1;
102 trace(TraceProc, "icachewritesect enter %ud %ud %llud",
103 lo, hi, iwrite.as.aa);
105 iedirty = icachedirty(lo, hi, iwrite.as.aa);
106 iedirty = iesort(iedirty);
107 bsize = 1 << is->blocklog;
108 err = 0;
110 while(iedirty){
111 disksched();
112 while((t = icachesleeptime) == SleepForever){
113 sleep(1000);
114 disksched();
116 if(t < minicachesleeptime)
117 t = minicachesleeptime;
118 if(t > 0)
119 sleep(t);
120 trace(TraceProc, "icachewritesect nextchunk");
121 chunk = nextchunk(ix, is, &iedirty, &addr, &nbuf);
123 trace(TraceProc, "icachewritesect readpart 0x%llux+0x%ux",
124 addr, nbuf);
125 if(readpart(is->part, addr, buf, nbuf) < 0){
126 fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
127 "readpart: %r\n", argv0, is->part->name, addr);
128 err = -1;
129 continue;
131 trace(TraceProc, "icachewritesect updatebuf");
132 addstat(StatIsectReadBytes, nbuf);
133 addstat(StatIsectRead, 1);
135 for(l=&chunk; (ie=*l)!=nil; l=&ie->nextdirty){
136 again:
137 naddr = ie2diskaddr(ix, is, ie);
138 off = naddr - addr;
139 if(off+bsize > nbuf){
140 fprint(2, "%s: whoops! addr=0x%llux nbuf=%ud "
141 "addr+nbuf=0x%llux naddr=0x%llux\n",
142 argv0, addr, nbuf, addr+nbuf, naddr);
143 assert(off+bsize <= nbuf);
145 unpackibucket(&ib, buf+off, is->bucketmagic);
146 if(okibucket(&ib, is) < 0){
147 fprint(2, "%s: bad bucket XXX\n", argv0);
148 goto skipit;
150 trace(TraceProc, "icachewritesect add %V at 0x%llux",
151 ie->score, naddr);
152 h = bucklook(ie->score, ie->ia.type, ib.data, ib.n);
153 if(h & 1){
154 h ^= 1;
155 packientry(ie, &ib.data[h]);
156 }else if(ib.n < is->buckmax){
157 memmove(&ib.data[h + IEntrySize], &ib.data[h],
158 ib.n*IEntrySize - h);
159 ib.n++;
160 packientry(ie, &ib.data[h]);
161 }else{
162 fprint(2, "%s: bucket overflow XXX\n", argv0);
163 skipit:
164 err = -1;
165 *l = ie->nextdirty;
166 ie = *l;
167 if(ie)
168 goto again;
169 else
170 break;
172 packibucket(&ib, buf+off, is->bucketmagic);
175 diskaccess(1);
177 trace(TraceProc, "icachewritesect writepart", addr, nbuf);
178 werr = 0;
179 if(writepart(is->part, addr, buf, nbuf) < 0 || flushpart(is->part) < 0)
180 werr = -1;
182 for(i=0; i<nbuf; i+=bsize){
183 if((b = _getdblock(is->part, addr+i, ORDWR, 0)) != nil){
184 memmove(b->data, buf+i, bsize);
185 putdblock(b);
189 if(werr < 0){
190 fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
191 "writepart: %r\n", argv0, is->part->name, addr);
192 err = -1;
193 continue;
196 addstat(StatIsectWriteBytes, nbuf);
197 addstat(StatIsectWrite, 1);
198 icacheclean(chunk);
201 trace(TraceProc, "icachewritesect done");
202 return err;
205 static void
206 icachewriteproc(void *v)
208 int ret;
209 uint bsize;
210 ISect *is;
211 Index *ix;
212 u8int *buf;
214 ix = mainindex;
215 is = v;
216 threadsetname("icachewriteproc:%s", is->part->name);
218 bsize = 1<<is->blocklog;
219 buf = emalloc(Bufsize+bsize);
220 buf = (u8int*)(((uintptr)buf+bsize-1)&~(uintptr)(bsize-1));
222 for(;;){
223 trace(TraceProc, "icachewriteproc recv");
224 recv(is->writechan, 0);
225 trace(TraceWork, "start");
226 ret = icachewritesect(ix, is, buf);
227 trace(TraceProc, "icachewriteproc send");
228 trace(TraceWork, "finish");
229 sendul(is->writedonechan, ret);
233 static void
234 icachewritecoord(void *v)
236 int i, err;
237 Index *ix;
238 AState as;
240 USED(v);
242 threadsetname("icachewritecoord");
244 ix = mainindex;
245 iwrite.as = icachestate();
247 for(;;){
248 trace(TraceProc, "icachewritecoord sleep");
249 waitforkick(&iwrite.round);
250 trace(TraceWork, "start");
251 as = icachestate();
252 if(as.arena==iwrite.as.arena && as.aa==iwrite.as.aa){
253 /* will not be able to do anything more than last flush - kick disk */
254 trace(TraceProc, "icachewritecoord kick dcache");
255 kickdcache();
256 trace(TraceProc, "icachewritecoord kicked dcache");
257 goto SkipWork; /* won't do anything; don't bother rewriting bloom filter */
259 iwrite.as = as;
261 trace(TraceProc, "icachewritecoord start flush");
262 if(iwrite.as.arena){
263 for(i=0; i<ix->nsects; i++)
264 send(ix->sects[i]->writechan, 0);
265 if(ix->bloom)
266 send(ix->bloom->writechan, 0);
268 err = 0;
269 for(i=0; i<ix->nsects; i++)
270 err |= recvul(ix->sects[i]->writedonechan);
271 if(ix->bloom)
272 err |= recvul(ix->bloom->writedonechan);
274 trace(TraceProc, "icachewritecoord donewrite err=%d", err);
275 if(err == 0){
276 setatailstate(&iwrite.as);
279 SkipWork:
280 icacheclean(nil); /* wake up anyone waiting */
281 trace(TraceWork, "finish");
282 addstat(StatIcacheFlush, 1);
286 void
287 flushicache(void)
289 trace(TraceProc, "flushicache enter");
290 kickround(&iwrite.round, 1);
291 trace(TraceProc, "flushicache exit");
294 void
295 kickicache(void)
297 kickround(&iwrite.round, 0);
300 void
301 delaykickicache(void)
303 delaykickround(&iwrite.round);
306 static IEntry*
307 iesort(IEntry *ie)
309 int cmp;
310 IEntry **l;
311 IEntry *ie1, *ie2, *sorted;
313 if(ie == nil || ie->nextdirty == nil)
314 return ie;
316 /* split the lists */
317 ie1 = ie;
318 ie2 = ie;
319 if(ie2)
320 ie2 = ie2->nextdirty;
321 if(ie2)
322 ie2 = ie2->nextdirty;
323 while(ie1 && ie2){
324 ie1 = ie1->nextdirty;
325 ie2 = ie2->nextdirty;
326 if(ie2)
327 ie2 = ie2->nextdirty;
329 if(ie1){
330 ie2 = ie1->nextdirty;
331 ie1->nextdirty = nil;
334 /* sort the lists */
335 ie1 = iesort(ie);
336 ie2 = iesort(ie2);
338 /* merge the lists */
339 sorted = nil;
340 l = &sorted;
341 cmp = 0;
342 while(ie1 || ie2){
343 if(ie1 && ie2)
344 cmp = scorecmp(ie1->score, ie2->score);
345 if(ie1==nil || (ie2 && cmp > 0)){
346 *l = ie2;
347 l = &ie2->nextdirty;
348 ie2 = ie2->nextdirty;
349 }else{
350 *l = ie1;
351 l = &ie1->nextdirty;
352 ie1 = ie1->nextdirty;
355 *l = nil;
356 return sorted;