Blob


1 #include "common.h"
2 #include <ctype.h>
4 void doalldirs(void);
5 void dodir(char*);
6 void dofile(Dir*);
7 void rundir(char*);
8 char* file(char*, char);
9 void warning(char*, void*);
10 void error(char*, void*);
11 int returnmail(char**, char*, char*);
12 void logit(char*, char*, char**);
13 void doload(int);
15 #define HUNK 32
16 char *cmd;
17 char *root;
18 int debug;
19 int giveup = 2*24*60*60;
20 int load;
21 int limit;
23 /* the current directory */
24 Dir *dirbuf;
25 long ndirbuf = 0;
26 int nfiles;
27 char *curdir;
29 char *runqlog = "runq";
31 int *pidlist;
32 char **badsys; /* array of recalcitrant systems */
33 int nbad;
34 int npid = 50;
35 int sflag; /* single thread per directory */
36 int aflag; /* all directories */
37 int Eflag; /* ignore E.xxxxxx dates */
38 int Rflag; /* no giving up, ever */
/*
 * print a synopsis and exit with a non-empty (failure) status,
 * so callers can tell a usage error from success.
 */
void
usage(void)
{
	fprint(2, "usage: runq [-adsER] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
	exits("usage");
}
47 void
48 main(int argc, char **argv)
49 {
50 char *qdir, *x;
52 qdir = 0;
54 ARGBEGIN{
55 case 'l':
56 x = ARGF();
57 if(x == 0)
58 usage();
59 load = atoi(x);
60 if(load < 0)
61 load = 0;
62 break;
63 case 'E':
64 Eflag++;
65 break;
66 case 'R': /* no giving up -- just leave stuff in the queue */
67 Rflag++;
68 break;
69 case 'a':
70 aflag++;
71 break;
72 case 'd':
73 debug++;
74 break;
75 case 'r':
76 limit = atoi(ARGF());
77 break;
78 case 's':
79 sflag++;
80 break;
81 case 't':
82 giveup = 60*60*atoi(ARGF());
83 break;
84 case 'q':
85 qdir = ARGF();
86 if(qdir == 0)
87 usage();
88 break;
89 case 'n':
90 npid = atoi(ARGF());
91 if(npid == 0)
92 usage();
93 break;
94 }ARGEND;
96 if(argc != 2)
97 usage();
99 pidlist = malloc(npid*sizeof(*pidlist));
100 if(pidlist == 0)
101 error("can't malloc", 0);
103 if(aflag == 0 && qdir == 0) {
104 qdir = getuser();
105 if(qdir == 0)
106 error("unknown user", 0);
108 root = argv[0];
109 cmd = argv[1];
111 if(chdir(root) < 0)
112 error("can't cd to %s", root);
114 doload(1);
115 if(aflag)
116 doalldirs();
117 else
118 dodir(qdir);
119 doload(0);
120 exits(0);
123 int
124 emptydir(char *name)
126 int fd;
127 long n;
128 char buf[2048];
130 fd = open(name, OREAD);
131 if(fd < 0)
132 return 1;
133 n = read(fd, buf, sizeof(buf));
134 close(fd);
135 if(n <= 0) {
136 if(debug)
137 fprint(2, "removing directory %s\n", name);
138 syslog(0, runqlog, "rmdir %s", name);
139 sysremove(name);
140 return 1;
142 return 0;
145 int
146 forkltd(void)
148 int i;
149 int pid;
151 for(i = 0; i < npid; i++){
152 if(pidlist[i] <= 0)
153 break;
156 while(i >= npid){
157 pid = waitpid();
158 if(pid < 0){
159 syslog(0, runqlog, "forkltd confused");
160 exits(0);
163 for(i = 0; i < npid; i++)
164 if(pidlist[i] == pid)
165 break;
167 pidlist[i] = fork();
168 return pidlist[i];
171 /*
172 * run all user directories, must be bootes (or root on unix) to do this
173 */
174 void
175 doalldirs(void)
177 Dir *db;
178 int fd;
179 long i, n;
182 fd = open(".", OREAD);
183 if(fd == -1){
184 warning("reading %s", root);
185 return;
187 n = sysdirreadall(fd, &db);
188 if(n > 0){
189 for(i=0; i<n; i++){
190 if(db[i].qid.type & QTDIR){
191 if(emptydir(db[i].name))
192 continue;
193 switch(forkltd()){
194 case -1:
195 syslog(0, runqlog, "out of procs");
196 doload(0);
197 exits(0);
198 case 0:
199 if(sysdetach() < 0)
200 error("%r", 0);
201 dodir(db[i].name);
202 exits(0);
203 default:
204 break;
208 free(db);
210 close(fd);
213 /*
214 * cd to a user directory and run it
215 */
216 void
217 dodir(char *name)
219 curdir = name;
221 if(chdir(name) < 0){
222 warning("cd to %s", name);
223 return;
225 if(debug)
226 fprint(2, "running %s\n", name);
227 rundir(name);
228 chdir("..");
231 /*
232 * run the current directory
233 */
234 void
235 rundir(char *name)
237 int fd;
238 long i;
240 if(aflag && sflag)
241 fd = sysopenlocked(".", OREAD);
242 else
243 fd = open(".", OREAD);
244 if(fd == -1){
245 warning("reading %s", name);
246 return;
248 nfiles = sysdirreadall(fd, &dirbuf);
249 if(nfiles > 0){
250 for(i=0; i<nfiles; i++){
251 if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
252 continue;
253 dofile(&dirbuf[i]);
255 free(dirbuf);
257 if(aflag && sflag)
258 sysunlockfile(fd);
259 else
260 close(fd);
263 /*
264 * free files matching name in the current directory
265 */
266 void
267 remmatch(char *name)
269 long i;
271 syslog(0, runqlog, "removing %s/%s", curdir, name);
273 for(i=0; i<nfiles; i++){
274 if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
275 sysremove(dirbuf[i].name);
278 /* error file (may have) appeared after we read the directory */
279 /* stomp on data file in case of phase error */
280 sysremove(file(name, 'D'));
281 sysremove(file(name, 'E'));
284 /*
285 * like trylock, but we've already got the lock on fd,
286 * and don't want an L. lock file.
287 */
288 static Mlock *
289 keeplockalive(char *path, int fd)
291 char buf[1];
292 Mlock *l;
294 l = malloc(sizeof(Mlock));
295 if(l == 0)
296 return 0;
297 l->fd = fd;
298 l->name = s_new();
299 s_append(l->name, path);
301 /* fork process to keep lock alive until sysunlock(l) */
302 switch(l->pid = rfork(RFPROC)){
303 default:
304 break;
305 case 0:
306 fd = l->fd;
307 for(;;){
308 sleep(1000*60);
309 if(pread(fd, buf, 1, 0) < 0)
310 break;
312 _exits(0);
314 return l;
317 /*
318 * try a message
319 */
320 void
321 dofile(Dir *dp)
323 Dir *d;
324 int dfd, ac, dtime, efd, pid, i, etime;
325 char *buf, *cp, **av;
326 Waitmsg *wm;
327 Biobuf *b;
328 Mlock *l = nil;
330 if(debug)
331 fprint(2, "dofile %s\n", dp->name);
332 /*
333 * if no data file or empty control or data file, just clean up
334 * the empty control file must be 15 minutes old, to minimize the
335 * chance of a race.
336 */
337 d = dirstat(file(dp->name, 'D'));
338 if(d == nil){
339 syslog(0, runqlog, "no data file for %s", dp->name);
340 remmatch(dp->name);
341 return;
343 if(dp->length == 0){
344 if(time(0)-dp->mtime > 15*60){
345 syslog(0, runqlog, "empty ctl file for %s", dp->name);
346 remmatch(dp->name);
348 return;
350 dtime = d->mtime;
351 free(d);
353 /*
354 * retry times depend on the age of the errors file
355 */
356 if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){
357 etime = d->mtime;
358 free(d);
359 if(etime - dtime < 60*60){
360 /* up to the first hour, try every 15 minutes */
361 if(time(0) - etime < 15*60)
362 return;
363 } else {
364 /* after the first hour, try once an hour */
365 if(time(0) - etime < 60*60)
366 return;
371 /*
372 * open control and data
373 */
374 b = sysopen(file(dp->name, 'C'), "rl", 0660);
375 if(b == 0) {
376 if(debug)
377 fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
378 return;
380 dfd = open(file(dp->name, 'D'), OREAD);
381 if(dfd < 0){
382 if(debug)
383 fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
384 Bterm(b);
385 sysunlockfile(Bfildes(b));
386 return;
389 /*
390 * make arg list
391 * - read args into (malloc'd) buffer
392 * - malloc a vector and copy pointers to args into it
393 */
394 buf = malloc(dp->length+1);
395 if(buf == 0){
396 warning("buffer allocation", 0);
397 Bterm(b);
398 sysunlockfile(Bfildes(b));
399 close(dfd);
400 return;
402 if(Bread(b, buf, dp->length) != dp->length){
403 warning("reading control file %s\n", dp->name);
404 Bterm(b);
405 sysunlockfile(Bfildes(b));
406 close(dfd);
407 free(buf);
408 return;
410 buf[dp->length] = 0;
411 av = malloc(2*sizeof(char*));
412 if(av == 0){
413 warning("argv allocation", 0);
414 close(dfd);
415 free(buf);
416 Bterm(b);
417 sysunlockfile(Bfildes(b));
418 return;
420 for(ac = 1, cp = buf; *cp; ac++){
421 while(isspace(*cp))
422 *cp++ = 0;
423 if(*cp == 0)
424 break;
426 av = realloc(av, (ac+2)*sizeof(char*));
427 if(av == 0){
428 warning("argv allocation", 0);
429 close(dfd);
430 free(buf);
431 Bterm(b);
432 sysunlockfile(Bfildes(b));
433 return;
435 av[ac] = cp;
436 while(*cp && !isspace(*cp)){
437 if(*cp++ == '"'){
438 while(*cp && *cp != '"')
439 cp++;
440 if(*cp)
441 cp++;
445 av[0] = cmd;
446 av[ac] = 0;
448 if(!Eflag &&time(0) - dtime > giveup){
449 if(returnmail(av, dp->name, "Giveup") != 0)
450 logit("returnmail failed", dp->name, av);
451 remmatch(dp->name);
452 goto done;
455 for(i = 0; i < nbad; i++){
456 if(strcmp(av[3], badsys[i]) == 0)
457 goto done;
460 /*
461 * Ken's fs, for example, gives us 5 minutes of inactivity before
462 * the lock goes stale, so we have to keep reading it.
463 */
464 l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
466 /*
467 * transfer
468 */
469 pid = fork();
470 switch(pid){
471 case -1:
472 sysunlock(l);
473 sysunlockfile(Bfildes(b));
474 syslog(0, runqlog, "out of procs");
475 exits(0);
476 case 0:
477 if(debug) {
478 fprint(2, "Starting %s", cmd);
479 for(ac = 0; av[ac]; ac++)
480 fprint(2, " %s", av[ac]);
481 fprint(2, "\n");
483 logit("execing", dp->name, av);
484 close(0);
485 dup(dfd, 0);
486 close(dfd);
487 close(2);
488 efd = open(file(dp->name, 'E'), OWRITE);
489 if(efd < 0){
490 if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
491 efd = create(file(dp->name, 'E'), OWRITE, 0666);
492 if(efd < 0){
493 if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
494 exits("could not open error file - Retry");
497 seek(efd, 0, 2);
498 exec(cmd, av);
499 error("can't exec %s", cmd);
500 break;
501 default:
502 for(;;){
503 wm = wait();
504 if(wm == nil)
505 error("wait failed: %r", "");
506 if(wm->pid == pid)
507 break;
508 free(wm);
510 if(debug)
511 fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
513 if(wm->msg[0]){
514 if(debug)
515 fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
516 if(!Rflag && atoi(wm->msg) != RetryCode){
517 /* return the message and remove it */
518 if(returnmail(av, dp->name, wm->msg) != 0)
519 logit("returnmail failed", dp->name, av);
520 remmatch(dp->name);
521 } else {
522 /* add sys to bad list and try again later */
523 nbad++;
524 badsys = realloc(badsys, nbad*sizeof(char*));
525 badsys[nbad-1] = strdup(av[3]);
527 } else {
528 /* it worked remove the message */
529 remmatch(dp->name);
531 free(wm);
534 done:
535 if (l)
536 sysunlock(l);
537 Bterm(b);
538 sysunlockfile(Bfildes(b));
539 free(buf);
540 free(av);
541 close(dfd);
545 /*
546 * return a name starting with the given character
547 */
548 char*
549 file(char *name, char type)
551 static char nname[Elemlen+1];
553 strncpy(nname, name, Elemlen);
554 nname[Elemlen] = 0;
555 nname[0] = type;
556 return nname;
559 /*
560 * send back the mail with an error message
562 * return 0 if successful
563 */
564 int
565 returnmail(char **av, char *name, char *msg)
567 int pfd[2];
568 Waitmsg *wm;
569 int fd;
570 char buf[256];
571 char attachment[256];
572 int i;
573 long n;
574 String *s;
575 char *sender;
577 if(av[1] == 0 || av[2] == 0){
578 logit("runq - dumping bad file", name, av);
579 return 0;
582 s = unescapespecial(s_copy(av[2]));
583 sender = s_to_c(s);
585 if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
586 logit("runq - dumping p to p mail", name, av);
587 return 0;
590 if(pipe(pfd) < 0){
591 logit("runq - pipe failed", name, av);
592 return -1;
595 switch(rfork(RFFDG|RFPROC|RFENVG)){
596 case -1:
597 logit("runq - fork failed", name, av);
598 return -1;
599 case 0:
600 logit("returning", name, av);
601 close(pfd[1]);
602 close(0);
603 dup(pfd[0], 0);
604 close(pfd[0]);
605 putenv("upasname", "/dev/null");
606 snprint(buf, sizeof(buf), "%s/marshal", UPASBIN);
607 snprint(attachment, sizeof(attachment), "%s", file(name, 'D'));
608 execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, nil);
609 error("can't exec", 0);
610 break;
611 default:
612 break;
615 close(pfd[0]);
616 fprint(pfd[1], "\n"); /* get out of headers */
617 if(av[1]){
618 fprint(pfd[1], "Your request ``%.20s ", av[1]);
619 for(n = 3; av[n]; n++)
620 fprint(pfd[1], "%s ", av[n]);
622 fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg);
623 fd = open(file(name, 'E'), OREAD);
624 if(fd >= 0){
625 for(;;){
626 n = read(fd, buf, sizeof(buf));
627 if(n <= 0)
628 break;
629 if(write(pfd[1], buf, n) != n){
630 close(fd);
631 goto out;
634 close(fd);
636 close(pfd[1]);
637 out:
638 wm = wait();
639 if(wm == nil){
640 syslog(0, "runq", "wait: %r");
641 logit("wait failed", name, av);
642 return -1;
644 i = 0;
645 if(wm->msg[0]){
646 i = -1;
647 syslog(0, "runq", "returnmail child: %s", wm->msg);
648 logit("returnmail child failed", name, av);
650 free(wm);
651 return i;
654 /*
655 * print a warning and continue
656 */
657 void
658 warning(char *f, void *a)
660 char err[65];
661 char buf[256];
663 rerrstr(err, sizeof(err));
664 snprint(buf, sizeof(buf), f, a);
665 fprint(2, "runq: %s: %s\n", buf, err);
668 /*
669 * print an error and die
670 */
671 void
672 error(char *f, void *a)
674 char err[Errlen];
675 char buf[256];
677 rerrstr(err, sizeof(err));
678 snprint(buf, sizeof(buf), f, a);
679 fprint(2, "runq: %s: %s\n", buf, err);
680 exits(buf);
683 void
684 logit(char *msg, char *file, char **av)
686 int n, m;
687 char buf[256];
689 n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
690 for(; *av; av++){
691 m = strlen(*av);
692 if(n + m + 4 > sizeof(buf))
693 break;
694 sprint(buf + n, " '%s'", *av);
695 n += m + 3;
697 syslog(0, runqlog, "%s", buf);
700 char *loadfile = ".runqload";
702 /*
703 * load balancing
704 */
705 void
706 doload(int start)
708 int fd;
709 char buf[32];
710 int i, n;
711 Mlock *l;
712 Dir *d;
714 if(load <= 0)
715 return;
717 if(chdir(root) < 0){
718 load = 0;
719 return;
722 l = syslock(loadfile);
723 fd = open(loadfile, ORDWR);
724 if(fd < 0){
725 fd = create(loadfile, 0666, ORDWR);
726 if(fd < 0){
727 load = 0;
728 sysunlock(l);
729 return;
733 /* get current load */
734 i = 0;
735 n = read(fd, buf, sizeof(buf)-1);
736 if(n >= 0){
737 buf[n] = 0;
738 i = atoi(buf);
740 if(i < 0)
741 i = 0;
743 /* ignore load if file hasn't been changed in 30 minutes */
744 d = dirfstat(fd);
745 if(d != nil){
746 if(d->mtime + 30*60 < time(0))
747 i = 0;
748 free(d);
751 /* if load already too high, give up */
752 if(start && i >= load){
753 sysunlock(l);
754 exits(0);
757 /* increment/decrement load */
758 if(start)
759 i++;
760 else
761 i--;
762 seek(fd, 0, 0);
763 fprint(fd, "%d\n", i);
764 sysunlock(l);
765 close(fd);