Blob


1 /*
2 * Mirror one arena partition onto another.
3 * Be careful to copy only new data.
4 */
6 #include "stdinc.h"
7 #include "dat.h"
8 #include "fns.h"
10 Channel *writechan;
12 typedef struct Write Write;
13 struct Write
14 {
15 uchar *p;
16 int n;
17 uvlong o;
18 int error;
19 };
21 Part *src;
22 Part *dst;
23 int force;
24 int verbose;
25 int dosha1 = 1;
26 char *status;
27 uvlong astart, aend;
/*
 * Report the command-line synopsis on standard error and terminate
 * every thread with exit status "usage".
 */
void
usage(void)
{
	fprint(2, "usage: mirrorarenas [-sv] src dst [ranges]\n");
	threadexitsall("usage");
}
36 char *tagged;
37 char *tagname;
38 int tagindx;
40 void
41 tag(int indx, char *name, char *fmt, ...)
42 {
43 va_list arg;
45 if(tagged){
46 free(tagged);
47 tagged = nil;
48 }
49 tagindx = indx;
50 tagname = name;
51 va_start(arg, fmt);
52 tagged = vsmprint(fmt, arg);
53 va_end(arg);
54 }
/*
 * Per-arena status bits reported through setstatus.
 * Sealed and Mirrored may be combined (Sealed+Mirrored).
 */
enum
{
	Sealed		= 1<<0,
	Mirrored	= 1<<1,
	Empty		= 1<<2,
};
63 void
64 setstatus(int bits)
65 {
66 static int startindx = -1, endindx;
67 static char *startname, *endname;
68 static int lastbits;
69 char buf[100];
71 if(bits != lastbits) {
72 if(startindx >= 0) {
73 switch(lastbits) {
74 case Sealed:
75 snprint(buf, sizeof buf, "sealed");
76 break;
77 case Mirrored:
78 snprint(buf, sizeof buf, "mirrored");
79 break;
80 case Sealed+Mirrored:
81 snprint(buf, sizeof buf, "mirrored sealed");
82 break;
83 case Empty:
84 snprint(buf, sizeof buf, "empty");
85 break;
86 default:
87 snprint(buf, sizeof buf, "%d", bits);
88 break;
89 }
90 print("%T %s-%s %s\n", startname, endname, buf);
91 }
92 lastbits = bits;
93 startindx = tagindx;
94 endindx = tagindx;
95 startname = tagname;
96 endname = tagname;
97 } else {
98 endindx = tagindx;
99 endname = tagname;
101 if(bits < 0) {
102 startindx = -1;
103 endindx = -1;
104 return;
108 void
109 chat(char *fmt, ...)
111 va_list arg;
113 setstatus(-1);
115 if(tagged){
116 write(1, tagged, strlen(tagged));
117 free(tagged);
118 tagged = nil;
120 va_start(arg, fmt);
121 vfprint(1, fmt, arg);
122 va_end(arg);
/*
 * Let the Plan 9 compiler check printf-style calls.  The number is the
 * 1-based position of the format argument: tag(indx, name, fmt, ...)
 * takes it third, chat(fmt, ...) first.
 */
#pragma varargck argpos tag 3
#pragma varargck argpos chat 1
129 int
130 ereadpart(Part *p, u64int offset, u8int *buf, u32int count)
132 if(readpart(p, offset, buf, count) != count){
133 chat("%T readpart %s at %#llux+%ud: %r\n", p->name, offset, count);
134 return -1;
136 return 0;
139 int
140 ewritepart(Part *p, u64int offset, u8int *buf, u32int count)
142 if(writepart(p, offset, buf, count) != count || flushpart(p) < 0){
143 chat("%T writepart %s at %#llux+%ud: %r\n", p->name, offset, count);
144 return -1;
146 return 0;
149 /*
150 * Extra proc to do writes to dst, so that we can overlap reading
151 * src with writing dst during copy. This is an easy factor of two
152 * (almost) in performance.
153 */
154 static Write wsync;
155 static void
156 writeproc(void *v)
158 Write *w;
160 USED(v);
161 while((w = recvp(writechan)) != nil){
162 if(w == &wsync)
163 continue;
164 if(ewritepart(dst, w->o, w->p, w->n) < 0)
165 w->error = 1;
169 int
170 copy(uvlong start, uvlong end, char *what, DigestState *ds)
172 int i, n;
173 uvlong o;
174 enum {
175 Chunk = 1024*1024
176 };
177 static uchar tmpbuf[2*Chunk+MaxIo];
178 static uchar *tmp[2];
179 uchar *p;
180 Write w[2];
182 assert(start <= end);
183 assert(astart <= start && start < aend);
184 assert(astart <= end && end <= aend);
186 // align the buffers so readpart/writepart can do big transfers
187 p = tmpbuf;
188 if((uintptr)p%MaxIo)
189 p += MaxIo - (uintptr)p%MaxIo;
190 tmp[0] = p;
191 tmp[1] = p + Chunk;
193 if(verbose && start != end)
194 chat("%T copy %,llud-%,llud %s\n", start, end, what);
196 i = 0;
197 memset(w, 0, sizeof w);
198 for(o=start; o<end; o+=n){
199 if(w[i].error)
200 goto error;
201 n = Chunk;
202 if(o+n > end)
203 n = end - o;
204 if(ereadpart(src, o, tmp[i], n) < 0)
205 goto error;
206 w[i].p = tmp[i];
207 w[i].o = o;
208 w[i].n = n;
209 w[i].error = 0;
210 sendp(writechan, &w[i]);
211 if(ds)
212 sha1(tmp[i], n, nil, ds);
213 i = 1-i;
215 if(w[i].error)
216 goto error;
218 /*
219 * wait for queued write to finish
220 */
221 sendp(writechan, &wsync);
222 i = 1-i;
223 if(w[i].error)
224 return -1;
225 return 0;
227 error:
228 /*
229 * sync with write proc
230 */
231 w[i].p = nil;
232 w[i].o = 0;
233 w[i].n = 0;
234 w[i].error = 0;
235 sendp(writechan, &w[i]);
236 return -1;
239 /* single-threaded, for reference */
240 int
241 copy1(uvlong start, uvlong end, char *what, DigestState *ds)
243 int n;
244 uvlong o;
245 static uchar tmp[1024*1024];
247 assert(start <= end);
248 assert(astart <= start && start < aend);
249 assert(astart <= end && end <= aend);
251 if(verbose && start != end)
252 chat("%T copy %,llud-%,llud %s\n", start, end, what);
254 for(o=start; o<end; o+=n){
255 n = sizeof tmp;
256 if(o+n > end)
257 n = end - o;
258 if(ereadpart(src, o, tmp, n) < 0)
259 return -1;
260 if(ds)
261 sha1(tmp, n, nil, ds);
262 if(ewritepart(dst, o, tmp, n) < 0)
263 return -1;
265 return 0;
268 int
269 asha1(Part *p, uvlong start, uvlong end, DigestState *ds)
271 int n;
272 uvlong o;
273 static uchar tmp[1024*1024];
275 if(start == end)
276 return 0;
277 assert(start < end);
279 if(verbose)
280 chat("%T sha1 %,llud-%,llud\n", start, end);
282 for(o=start; o<end; o+=n){
283 n = sizeof tmp;
284 if(o+n > end)
285 n = end - o;
286 if(ereadpart(p, o, tmp, n) < 0)
287 return -1;
288 sha1(tmp, n, nil, ds);
290 return 0;
293 uvlong
294 rdown(uvlong a, int b)
296 return a-a%b;
299 uvlong
300 rup(uvlong a, int b)
302 if(a%b == 0)
303 return a;
304 return a+b-a%b;
307 void
308 mirror(int indx, Arena *sa, Arena *da)
310 vlong v, si, di, end;
311 int clumpmax, blocksize, sealed;
312 static uchar buf[MaxIoSize];
313 ArenaHead h;
314 DigestState xds, *ds;
315 vlong shaoff, base;
317 base = sa->base;
318 blocksize = sa->blocksize;
319 end = sa->base + sa->size;
321 astart = base - blocksize;
322 aend = end + blocksize;
324 tag(indx, sa->name, "%T %s (%,llud-%,llud)\n", sa->name, astart, aend);
326 if(force){
327 copy(astart, aend, "all", nil);
328 return;
331 if(sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
332 if(scorecmp(sa->score, da->score) == 0){
333 setstatus(Sealed+Mirrored);
334 if(verbose > 1)
335 chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
336 return;
338 chat("%T %s: warning: sealed score mismatch %V vs %V\n", sa->name, sa->score, da->score);
339 /* Keep executing; will correct seal if possible. */
341 if(!sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
342 chat("%T %s: dst is sealed, src is not\n", sa->name);
343 status = "errors";
344 return;
346 if(sa->diskstats.used < da->diskstats.used){
347 chat("%T %s: src used %,lld < dst used %,lld\n", sa->name, sa->diskstats.used, da->diskstats.used);
348 status = "errors";
349 return;
352 if(da->clumpmagic != sa->clumpmagic){
353 /*
354 * Write this now to reduce the window in which
355 * the head and tail disagree about clumpmagic.
356 */
357 da->clumpmagic = sa->clumpmagic;
358 memset(buf, 0, sizeof buf);
359 packarena(da, buf);
360 if(ewritepart(dst, end, buf, blocksize) < 0)
361 return;
364 memset(&h, 0, sizeof h);
365 h.version = da->version;
366 strcpy(h.name, da->name);
367 h.blocksize = da->blocksize;
368 h.size = da->size + 2*da->blocksize;
369 h.clumpmagic = da->clumpmagic;
370 memset(buf, 0, sizeof buf);
371 packarenahead(&h, buf);
372 if(ewritepart(dst, base - blocksize, buf, blocksize) < 0)
373 return;
375 shaoff = 0;
376 ds = nil;
377 sealed = sa->diskstats.sealed && scorecmp(sa->score, zeroscore) != 0;
378 if(sealed && dosha1){
379 /* start sha1 state with header */
380 memset(&xds, 0, sizeof xds);
381 ds = &xds;
382 sha1(buf, blocksize, nil, ds);
383 shaoff = base;
386 if(sa->diskstats.used != da->diskstats.used){
387 di = base+rdown(da->diskstats.used, blocksize);
388 si = base+rup(sa->diskstats.used, blocksize);
389 if(ds && asha1(dst, shaoff, di, ds) < 0)
390 return;
391 if(copy(di, si, "data", ds) < 0)
392 return;
393 shaoff = si;
396 clumpmax = sa->clumpmax;
397 di = end - da->diskstats.clumps/clumpmax * blocksize;
398 si = end - (sa->diskstats.clumps+clumpmax-1)/clumpmax * blocksize;
400 if(sa->diskstats.sealed){
401 /*
402 * might be a small hole between the end of the
403 * data and the beginning of the directory.
404 */
405 v = base+rup(sa->diskstats.used, blocksize);
406 if(ds && asha1(dst, shaoff, v, ds) < 0)
407 return;
408 if(copy(v, si, "hole", ds) < 0)
409 return;
410 shaoff = si;
413 if(da->diskstats.clumps != sa->diskstats.clumps){
414 if(ds && asha1(dst, shaoff, si, ds) < 0)
415 return;
416 if(copy(si, di, "directory", ds) < 0) /* si < di because clumpinfo blocks grow down */
417 return;
418 shaoff = di;
421 da->ctime = sa->ctime;
422 da->wtime = sa->wtime;
423 da->diskstats = sa->diskstats;
424 da->diskstats.sealed = 0;
426 /*
427 * Repack the arena tail information
428 * and save it for next time...
429 */
430 memset(buf, 0, sizeof buf);
431 packarena(da, buf);
432 if(ewritepart(dst, end, buf, blocksize) < 0)
433 return;
435 if(sealed){
436 /*
437 * ... but on the final pass, copy the encoding
438 * of the tail information from the source
439 * arena itself. There are multiple possible
440 * ways to write the tail info out (the exact
441 * details have changed as venti went through
442 * revisions), and to keep the SHA1 hash the
443 * same, we have to use what the disk uses.
444 */
445 if(asha1(dst, shaoff, end, ds) < 0
446 || copy(end, end+blocksize-VtScoreSize, "tail", ds) < 0)
447 return;
448 if(dosha1){
449 memset(buf, 0, VtScoreSize);
450 sha1(buf, VtScoreSize, da->score, ds);
451 if(scorecmp(sa->score, da->score) == 0){
452 setstatus(Sealed+Mirrored);
453 if(verbose > 1)
454 chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
455 if(ewritepart(dst, end+blocksize-VtScoreSize, da->score, VtScoreSize) < 0)
456 return;
457 }else{
458 chat("%T %s: sealing dst: score mismatch: %V vs %V\n", sa->name, sa->score, da->score);
459 memset(&xds, 0, sizeof xds);
460 asha1(dst, base-blocksize, end+blocksize-VtScoreSize, &xds);
461 sha1(buf, VtScoreSize, 0, &xds);
462 chat("%T reseal: %V\n", da->score);
463 status = "errors";
465 }else{
466 setstatus(Mirrored);
467 if(verbose > 1)
468 chat("%T %s: %V mirrored\n", sa->name, sa->score);
469 if(ewritepart(dst, end+blocksize-VtScoreSize, sa->score, VtScoreSize) < 0)
470 return;
472 }else{
473 if(sa->diskstats.used > 0 || verbose > 1) {
474 chat("%T %s: %,lld used mirrored\n",
475 sa->name, sa->diskstats.used);
477 if(sa->diskstats.used > 0)
478 setstatus(Mirrored);
479 else
480 setstatus(Empty);
484 void
485 mirrormany(ArenaPart *sp, ArenaPart *dp, char *range)
487 int i, lo, hi;
488 char *s, *t;
489 Arena *sa, *da;
491 if(range == nil){
492 for(i=0; i<sp->narenas; i++){
493 sa = sp->arenas[i];
494 da = dp->arenas[i];
495 mirror(i, sa, da);
497 setstatus(-1);
498 return;
500 if(strcmp(range, "none") == 0)
501 return;
503 for(s=range; *s; s=t){
504 t = strchr(s, ',');
505 if(t)
506 *t++ = 0;
507 else
508 t = s+strlen(s);
509 if(*s == '-')
510 lo = 0;
511 else
512 lo = strtol(s, &s, 0);
513 hi = lo;
514 if(*s == '-'){
515 s++;
516 if(*s == 0)
517 hi = sp->narenas-1;
518 else
519 hi = strtol(s, &s, 0);
521 if(*s != 0){
522 chat("%T bad arena range: %s\n", s);
523 continue;
525 for(i=lo; i<=hi; i++){
526 sa = sp->arenas[i];
527 da = dp->arenas[i];
528 mirror(i, sa, da);
530 setstatus(-1);
535 void
536 threadmain(int argc, char **argv)
538 int i;
539 Arena *sa, *da;
540 ArenaPart *s, *d;
541 char *ranges;
543 ventifmtinstall();
545 ARGBEGIN{
546 case 'F':
547 force = 1;
548 break;
549 case 'v':
550 verbose++;
551 break;
552 case 's':
553 dosha1 = 0;
554 break;
555 default:
556 usage();
557 }ARGEND
559 if(argc != 2 && argc != 3)
560 usage();
561 ranges = nil;
562 if(argc == 3)
563 ranges = argv[2];
565 if((src = initpart(argv[0], OREAD)) == nil)
566 sysfatal("initpart %s: %r", argv[0]);
567 if((dst = initpart(argv[1], ORDWR)) == nil)
568 sysfatal("initpart %s: %r", argv[1]);
569 if((s = initarenapart(src)) == nil)
570 sysfatal("initarenapart %s: %r", argv[0]);
571 for(i=0; i<s->narenas; i++)
572 delarena(s->arenas[i]);
573 if((d = initarenapart(dst)) == nil)
574 sysfatal("loadarenapart %s: %r", argv[1]);
575 for(i=0; i<d->narenas; i++)
576 delarena(d->arenas[i]);
578 /*
579 * The arena geometries must match or all bets are off.
580 */
581 if(s->narenas != d->narenas)
582 sysfatal("arena count mismatch: %d vs %d", s->narenas, d->narenas);
583 for(i=0; i<s->narenas; i++){
584 sa = s->arenas[i];
585 da = d->arenas[i];
586 if(sa->version != da->version)
587 sysfatal("arena %d: version mismatch: %d vs %d", i, sa->version, da->version);
588 if(sa->blocksize != da->blocksize)
589 sysfatal("arena %d: blocksize mismatch: %d vs %d", i, sa->blocksize, da->blocksize);
590 if(sa->size != da->size)
591 sysfatal("arena %d: size mismatch: %,lld vs %,lld", i, sa->size, da->size);
592 if(strcmp(sa->name, da->name) != 0)
593 sysfatal("arena %d: name mismatch: %s vs %s", i, sa->name, da->name);
596 /*
597 * Mirror one arena at a time.
598 */
599 writechan = chancreate(sizeof(void*), 0);
600 vtproc(writeproc, nil);
601 mirrormany(s, d, ranges);
602 sendp(writechan, nil);
603 threadexitsall(status);