Blob


/*
 * vbackup [-DVfnv] [-m mtpt] [-s secs] [-w n] fspartition [score]
 *
 * Copy a file system to a disk image stored on Venti.
 * Prints a vnfs config line for the copied image.
 *
 *	-D	print debugging
 *	-V	set chattyventi (chatty Venti tracing)
 *	-f	'fast' writes - skip write if block exists on server
 *	-m	set mount name
 *	-n	nop -- don't actually write blocks
 *	-s	print status updates every given number of seconds
 *	-v	print debugging trace
 *	-w	write parallelism (number of write-behind threads)
 *
 * If score is given on the command line, it should be the
 * score from a previous vbackup on this fspartition.
 * In this mode, only the new blocks are stored to Venti.
 * The result is still a complete image, but requires many
 * fewer Venti writes in the common case.
 *
 * This program is structured as three processes connected
 * by buffered queues:
 *
 *	fsysproc | cmpproc | ventiproc
 *
 * Fsysproc reads the disk and queues the blocks.
 * Cmpproc compares the blocks against the SHA1 hashes
 * in the old image, if any.  It discards the unchanged blocks
 * and queues the changed ones.  Ventiproc writes blocks to Venti.
 *
 * There is a fourth proc, statusproc, which prints status
 * updates about how the various procs are progressing.
 */
35 #include <u.h>
36 #include <libc.h>
37 #include <bio.h>
38 #include <thread.h>
39 #include <libsec.h>
40 #include <venti.h>
41 #include <diskfs.h>
42 #include "queue.h"
enum
{
	/* stack size handed to every proccreate/threadcreate call in this file */
	STACK = 32*1024,
};
49 typedef struct WriteReq WriteReq;
50 struct WriteReq
51 {
52 Packet *p;
53 uint type;
54 uchar score[VtScoreSize];
55 };
57 Biobuf bscores; /* biobuf filled with block scores */
58 int debug; /* debugging flag (not used) */
59 Disk* disk; /* disk being backed up */
60 RWLock endlk; /* silly synchonization */
61 int errors; /* are we exiting with an error status? */
62 int fastwrites; /* do not write blocks already on server */
63 int fsscanblock; /* last block scanned */
64 Fsys* fsys; /* file system being backed up */
65 int nchange; /* number of changed blocks */
66 int nop; /* don't actually send blocks to venti */
67 int nskip; /* number of blocks skipped (already on server) */
68 int nwritethread; /* number of write-behind threads */
69 Queue* qcmp; /* queue fsys->cmp */
70 Queue* qventi; /* queue cmp->venti */
71 int statustime; /* print status every _ seconds */
72 int verbose; /* print extra stuff */
73 VtFile* vfile; /* venti file being written */
74 Channel* writechan; /* chan(WriteReq) */
75 VtConn* z; /* connection to venti */
76 VtCache* zcache; /* cache of venti blocks */
77 uchar* zero; /* blocksize zero bytes */
79 int nsend, nrecv;
81 void cmpproc(void*);
82 void fsysproc(void*);
83 void statusproc(void*);
84 void ventiproc(void*);
85 int timefmt(Fmt*);
86 char* mountplace(char *dev);
/*
 * Print the command synopsis on standard error and terminate
 * every proc with exit status "usage".
 * The flag list mirrors the cases handled by ARGBEGIN in threadmain.
 */
void
usage(void)
{
	fprint(2, "usage: vbackup [-DVfnv] [-m mtpt] [-s secs] [-w n] disk [score]\n");
	threadexitsall("usage");
}
95 void
96 threadmain(int argc, char **argv)
97 {
98 char *pref, *mountname;
99 uchar score[VtScoreSize], prev[VtScoreSize];
100 int i, fd, csize;
101 vlong bsize;
102 Tm tm;
103 VtEntry e;
104 VtBlock *b;
105 VtCache *c;
106 VtRoot root;
107 char *tmp, *tmpnam;
109 fmtinstall('F', vtfcallfmt);
110 fmtinstall('H', encodefmt);
111 fmtinstall('T', timefmt);
112 fmtinstall('V', vtscorefmt);
114 mountname = sysname();
115 ARGBEGIN{
116 default:
117 usage();
118 break;
119 case 'D':
120 debug++;
121 break;
122 case 'V':
123 chattyventi = 1;
124 break;
125 case 'f':
126 fastwrites = 1;
127 break;
128 case 'm':
129 mountname = EARGF(usage());
130 break;
131 case 'n':
132 nop = 1;
133 break;
134 case 's':
135 statustime = atoi(EARGF(usage()));
136 break;
137 case 'v':
138 verbose = 1;
139 break;
140 case 'w':
141 nwritethread = atoi(EARGF(usage()));
142 break;
143 }ARGEND
145 if(argc != 1 && argc != 2)
146 usage();
148 if(statustime)
149 print("# %T vbackup %s %s\n", argv[0], argc>=2 ? argv[1] : "");
151 /*
152 * open fs
153 */
154 if((disk = diskopenfile(argv[0])) == nil)
155 sysfatal("diskopen: %r");
156 if((disk = diskcache(disk, 32768, 2*MAXQ+16)) == nil)
157 sysfatal("diskcache: %r");
158 if((fsys = fsysopen(disk)) == nil)
159 sysfatal("ffsopen: %r");
161 /*
162 * connect to venti
163 */
164 if((z = vtdial(nil)) == nil)
165 sysfatal("vtdial: %r");
166 if(vtconnect(z) < 0)
167 sysfatal("vtconnect: %r");
169 /*
170 * set up venti block cache
171 */
172 zero = vtmallocz(fsys->blocksize);
173 bsize = fsys->blocksize;
174 csize = 50; /* plenty; could probably do with 5 */
176 if(verbose)
177 fprint(2, "cache %d blocks\n", csize);
178 c = vtcachealloc(z, bsize, csize);
179 zcache = c;
181 /*
182 * parse starting score
183 */
184 memset(prev, 0, sizeof prev);
185 if(argc == 1){
186 vfile = vtfilecreateroot(c, (fsys->blocksize/VtScoreSize)*VtScoreSize,
187 fsys->blocksize, VtDataType);
188 if(vfile == nil)
189 sysfatal("vtfilecreateroot: %r");
190 vtfilelock(vfile, VtORDWR);
191 if(vtfilewrite(vfile, zero, 1, bsize*fsys->nblock-1) != 1)
192 sysfatal("vtfilewrite: %r");
193 if(vtfileflush(vfile) < 0)
194 sysfatal("vtfileflush: %r");
195 }else{
196 if(vtparsescore(argv[1], &pref, score) < 0)
197 sysfatal("bad score: %r");
198 if(pref!=nil && strcmp(pref, fsys->type) != 0)
199 sysfatal("score is %s but fsys is %s", pref, fsys->type);
200 b = vtcacheglobal(c, score, VtRootType);
201 if(b){
202 if(vtrootunpack(&root, b->data) < 0)
203 sysfatal("bad root: %r");
204 if(strcmp(root.type, fsys->type) != 0)
205 sysfatal("root is %s but fsys is %s", root.type, fsys->type);
206 memmove(prev, score, VtScoreSize);
207 memmove(score, root.score, VtScoreSize);
208 vtblockput(b);
210 b = vtcacheglobal(c, score, VtDirType);
211 if(b == nil)
212 sysfatal("vtcacheglobal %V: %r", score);
213 if(vtentryunpack(&e, b->data, 0) < 0)
214 sysfatal("%V: vtentryunpack failed", score);
215 if(verbose)
216 fprint(2, "entry: size %llud psize %d dsize %d\n",
217 e.size, e.psize, e.dsize);
218 vtblockput(b);
219 if((vfile = vtfileopenroot(c, &e)) == nil)
220 sysfatal("vtfileopenroot: %r");
221 vtfilelock(vfile, VtORDWR);
222 if(e.dsize != bsize)
223 sysfatal("file system block sizes don't match %d %lld", e.dsize, bsize);
224 if(e.size != fsys->nblock*bsize)
225 sysfatal("file system block counts don't match %lld %lld", e.size, fsys->nblock*bsize);
228 /*
229 * write scores of blocks into temporary file
230 */
231 if((tmp = getenv("TMP")) != nil){
232 /* okay, good */
233 }else if(access("/var/tmp", 0) >= 0)
234 tmp = "/var/tmp";
235 else
236 tmp = "/tmp";
237 tmpnam = smprint("%s/vbackup.XXXXXX", tmp);
238 if(tmpnam == nil)
239 sysfatal("smprint: %r");
241 if((fd = opentemp(tmpnam)) < 0)
242 sysfatal("opentemp %s: %r", tmpnam);
243 if(statustime)
244 print("# %T reading scores into %s\n", tmpnam);
245 if(verbose)
246 fprint(2, "read scores into %s...\n", tmpnam);
248 Binit(&bscores, fd, OWRITE);
249 for(i=0; i<fsys->nblock; i++){
250 if(vtfileblockscore(vfile, i, score) < 0)
251 sysfatal("vtfileblockhash %d: %r", i);
252 if(Bwrite(&bscores, score, VtScoreSize) != VtScoreSize)
253 sysfatal("Bwrite: %r");
255 Bterm(&bscores);
256 vtfileunlock(vfile);
258 /*
259 * prep scores for rereading
260 */
261 seek(fd, 0, 0);
262 Binit(&bscores, fd, OREAD);
264 /*
265 * start the main processes
266 */
267 if(statustime)
268 print("# %T starting procs\n");
269 qcmp = qalloc();
270 qventi = qalloc();
272 rlock(&endlk);
273 proccreate(fsysproc, nil, STACK);
274 rlock(&endlk);
275 proccreate(ventiproc, nil, STACK);
276 rlock(&endlk);
277 proccreate(cmpproc, nil, STACK);
278 if(statustime){
279 rlock(&endlk);
280 proccreate(statusproc, nil, STACK);
283 /*
284 * wait for processes to finish
285 */
286 wlock(&endlk);
288 if(statustime)
289 print("# %T procs exited: %d blocks changed, %d read, %d written, %d skipped, %d copied\n",
290 nchange, vtcachenread, vtcachenwrite, nskip, vtcachencopy);
292 /*
293 * prepare root block
294 */
295 vtfilelock(vfile, -1);
296 if(vtfileflush(vfile) < 0)
297 sysfatal("vtfileflush: %r");
298 if(vtfilegetentry(vfile, &e) < 0)
299 sysfatal("vtfilegetentry: %r");
301 b = vtcacheallocblock(c, VtDirType);
302 if(b == nil)
303 sysfatal("vtcacheallocblock: %r");
304 vtentrypack(&e, b->data, 0);
305 if(vtblockwrite(b) < 0)
306 sysfatal("vtblockwrite: %r");
308 memset(&root, 0, sizeof root);
309 strecpy(root.name, root.name+sizeof root.name, argv[0]);
310 strecpy(root.type, root.type+sizeof root.type, fsys->type);
311 memmove(root.score, b->score, VtScoreSize);
312 root.blocksize = fsys->blocksize;
313 memmove(root.prev, prev, VtScoreSize);
314 vtblockput(b);
316 b = vtcacheallocblock(c, VtRootType);
317 if(b == nil)
318 sysfatal("vtcacheallocblock: %r");
319 vtrootpack(&root, b->data);
320 if(vtblockwrite(b) < 0)
321 sysfatal("vtblockwrite: %r");
323 tm = *localtime(time(0));
324 tm.year += 1900;
325 tm.mon++;
326 print("mount /%s/%d/%02d%02d%s %s:%V %d/%02d%02d/%02d%02d\n",
327 mountname, tm.year, tm.mon, tm.mday,
328 mountplace(argv[0]),
329 root.type, b->score,
330 tm.year, tm.mon, tm.mday, tm.hour, tm.min);
331 print("# %T %s %s:%V\n", argv[0], root.type, b->score);
332 if(statustime)
333 print("# %T venti sync\n");
334 vtblockput(b);
335 if(vtsync(z) < 0)
336 sysfatal("vtsync: %r");
337 if(statustime)
338 print("# %T synced\n");
339 threadexitsall(nil);
342 void
343 fsysproc(void *dummy)
345 u32int i;
346 Block *db;
348 USED(dummy);
350 for(i=0; i<fsys->nblock; i++){
351 fsscanblock = i;
352 if((db = fsysreadblock(fsys, i)) != nil)
353 qwrite(qcmp, db, i);
355 fsscanblock = i;
356 qclose(qcmp);
358 if(statustime)
359 print("# %T fsys proc exiting\n");
360 runlock(&endlk);
363 void
364 cmpproc(void *dummy)
366 uchar *data;
367 Block *db;
368 u32int bno, bsize;
369 uchar score[VtScoreSize];
370 uchar score1[VtScoreSize];
372 USED(dummy);
374 bsize = fsys->blocksize;
375 while((db = qread(qcmp, &bno)) != nil){
376 data = db->data;
377 sha1(data, vtzerotruncate(VtDataType, data, bsize), score, nil);
378 if(Bseek(&bscores, (vlong)bno*VtScoreSize, 0) < 0)
379 sysfatal("cmpproc Bseek: %r");
380 if(Bread(&bscores, score1, VtScoreSize) != VtScoreSize)
381 sysfatal("cmpproc Bread: %r");
382 if(memcmp(score, score1, VtScoreSize) != 0){
383 nchange++;
384 if(verbose)
385 print("# block %ud: old %V new %V\n", bno, score1, score);
386 qwrite(qventi, db, bno);
387 }else
388 blockput(db);
390 qclose(qventi);
391 if(statustime)
392 print("# %T cmp proc exiting\n");
393 runlock(&endlk);
396 void
397 writethread(void *v)
399 WriteReq wr;
400 char err[ERRMAX];
402 USED(v);
404 while(recv(writechan, &wr) == 1){
405 nrecv++;
406 if(wr.p == nil)
407 break;
409 if(fastwrites && vtread(z, wr.score, wr.type, nil, 0) < 0){
410 rerrstr(err, sizeof err);
411 if(strstr(err, "read too small")){ /* already exists */
412 nskip++;
413 packetfree(wr.p);
414 continue;
417 if(vtwritepacket(z, wr.score, wr.type, wr.p) < 0)
418 sysfatal("vtwritepacket: %r");
422 int
423 myvtwrite(VtConn *z, uchar score[VtScoreSize], uint type, uchar *buf, int n)
425 WriteReq wr;
427 if(nwritethread == 0)
428 return vtwrite(z, score, type, buf, n);
430 wr.p = packetalloc();
431 packetappend(wr.p, buf, n);
432 packetsha1(wr.p, score);
433 memmove(wr.score, score, VtScoreSize);
434 wr.type = type;
435 nsend++;
436 send(writechan, &wr);
437 return 0;
440 void
441 ventiproc(void *dummy)
443 int i;
444 Block *db;
445 u32int bno;
446 u64int bsize;
448 USED(dummy);
450 proccreate(vtsendproc, z, STACK);
451 proccreate(vtrecvproc, z, STACK);
453 writechan = chancreate(sizeof(WriteReq), 0);
454 for(i=0; i<nwritethread; i++)
455 threadcreate(writethread, nil, STACK);
456 vtcachesetwrite(zcache, myvtwrite);
458 bsize = fsys->blocksize;
459 vtfilelock(vfile, -1);
460 while((db = qread(qventi, &bno)) != nil){
461 if(nop){
462 blockput(db);
463 continue;
465 if(vtfilewrite(vfile, db->data, bsize, bno*bsize) != bsize)
466 sysfatal("ventiproc vtfilewrite: %r");
467 if(vtfileflushbefore(vfile, (bno+1)*bsize) < 0)
468 sysfatal("ventiproc vtfileflushbefore: %r");
469 blockput(db);
471 vtfileunlock(vfile);
472 vtcachesetwrite(zcache, nil);
473 for(i=0; i<nwritethread; i++)
474 send(writechan, nil);
475 if(statustime)
476 print("# %T venti proc exiting - nsend %d nrecv %d\n", nsend, nrecv);
477 runlock(&endlk);
/*
 * Return a as an integer percentage of b (truncated toward zero).
 * The multiplication is done in vlong so that a*100 cannot
 * overflow 32 bits.  A zero denominator yields 0 instead of
 * dividing by zero.
 */
static int
percent(u32int a, u32int b)
{
	if(b == 0)
		return 0;
	return (vlong)a*100/b;
}
486 void
487 statusproc(void *dummy)
489 int n;
490 USED(dummy);
492 for(n=0;;n++){
493 sleep(1000);
494 if(qcmp->closed && qcmp->nel==0 && qventi->closed && qventi->nel==0)
495 break;
496 if(n < statustime)
497 continue;
498 n = 0;
499 print("# %T fsscan=%d%% cmpq=%d%% ventiq=%d%%\n",
500 percent(fsscanblock, fsys->nblock),
501 percent(qcmp->nel, MAXQ),
502 percent(qventi->nel, MAXQ));
504 print("# %T status proc exiting\n");
505 runlock(&endlk);
508 int
509 timefmt(Fmt *fmt)
511 vlong ns;
512 Tm tm;
513 ns = nsec();
514 tm = *localtime(time(0));
515 return fmtprint(fmt, "%04d/%02d%02d %02d:%02d:%02d.%03d",
516 tm.year+1900, tm.mon+1, tm.mday, tm.hour, tm.min, tm.sec,
517 (int)(ns%1000000000)/1000000);
520 char*
521 mountplace(char *dev)
523 char *cmd, *q;
524 int p[2], fd[3], n;
525 char buf[100];
527 if(pipe(p) < 0)
528 sysfatal("pipe: %r");
530 fd[0] = -1;
531 fd[1] = p[1];
532 fd[2] = -1;
533 cmd = smprint("mount | awk '$1==\"%s\" && $2 == \"on\" {print $3}'", dev);
534 if(threadspawnl(fd, "sh", "sh", "-c", cmd, nil) < 0)
535 sysfatal("exec mount|awk (to find mtpt of %s): %r", dev);
536 /* threadspawnl closed p[1] */
537 n = readn(p[0], buf, sizeof buf-1);
538 close(p[0]);
539 if(n <= 0)
540 return dev;
541 buf[n] = 0;
542 if((q = strchr(buf, '\n')) == nil)
543 return dev;
544 *q = 0;
545 q = buf+strlen(buf);
546 if(q>buf && *(q-1) == '/')
547 *--q = 0;
548 return strdup(buf);