Blob


1 /*
2 * Mirror one arena partition onto another.
3 * Be careful to copy only new data.
4 */
6 #include "stdinc.h"
7 #include "dat.h"
8 #include "fns.h"
10 Channel *writechan;
12 typedef struct Write Write;
13 struct Write
14 {
15 uchar *p;
16 int n;
17 uvlong o;
18 int error;
19 };
21 Part *src;
22 Part *dst;
23 int force;
24 int verbose;
25 int dosha1 = 1;
26 char *status;
27 uvlong astart, aend;
/*
 * Print the command synopsis on standard error and exit.
 * The flag list matches what threadmain() accepts: -F, -s and -v.
 */
void
usage(void)
{
	fprint(2, "usage: mirrorarenas [-Fsv] src dst [ranges]\n");
	threadexitsall("usage");
}
/*
 * Pending header for the arena being processed: tagged is the
 * formatted text (printed lazily by chat()), tagname and tagindx
 * identify the arena for setstatus().
 */
char *tagged;
char *tagname;
int tagindx;

/*
 * Record indx and name as the current arena and replace the pending
 * header text with the result of formatting fmt and its arguments.
 * Any previous, still unprinted header is discarded.
 */
void
tag(int indx, char *name, char *fmt, ...)
{
	va_list ap;

	/* free(nil) is a no-op, so no guard is needed */
	free(tagged);

	tagname = name;
	tagindx = indx;

	va_start(ap, fmt);
	tagged = vsmprint(fmt, ap);
	va_end(ap);
}
/* Status bits passed to setstatus(); Sealed and Mirrored may be combined. */
enum
{
	Sealed		= 1<<0,
	Mirrored	= 1<<1,
	Empty		= 1<<2,
};
63 void
64 setstatus(int bits)
65 {
66 static int startindx = -1;
67 static char *startname, *endname;
68 static int lastbits;
69 char buf[100];
71 if(bits != lastbits) {
72 if(startindx >= 0) {
73 switch(lastbits) {
74 case Sealed:
75 snprint(buf, sizeof buf, "sealed");
76 break;
77 case Mirrored:
78 snprint(buf, sizeof buf, "mirrored");
79 break;
80 case Sealed+Mirrored:
81 snprint(buf, sizeof buf, "mirrored sealed");
82 break;
83 case Empty:
84 snprint(buf, sizeof buf, "empty");
85 break;
86 default:
87 snprint(buf, sizeof buf, "%d", bits);
88 break;
89 }
90 print("%T %s-%s %s\n", startname, endname, buf);
91 }
92 lastbits = bits;
93 startindx = tagindx;
94 startname = tagname;
95 endname = tagname;
96 } else {
97 endname = tagname;
98 }
99 if(bits < 0) {
100 startindx = -1;
101 return;
105 void
106 chat(char *fmt, ...)
108 va_list arg;
110 setstatus(-1);
112 if(tagged){
113 write(1, tagged, strlen(tagged));
114 free(tagged);
115 tagged = nil;
117 va_start(arg, fmt);
118 vfprint(1, fmt, arg);
119 va_end(arg);
122 #pragma varargck argpos tag 3
123 #pragma varargck argpos chat 1
126 int
127 ereadpart(Part *p, u64int offset, u8int *buf, u32int count)
129 if(readpart(p, offset, buf, count) != count){
130 chat("%T readpart %s at %#llux+%ud: %r\n", p->name, offset, count);
131 return -1;
133 return 0;
136 int
137 ewritepart(Part *p, u64int offset, u8int *buf, u32int count)
139 if(writepart(p, offset, buf, count) != count || flushpart(p) < 0){
140 chat("%T writepart %s at %#llux+%ud: %r\n", p->name, offset, count);
141 return -1;
143 return 0;
146 /*
147 * Extra proc to do writes to dst, so that we can overlap reading
148 * src with writing dst during copy. This is an easy factor of two
149 * (almost) in performance.
150 */
151 static Write wsync;
152 static void
153 writeproc(void *v)
155 Write *w;
157 USED(v);
158 while((w = recvp(writechan)) != nil){
159 if(w == &wsync)
160 continue;
161 if(ewritepart(dst, w->o, w->p, w->n) < 0)
162 w->error = 1;
166 int
167 copy(uvlong start, uvlong end, char *what, DigestState *ds)
169 int i, n;
170 uvlong o;
171 enum {
172 Chunk = 1024*1024
173 };
174 static uchar tmpbuf[2*Chunk+MaxIo];
175 static uchar *tmp[2];
176 uchar *p;
177 Write w[2];
179 assert(start <= end);
180 assert(astart <= start && start < aend);
181 assert(astart <= end && end <= aend);
183 // align the buffers so readpart/writepart can do big transfers
184 p = tmpbuf;
185 if((uintptr)p%MaxIo)
186 p += MaxIo - (uintptr)p%MaxIo;
187 tmp[0] = p;
188 tmp[1] = p + Chunk;
190 if(verbose && start != end)
191 chat("%T copy %,llud-%,llud %s\n", start, end, what);
193 i = 0;
194 memset(w, 0, sizeof w);
195 for(o=start; o<end; o+=n){
196 if(w[i].error)
197 goto error;
198 n = Chunk;
199 if(o+n > end)
200 n = end - o;
201 if(ereadpart(src, o, tmp[i], n) < 0)
202 goto error;
203 w[i].p = tmp[i];
204 w[i].o = o;
205 w[i].n = n;
206 w[i].error = 0;
207 sendp(writechan, &w[i]);
208 if(ds)
209 sha1(tmp[i], n, nil, ds);
210 i = 1-i;
212 if(w[i].error)
213 goto error;
215 /*
216 * wait for queued write to finish
217 */
218 sendp(writechan, &wsync);
219 i = 1-i;
220 if(w[i].error)
221 return -1;
222 return 0;
224 error:
225 /*
226 * sync with write proc
227 */
228 w[i].p = nil;
229 w[i].o = 0;
230 w[i].n = 0;
231 w[i].error = 0;
232 sendp(writechan, &w[i]);
233 return -1;
236 /* single-threaded, for reference */
237 int
238 copy1(uvlong start, uvlong end, char *what, DigestState *ds)
240 int n;
241 uvlong o;
242 static uchar tmp[1024*1024];
244 assert(start <= end);
245 assert(astart <= start && start < aend);
246 assert(astart <= end && end <= aend);
248 if(verbose && start != end)
249 chat("%T copy %,llud-%,llud %s\n", start, end, what);
251 for(o=start; o<end; o+=n){
252 n = sizeof tmp;
253 if(o+n > end)
254 n = end - o;
255 if(ereadpart(src, o, tmp, n) < 0)
256 return -1;
257 if(ds)
258 sha1(tmp, n, nil, ds);
259 if(ewritepart(dst, o, tmp, n) < 0)
260 return -1;
262 return 0;
265 int
266 asha1(Part *p, uvlong start, uvlong end, DigestState *ds)
268 int n;
269 uvlong o;
270 static uchar tmp[1024*1024];
272 if(start == end)
273 return 0;
274 assert(start < end);
276 if(verbose)
277 chat("%T sha1 %,llud-%,llud\n", start, end);
279 for(o=start; o<end; o+=n){
280 n = sizeof tmp;
281 if(o+n > end)
282 n = end - o;
283 if(ereadpart(p, o, tmp, n) < 0)
284 return -1;
285 sha1(tmp, n, nil, ds);
287 return 0;
290 uvlong
291 rdown(uvlong a, int b)
293 return a-a%b;
296 uvlong
297 rup(uvlong a, int b)
299 if(a%b == 0)
300 return a;
301 return a+b-a%b;
304 void
305 mirror(int indx, Arena *sa, Arena *da)
307 vlong v, si, di, end;
308 int clumpmax, blocksize, sealed;
309 static uchar buf[MaxIoSize];
310 ArenaHead h;
311 DigestState xds, *ds;
312 vlong shaoff, base;
314 base = sa->base;
315 blocksize = sa->blocksize;
316 end = sa->base + sa->size;
318 astart = base - blocksize;
319 aend = end + blocksize;
321 tag(indx, sa->name, "%T %s (%,llud-%,llud)\n", sa->name, astart, aend);
323 if(force){
324 copy(astart, aend, "all", nil);
325 return;
328 if(sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
329 if(scorecmp(sa->score, da->score) == 0){
330 setstatus(Sealed+Mirrored);
331 if(verbose > 1)
332 chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
333 return;
335 chat("%T %s: warning: sealed score mismatch %V vs %V\n", sa->name, sa->score, da->score);
336 /* Keep executing; will correct seal if possible. */
338 if(!sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
339 chat("%T %s: dst is sealed, src is not\n", sa->name);
340 status = "errors";
341 return;
343 if(sa->diskstats.used < da->diskstats.used){
344 chat("%T %s: src used %,lld < dst used %,lld\n", sa->name, sa->diskstats.used, da->diskstats.used);
345 status = "errors";
346 return;
349 if(da->clumpmagic != sa->clumpmagic){
350 /*
351 * Write this now to reduce the window in which
352 * the head and tail disagree about clumpmagic.
353 */
354 da->clumpmagic = sa->clumpmagic;
355 memset(buf, 0, sizeof buf);
356 packarena(da, buf);
357 if(ewritepart(dst, end, buf, blocksize) < 0)
358 return;
361 memset(&h, 0, sizeof h);
362 h.version = da->version;
363 strcpy(h.name, da->name);
364 h.blocksize = da->blocksize;
365 h.size = da->size + 2*da->blocksize;
366 h.clumpmagic = da->clumpmagic;
367 memset(buf, 0, sizeof buf);
368 packarenahead(&h, buf);
369 if(ewritepart(dst, base - blocksize, buf, blocksize) < 0)
370 return;
372 shaoff = 0;
373 ds = nil;
374 sealed = sa->diskstats.sealed && scorecmp(sa->score, zeroscore) != 0;
375 if(sealed && dosha1){
376 /* start sha1 state with header */
377 memset(&xds, 0, sizeof xds);
378 ds = &xds;
379 sha1(buf, blocksize, nil, ds);
380 shaoff = base;
383 if(sa->diskstats.used != da->diskstats.used){
384 di = base+rdown(da->diskstats.used, blocksize);
385 si = base+rup(sa->diskstats.used, blocksize);
386 if(ds && asha1(dst, shaoff, di, ds) < 0)
387 return;
388 if(copy(di, si, "data", ds) < 0)
389 return;
390 shaoff = si;
393 clumpmax = sa->clumpmax;
394 di = end - da->diskstats.clumps/clumpmax * blocksize;
395 si = end - (sa->diskstats.clumps+clumpmax-1)/clumpmax * blocksize;
397 if(sa->diskstats.sealed){
398 /*
399 * might be a small hole between the end of the
400 * data and the beginning of the directory.
401 */
402 v = base+rup(sa->diskstats.used, blocksize);
403 if(ds && asha1(dst, shaoff, v, ds) < 0)
404 return;
405 if(copy(v, si, "hole", ds) < 0)
406 return;
407 shaoff = si;
410 if(da->diskstats.clumps != sa->diskstats.clumps){
411 if(ds && asha1(dst, shaoff, si, ds) < 0)
412 return;
413 if(copy(si, di, "directory", ds) < 0) /* si < di because clumpinfo blocks grow down */
414 return;
415 shaoff = di;
418 da->ctime = sa->ctime;
419 da->wtime = sa->wtime;
420 da->diskstats = sa->diskstats;
421 da->diskstats.sealed = 0;
423 /*
424 * Repack the arena tail information
425 * and save it for next time...
426 */
427 memset(buf, 0, sizeof buf);
428 packarena(da, buf);
429 if(ewritepart(dst, end, buf, blocksize) < 0)
430 return;
432 if(sealed){
433 /*
434 * ... but on the final pass, copy the encoding
435 * of the tail information from the source
436 * arena itself. There are multiple possible
437 * ways to write the tail info out (the exact
438 * details have changed as venti went through
439 * revisions), and to keep the SHA1 hash the
440 * same, we have to use what the disk uses.
441 */
442 if(asha1(dst, shaoff, end, ds) < 0
443 || copy(end, end+blocksize-VtScoreSize, "tail", ds) < 0)
444 return;
445 if(dosha1){
446 memset(buf, 0, VtScoreSize);
447 sha1(buf, VtScoreSize, da->score, ds);
448 if(scorecmp(sa->score, da->score) == 0){
449 setstatus(Sealed+Mirrored);
450 if(verbose > 1)
451 chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
452 if(ewritepart(dst, end+blocksize-VtScoreSize, da->score, VtScoreSize) < 0)
453 return;
454 }else{
455 chat("%T %s: sealing dst: score mismatch: %V vs %V\n", sa->name, sa->score, da->score);
456 memset(&xds, 0, sizeof xds);
457 asha1(dst, base-blocksize, end+blocksize-VtScoreSize, &xds);
458 sha1(buf, VtScoreSize, 0, &xds);
459 chat("%T reseal: %V\n", da->score);
460 status = "errors";
462 }else{
463 setstatus(Mirrored);
464 if(verbose > 1)
465 chat("%T %s: %V mirrored\n", sa->name, sa->score);
466 if(ewritepart(dst, end+blocksize-VtScoreSize, sa->score, VtScoreSize) < 0)
467 return;
469 }else{
470 if(sa->diskstats.used > 0 || verbose > 1) {
471 chat("%T %s: %,lld used mirrored\n",
472 sa->name, sa->diskstats.used);
474 if(sa->diskstats.used > 0)
475 setstatus(Mirrored);
476 else
477 setstatus(Empty);
481 void
482 mirrormany(ArenaPart *sp, ArenaPart *dp, char *range)
484 int i, lo, hi;
485 char *s, *t;
486 Arena *sa, *da;
488 if(range == nil){
489 for(i=0; i<sp->narenas; i++){
490 sa = sp->arenas[i];
491 da = dp->arenas[i];
492 mirror(i, sa, da);
494 setstatus(-1);
495 return;
497 if(strcmp(range, "none") == 0)
498 return;
500 for(s=range; *s; s=t){
501 t = strchr(s, ',');
502 if(t)
503 *t++ = 0;
504 else
505 t = s+strlen(s);
506 if(*s == '-')
507 lo = 0;
508 else
509 lo = strtol(s, &s, 0);
510 hi = lo;
511 if(*s == '-'){
512 s++;
513 if(*s == 0)
514 hi = sp->narenas-1;
515 else
516 hi = strtol(s, &s, 0);
518 if(*s != 0){
519 chat("%T bad arena range: %s\n", s);
520 continue;
522 for(i=lo; i<=hi; i++){
523 sa = sp->arenas[i];
524 da = dp->arenas[i];
525 mirror(i, sa, da);
527 setstatus(-1);
532 void
533 threadmain(int argc, char **argv)
535 int i;
536 Arena *sa, *da;
537 ArenaPart *s, *d;
538 char *ranges;
540 ventifmtinstall();
542 ARGBEGIN{
543 case 'F':
544 force = 1;
545 break;
546 case 'v':
547 verbose++;
548 break;
549 case 's':
550 dosha1 = 0;
551 break;
552 default:
553 usage();
554 }ARGEND
556 if(argc != 2 && argc != 3)
557 usage();
558 ranges = nil;
559 if(argc == 3)
560 ranges = argv[2];
562 if((src = initpart(argv[0], OREAD)) == nil)
563 sysfatal("initpart %s: %r", argv[0]);
564 if((dst = initpart(argv[1], ORDWR)) == nil)
565 sysfatal("initpart %s: %r", argv[1]);
566 if((s = initarenapart(src)) == nil)
567 sysfatal("initarenapart %s: %r", argv[0]);
568 for(i=0; i<s->narenas; i++)
569 delarena(s->arenas[i]);
570 if((d = initarenapart(dst)) == nil)
571 sysfatal("loadarenapart %s: %r", argv[1]);
572 for(i=0; i<d->narenas; i++)
573 delarena(d->arenas[i]);
575 /*
576 * The arena geometries must match or all bets are off.
577 */
578 if(s->narenas != d->narenas)
579 sysfatal("arena count mismatch: %d vs %d", s->narenas, d->narenas);
580 for(i=0; i<s->narenas; i++){
581 sa = s->arenas[i];
582 da = d->arenas[i];
583 if(sa->version != da->version)
584 sysfatal("arena %d: version mismatch: %d vs %d", i, sa->version, da->version);
585 if(sa->blocksize != da->blocksize)
586 sysfatal("arena %d: blocksize mismatch: %d vs %d", i, sa->blocksize, da->blocksize);
587 if(sa->size != da->size)
588 sysfatal("arena %d: size mismatch: %,lld vs %,lld", i, sa->size, da->size);
589 if(strcmp(sa->name, da->name) != 0)
590 sysfatal("arena %d: name mismatch: %s vs %s", i, sa->name, da->name);
593 /*
594 * Mirror one arena at a time.
595 */
596 writechan = chancreate(sizeof(void*), 0);
597 vtproc(writeproc, nil);
598 mirrormany(s, d, ranges);
599 sendp(writechan, nil);
600 threadexitsall(status);