Blob


/*
 * Write the dirty icache entries to disk.  Random seeks are
 * so expensive that it makes sense to wait until we have
 * a lot and then just make a sequential pass over the disk.
 */
6 #include "stdinc.h"
7 #include "dat.h"
8 #include "fns.h"
10 static void icachewriteproc(void*);
11 static void icachewritecoord(void*);
12 static IEntry *iesort(IEntry*);
14 int icachesleeptime = 1000; /* milliseconds */
enum
{
	/*
	 * Upper bound (8MB) on the distance between the first and last
	 * block of one chunk; the staging buffer is Bufsize plus one block.
	 */
	Bufsize = 8<<20
};
21 typedef struct IWrite IWrite;
22 struct IWrite
23 {
24 Round round;
25 AState as;
26 };
28 static IWrite iwrite;
30 void
31 initicachewrite(void)
32 {
33 int i;
34 Index *ix;
36 initround(&iwrite.round, "icache", 120*60*1000);
37 ix = mainindex;
38 for(i=0; i<ix->nsects; i++){
39 ix->sects[i]->writechan = chancreate(sizeof(ulong), 1);
40 ix->sects[i]->writedonechan = chancreate(sizeof(ulong), 1);
41 vtproc(icachewriteproc, ix->sects[i]);
42 }
43 vtproc(icachewritecoord, nil);
44 vtproc(delaykickroundproc, &iwrite.round);
45 }
47 static IEntry*
48 nextchunk(Index *ix, ISect *is, IEntry **pie, u64int *paddr, uint *pnbuf)
49 {
50 u64int addr, naddr;
51 uint nbuf;
52 int bsize;
53 IEntry *iefirst, *ie, **l;
55 bsize = 1<<is->blocklog;
56 iefirst = *pie;
57 addr = is->blockbase + ((u64int)(hashbits(iefirst->score, 32) / ix->div - is->start) << is->blocklog);
58 nbuf = 0;
59 for(l=&iefirst->nextdirty; (ie=*l)!=nil; l=&(*l)->nextdirty){
60 naddr = is->blockbase + ((u64int)(hashbits(ie->score, 32) / ix->div - is->start) << is->blocklog);
61 if(naddr - addr >= Bufsize)
62 break;
63 nbuf = naddr-addr;
64 }
65 nbuf += bsize;
67 *l = nil;
68 *pie = ie;
69 *paddr = addr;
70 *pnbuf = nbuf;
71 return iefirst;
72 }
74 static int
75 icachewritesect(Index *ix, ISect *is, u8int *buf)
76 {
77 int err, h, bsize;
78 u32int lo, hi;
79 u64int addr, naddr;
80 uint nbuf, off;
81 DBlock *b;
82 IBucket ib;
83 IEntry *ie, *iedirty, **l, *chunk;
85 lo = is->start * ix->div;
86 if(TWID32/ix->div < is->stop)
87 hi = TWID32;
88 else
89 hi = is->stop * ix->div - 1;
91 trace(TraceProc, "icachewritesect enter %ud %ud %llud", lo, hi, iwrite.as.aa);
93 iedirty = icachedirty(lo, hi, iwrite.as.aa);
94 iedirty = iesort(iedirty);
95 bsize = 1<<is->blocklog;
96 err = 0;
98 while(iedirty){
99 sleep(icachesleeptime);
100 trace(TraceProc, "icachewritesect nextchunk");
101 chunk = nextchunk(ix, is, &iedirty, &addr, &nbuf);
103 trace(TraceProc, "icachewritesect readpart 0x%llux+0x%ux", addr, nbuf);
104 if(readpart(is->part, addr, buf, nbuf) < 0){
105 // XXX
106 fprint(2, "icachewriteproc readpart: %r\n");
107 err = -1;
108 continue;
110 trace(TraceProc, "icachewritesect updatebuf");
111 addstat(StatIsectReadBytes, nbuf);
112 addstat(StatIsectRead, 1);
114 for(l=&chunk; (ie=*l)!=nil; l=&ie->nextdirty){
115 again:
116 naddr = is->blockbase + ((u64int)(hashbits(ie->score, 32) / ix->div - is->start) << is->blocklog);
117 off = naddr - addr;
118 if(off+bsize > nbuf){
119 fprint(2, "whoops! addr=0x%llux nbuf=%ud addr+nbuf=0x%llux naddr=0x%llux\n",
120 addr, nbuf, addr+nbuf, naddr);
121 assert(off+bsize <= nbuf);
123 unpackibucket(&ib, buf+off, is->bucketmagic);
124 if(okibucket(&ib, is) < 0){
125 fprint(2, "bad bucket XXX\n");
126 goto skipit;
128 trace(TraceProc, "icachewritesect add %V at 0x%llux", ie->score, naddr);
129 h = bucklook(ie->score, ie->ia.type, ib.data, ib.n);
130 if(h & 1){
131 h ^= 1;
132 packientry(ie, &ib.data[h]);
133 }else if(ib.n < is->buckmax){
134 memmove(&ib.data[h+IEntrySize], &ib.data[h], ib.n*IEntrySize - h);
135 ib.n++;
136 packientry(ie, &ib.data[h]);
137 }else{
138 fprint(2, "bucket overflow XXX\n");
139 skipit:
140 err = -1;
141 *l = ie->nextdirty;
142 ie = *l;
143 if(ie)
144 goto again;
145 else
146 break;
148 packibucket(&ib, buf+off, is->bucketmagic);
149 if((b = _getdblock(is->part, naddr, ORDWR, 0)) != nil){
150 memmove(b->data, buf+off, bsize);
151 putdblock(b);
155 trace(TraceProc, "icachewritesect writepart", addr, nbuf);
156 if(writepart(is->part, addr, buf, nbuf) < 0){
157 // XXX
158 fprint(2, "icachewriteproc writepart: %r\n");
159 err = -1;
160 continue;
162 addstat(StatIsectWriteBytes, nbuf);
163 addstat(StatIsectWrite, 1);
164 icacheclean(chunk);
167 trace(TraceProc, "icachewritesect done");
168 return err;
171 static void
172 icachewriteproc(void *v)
174 uint bsize;
175 ISect *is;
176 Index *ix;
177 u8int *buf;
179 ix = mainindex;
180 is = v;
181 threadsetname("icachewriteproc:%s", is->part->name);
183 bsize = 1<<is->blocklog;
184 buf = emalloc(Bufsize+bsize);
185 buf = (u8int*)(((ulong)buf+bsize-1)&~(ulong)(bsize-1));
187 for(;;){
188 trace(TraceProc, "icachewriteproc recv");
189 recv(is->writechan, 0);
190 trace(TraceWork, "start");
191 icachewritesect(ix, is, buf);
192 trace(TraceProc, "icachewriteproc send");
193 trace(TraceWork, "finish");
194 send(is->writedonechan, 0);
198 static void
199 icachewritecoord(void *v)
201 int i;
202 Index *ix;
203 AState as;
205 USED(v);
207 threadsetname("icachewritecoord");
209 ix = mainindex;
210 iwrite.as = diskstate();
212 for(;;){
213 trace(TraceProc, "icachewritecoord sleep");
214 waitforkick(&iwrite.round);
215 trace(TraceWork, "start");
216 as = diskstate();
217 if(as.arena==iwrite.as.arena && as.aa==iwrite.as.aa){
218 /* will not be able to do anything more than last flush - kick disk */
219 trace(TraceProc, "icachewritecoord flush dcache");
220 kickdcache();
221 trace(TraceProc, "icachewritecoord flushed dcache");
223 iwrite.as = as;
225 trace(TraceProc, "icachewritecoord start flush");
226 if(iwrite.as.arena){
227 for(i=0; i<ix->nsects; i++)
228 send(ix->sects[i]->writechan, 0);
229 if(ix->bloom)
230 send(ix->bloom->writechan, 0);
232 for(i=0; i<ix->nsects; i++)
233 recv(ix->sects[i]->writedonechan, 0);
234 if(ix->bloom)
235 recv(ix->bloom->writedonechan, 0);
237 trace(TraceProc, "icachewritecoord donewrite");
238 setatailstate(&iwrite.as);
240 icacheclean(nil); /* wake up anyone waiting */
241 trace(TraceWork, "finish");
242 addstat(StatIcacheFlush, 1);
246 void
247 flushicache(void)
249 trace(TraceProc, "flushicache enter");
250 kickround(&iwrite.round, 1);
251 trace(TraceProc, "flushicache exit");
254 void
255 kickicache(void)
257 kickround(&iwrite.round, 0);
260 void
261 delaykickicache(void)
263 delaykickround(&iwrite.round);
266 static IEntry*
267 iesort(IEntry *ie)
269 int cmp;
270 IEntry **l;
271 IEntry *ie1, *ie2, *sorted;
273 if(ie == nil || ie->nextdirty == nil)
274 return ie;
276 /* split the lists */
277 ie1 = ie;
278 ie2 = ie;
279 if(ie2)
280 ie2 = ie2->nextdirty;
281 if(ie2)
282 ie2 = ie2->nextdirty;
283 while(ie1 && ie2){
284 ie1 = ie1->nextdirty;
285 ie2 = ie2->nextdirty;
286 if(ie2)
287 ie2 = ie2->nextdirty;
289 if(ie1){
290 ie2 = ie1->nextdirty;
291 ie1->nextdirty = nil;
294 /* sort the lists */
295 ie1 = iesort(ie);
296 ie2 = iesort(ie2);
298 /* merge the lists */
299 sorted = nil;
300 l = &sorted;
301 cmp = 0;
302 while(ie1 || ie2){
303 if(ie1 && ie2)
304 cmp = scorecmp(ie1->score, ie2->score);
305 if(ie1==nil || (ie2 && cmp > 0)){
306 *l = ie2;
307 l = &ie2->nextdirty;
308 ie2 = ie2->nextdirty;
309 }else{
310 *l = ie1;
311 l = &ie1->nextdirty;
312 ie1 = ie1->nextdirty;
315 *l = nil;
316 return sorted;