Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <errno.h>
7 #define err err9pserve /* Darwin x86 */
/*
 * Sizing constants used throughout the server.
 */
enum
{
	STACK = 32768,		/* stack size passed to threadcreate/proccreate */
	NHASH = 31,		/* buckets in each per-connection tag/fid hash table */
	MAXMSG = 64,	/* per connection */
	MAXMSGSIZE = 4*1024*1024	/* largest packet read9ppkt will accept */
};
/* Forward type names for the structures defined below. */
typedef struct Hash Hash;
typedef struct Fid Fid;
typedef struct Msg Msg;
typedef struct Conn Conn;
typedef struct Queue Queue;
23 struct Hash
24 {
25 Hash *next;
26 uint n;
27 void *v;
28 };
30 struct Fid
31 {
32 int fid;
33 int ref;
34 int cfid;
35 int openfd;
36 int offset;
37 int coffset;
38 int isdir;
39 Fid *next;
40 };
42 struct Msg
43 {
44 Conn *c;
45 int internal;
46 int sync;
47 int ref;
48 int ctag;
49 int tag;
50 int isopenfd;
51 Fcall tx;
52 Fcall rx;
53 Fid *fid;
54 Fid *newfid;
55 Fid *afid;
56 Msg *oldm;
57 Msg *next;
58 uchar *tpkt;
59 uchar *rpkt;
60 };
62 struct Conn
63 {
64 int fd;
65 int fdmode;
66 Fid *fdfid;
67 int nmsg;
68 int nfid;
69 Channel *inc;
70 Channel *internal;
71 int inputstalled;
72 char dir[40];
73 Hash *tag[NHASH];
74 Hash *fid[NHASH];
75 Queue *outq;
76 Queue *inq;
77 Channel *outqdead;
78 };
80 char *xaname;
81 char *addr;
82 int afd;
83 char adir[40];
84 int isunix;
85 Queue *outq;
86 Queue *inq;
87 int verbose = 0;
88 int logging = 0;
89 int msize = 8192;
90 u32int xafid = NOFID;
91 int attached;
92 int versioned;
93 int noauth;
95 void *gethash(Hash**, uint);
96 int puthash(Hash**, uint, void*);
97 int delhash(Hash**, uint, void*);
98 Msg *mread9p(Ioproc*, int);
99 int mwrite9p(Ioproc*, int, uchar*);
100 uchar *read9ppkt(Ioproc*, int);
101 int write9ppkt(int, uchar*);
102 Msg *msgnew(int);
103 void msgput(Msg*);
104 void msgclear(Msg*);
105 Msg *msgget(int);
106 void msgincref(Msg*);
107 Fid *fidnew(int);
108 void fidput(Fid*);
109 void *emalloc(int);
110 void *erealloc(void*, int);
111 Queue *qalloc(void);
112 int sendq(Queue*, void*);
113 void *recvq(Queue*);
114 void connthread(void*);
115 void connoutthread(void*);
116 void listenthread(void*);
117 void outputthread(void*);
118 void inputthread(void*);
119 void rewritehdr(Fcall*, uchar*);
120 void repack(Fcall*, uchar**);
121 int tlisten(char*, char*);
122 int taccept(int, char*);
123 int iolisten(Ioproc*, char*, char*);
124 int ioaccept(Ioproc*, int, char*);
125 int iorecvfd(Ioproc*, int);
126 int iosendfd(Ioproc*, int, int);
127 void mainproc(void*);
128 int ignorepipe(void*, char*);
129 int timefmt(Fmt*);
130 void dorootstat(void);
/*
 * Print the command-line synopsis on fd 2 and terminate all threads
 * with exit status "usage".
 */
void
usage(void)
{
	fprint(2, "usage: 9pserve [-lnv] [-A aname afid] [-c addr] [-M msize] address\n");
	fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
	threadexitsall("usage");
}
140 uchar vbuf[128];
141 extern int _threaddebuglevel;
142 void
143 threadmain(int argc, char **argv)
145 char *file, *x, *addr;
146 int fd;
148 rfork(RFNOTEG);
149 x = getenv("verbose9pserve");
150 if(x){
151 verbose = atoi(x);
152 fprint(2, "verbose9pserve %s => %d\n", x, verbose);
154 ARGBEGIN{
155 default:
156 usage();
157 case 'A':
158 attached = 1;
159 xaname = EARGF(usage());
160 xafid = atoi(EARGF(usage()));
161 break;
162 case 'M':
163 versioned = 1;
164 msize = atoi(EARGF(usage()));
165 break;
166 case 'c':
167 addr = netmkaddr(EARGF(usage()), "net", "9fs");
168 if((fd = dial(addr, nil, nil, nil)) < 0)
169 sysfatal("dial %s: %r", addr);
170 dup(fd, 0);
171 dup(fd, 1);
172 if(fd > 1)
173 close(fd);
174 break;
175 case 'n':
176 noauth = 1;
177 break;
178 case 'v':
179 verbose++;
180 break;
181 case 'u':
182 isunix++;
183 break;
184 case 'l':
185 logging++;
186 break;
187 }ARGEND
189 if(attached && !versioned){
190 fprint(2, "-A must be used with -M\n");
191 usage();
194 if(argc != 1)
195 usage();
196 addr = argv[0];
198 fmtinstall('T', timefmt);
200 if((afd = announce(addr, adir)) < 0)
201 sysfatal("announce %s: %r", addr);
202 if(logging){
203 if(strncmp(addr, "unix!", 5) == 0)
204 addr += 5;
205 file = smprint("%s.log", addr);
206 if(file == nil)
207 sysfatal("smprint log: %r");
208 if((fd = create(file, OWRITE, 0666)) < 0)
209 sysfatal("create %s: %r", file);
210 dup(fd, 2);
211 if(fd > 2)
212 close(fd);
214 if(verbose) fprint(2, "%T 9pserve running\n");
215 proccreate(mainproc, nil, STACK);
218 void
219 mainproc(void *v)
221 int n, nn;
222 Fcall f;
223 USED(v);
225 atnotify(ignorepipe, 1);
226 fmtinstall('D', dirfmt);
227 fmtinstall('M', dirmodefmt);
228 fmtinstall('F', fcallfmt);
229 fmtinstall('H', encodefmt);
231 outq = qalloc();
232 inq = qalloc();
234 if(!versioned){
235 f.type = Tversion;
236 f.version = "9P2000";
237 f.msize = msize;
238 f.tag = NOTAG;
239 n = convS2M(&f, vbuf, sizeof vbuf);
240 if(n <= BIT16SZ)
241 sysfatal("convS2M conversion error");
242 if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
243 nn = write(1, vbuf, n);
244 if(n != nn)
245 sysfatal("error writing Tversion: %r\n");
246 n = read9pmsg(0, vbuf, sizeof vbuf);
247 if(n < 0)
248 sysfatal("read9pmsg failure");
249 if(convM2S(vbuf, n, &f) != n)
250 sysfatal("convM2S failure");
251 if(f.msize < msize)
252 msize = f.msize;
253 if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
256 threadcreate(inputthread, nil, STACK);
257 threadcreate(outputthread, nil, STACK);
259 /* if(rootfid) */
260 /* dorootstat(); */
262 threadcreate(listenthread, nil, STACK);
263 threadexits(0);
266 int
267 ignorepipe(void *v, char *s)
269 USED(v);
270 if(strcmp(s, "sys: write on closed pipe") == 0)
271 return 1;
272 if(strcmp(s, "sys: tstp") == 0)
273 return 1;
274 if(strcmp(s, "sys: window size change") == 0)
275 return 1;
276 fprint(2, "9pserve %s: %T note: %s\n", addr, s);
277 return 0;
280 void
281 listenthread(void *arg)
283 Conn *c;
284 Ioproc *io;
286 io = ioproc();
287 USED(arg);
288 threadsetname("listen %s", adir);
289 for(;;){
290 c = emalloc(sizeof(Conn));
291 c->fd = iolisten(io, adir, c->dir);
292 if(c->fd < 0){
293 if(verbose) fprint(2, "%T listen: %r\n");
294 close(afd);
295 free(c);
296 return;
298 c->inc = chancreate(sizeof(void*), 0);
299 c->internal = chancreate(sizeof(void*), 0);
300 c->inq = qalloc();
301 c->outq = qalloc();
302 c->outqdead = chancreate(sizeof(void*), 0);
303 if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
304 threadcreate(connthread, c, STACK);
308 void
309 send9pmsg(Msg *m)
311 int n, nn;
313 n = sizeS2M(&m->rx);
314 m->rpkt = emalloc(n);
315 nn = convS2M(&m->rx, m->rpkt, n);
316 if(nn <= BIT16SZ)
317 sysfatal("convS2M conversion error");
318 if(nn != n)
319 sysfatal("sizeS2M and convS2M disagree");
320 sendq(m->c->outq, m);
323 void
324 sendomsg(Msg *m)
326 int n, nn;
328 n = sizeS2M(&m->tx);
329 m->tpkt = emalloc(n);
330 nn = convS2M(&m->tx, m->tpkt, n);
331 if(nn <= BIT16SZ)
332 sysfatal("convS2M conversion error");
333 if(nn != n)
334 sysfatal("sizeS2M and convS2M disagree");
335 sendq(outq, m);
338 void
339 err(Msg *m, char *ename)
341 m->rx.type = Rerror;
342 m->rx.ename = ename;
343 m->rx.tag = m->tx.tag;
344 send9pmsg(m);
/*
 * Return a copy of s in memory obtained from emalloc.
 */
char*
estrdup(char *s)
{
	char *t;

	t = emalloc(strlen(s)+1);
	strcpy(t, s);
	return t;
}
357 void
358 connthread(void *arg)
360 int i, fd;
361 Conn *c;
362 Hash *h, *hnext;
363 Msg *m, *om, *mm, sync;
364 Fid *f;
365 Ioproc *io;
367 c = arg;
368 threadsetname("conn %s", c->dir);
369 io = ioproc();
370 fd = ioaccept(io, c->fd, c->dir);
371 if(fd < 0){
372 if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
373 goto out;
375 close(c->fd);
376 c->fd = fd;
377 threadcreate(connoutthread, c, STACK);
378 while((m = mread9p(io, c->fd)) != nil){
379 if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
380 m->c = c;
381 m->ctag = m->tx.tag;
382 c->nmsg++;
383 if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
384 if(puthash(c->tag, m->tx.tag, m) < 0){
385 err(m, "duplicate tag");
386 continue;
388 msgincref(m);
389 switch(m->tx.type){
390 case Tversion:
391 m->rx.tag = m->tx.tag;
392 m->rx.msize = m->tx.msize;
393 if(m->rx.msize > msize)
394 m->rx.msize = msize;
395 m->rx.version = "9P2000";
396 m->rx.type = Rversion;
397 send9pmsg(m);
398 continue;
399 case Tflush:
400 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
401 m->rx.tag = m->tx.tag;
402 m->rx.type = Rflush;
403 send9pmsg(m);
404 continue;
406 msgincref(m->oldm);
407 break;
408 case Tattach:
409 m->afid = nil;
410 if(m->tx.afid != NOFID
411 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
412 err(m, "unknown fid");
413 continue;
415 if(m->afid)
416 m->afid->ref++;
417 m->fid = fidnew(m->tx.fid);
418 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
419 err(m, "duplicate fid");
420 continue;
422 m->fid->ref++;
423 if(attached && m->afid==nil){
424 if(m->tx.aname[0] && strcmp(xaname, m->tx.aname) != 0){
425 err(m, "invalid attach name");
426 continue;
428 m->tx.afid = xafid;
429 m->tx.aname = xaname;
430 m->tx.uname = getuser(); /* what srv.c used */
431 repack(&m->tx, &m->tpkt);
433 break;
434 case Twalk:
435 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
436 err(m, "unknown fid");
437 continue;
439 m->fid->ref++;
440 if(m->tx.newfid == m->tx.fid){
441 m->fid->ref++;
442 m->newfid = m->fid;
443 }else{
444 m->newfid = fidnew(m->tx.newfid);
445 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
446 err(m, "duplicate fid");
447 continue;
449 m->newfid->ref++;
451 break;
452 case Tauth:
453 if(attached){
454 err(m, "authentication not required");
455 continue;
457 if(noauth){
458 err(m, "authentication rejected");
459 continue;
461 m->afid = fidnew(m->tx.afid);
462 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
463 err(m, "duplicate fid");
464 continue;
466 m->afid->ref++;
467 break;
468 case Tcreate:
469 if(m->tx.perm&(DMSYMLINK|DMDEVICE|DMNAMEDPIPE|DMSOCKET)){
470 err(m, "unsupported file type");
471 continue;
473 goto caseTopen;
474 case Topenfd:
475 if(m->tx.mode&~(OTRUNC|3)){
476 err(m, "bad openfd mode");
477 continue;
479 m->isopenfd = 1;
480 m->tx.type = Topen;
481 m->tpkt[4] = Topen;
482 /* fall through */
483 caseTopen:
484 case Topen:
485 case Tclunk:
486 case Tread:
487 case Twrite:
488 case Tremove:
489 case Tstat:
490 case Twstat:
491 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
492 err(m, "unknown fid");
493 continue;
495 m->fid->ref++;
496 break;
499 /* have everything - translate and send */
500 m->c = c;
501 m->ctag = m->tx.tag;
502 m->tx.tag = m->tag;
503 if(m->fid)
504 m->tx.fid = m->fid->fid;
505 if(m->newfid)
506 m->tx.newfid = m->newfid->fid;
507 if(m->afid)
508 m->tx.afid = m->afid->fid;
509 if(m->oldm)
510 m->tx.oldtag = m->oldm->tag;
511 /* reference passes to outq */
512 sendq(outq, m);
513 while(c->nmsg >= MAXMSG){
514 c->inputstalled = 1;
515 recvp(c->inc);
519 if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
521 /* flush all outstanding messages */
522 for(i=0; i<NHASH; i++){
523 while((h = c->tag[i]) != nil){
524 om = h->v;
525 msgincref(om); /* for us */
526 m = msgnew(0);
527 m->internal = 1;
528 m->c = c;
529 c->nmsg++;
530 m->tx.type = Tflush;
531 m->tx.tag = m->tag;
532 m->tx.oldtag = om->tag;
533 m->oldm = om;
534 msgincref(om);
535 msgincref(m); /* for outq */
536 sendomsg(m);
537 mm = recvp(c->internal);
538 assert(mm == m);
539 msgput(m); /* got from recvp */
540 msgput(m); /* got from msgnew */
541 if(delhash(c->tag, om->ctag, om) == 0)
542 msgput(om); /* got from hash table */
543 msgput(om); /* got from msgincref */
547 /*
548 * outputthread has written all its messages
549 * to the remote connection (because we've gotten all the replies!),
550 * but it might not have gotten a chance to msgput
551 * the very last one. sync up to make sure.
552 */
553 memset(&sync, 0, sizeof sync);
554 sync.sync = 1;
555 sync.c = c;
556 sendq(outq, &sync);
557 recvp(c->outqdead);
559 /* everything is quiet; can close the local output queue. */
560 sendq(c->outq, nil);
561 recvp(c->outqdead);
563 /* should be no messages left anywhere. */
564 assert(c->nmsg == 0);
566 /* clunk all outstanding fids */
567 for(i=0; i<NHASH; i++){
568 for(h=c->fid[i]; h; h=hnext){
569 f = h->v;
570 m = msgnew(0);
571 m->internal = 1;
572 m->c = c;
573 c->nmsg++;
574 m->tx.type = Tclunk;
575 m->tx.tag = m->tag;
576 m->tx.fid = f->fid;
577 m->fid = f;
578 f->ref++;
579 msgincref(m);
580 sendomsg(m);
581 mm = recvp(c->internal);
582 assert(mm == m);
583 msgclear(m);
584 msgput(m); /* got from recvp */
585 msgput(m); /* got from msgnew */
586 fidput(f); /* got from hash table */
587 hnext = h->next;
588 free(h);
592 out:
593 closeioproc(io);
594 assert(c->nmsg == 0);
595 assert(c->nfid == 0);
596 close(c->fd);
597 chanfree(c->internal);
598 c->internal = 0;
599 chanfree(c->inc);
600 c->inc = 0;
601 free(c->inq);
602 c->inq = 0;
603 free(c);
606 static void
607 openfdthread(void *v)
609 Conn *c;
610 Fid *fid;
611 Msg *m;
612 int n;
613 vlong tot;
614 Ioproc *io;
615 char buf[1024];
617 c = v;
618 fid = c->fdfid;
619 io = ioproc();
620 threadsetname("openfd %s", c->fdfid);
621 tot = 0;
622 m = nil;
623 if(c->fdmode == OREAD){
624 for(;;){
625 if(verbose) fprint(2, "%T tread...");
626 m = msgnew(0);
627 m->internal = 1;
628 m->c = c;
629 m->tx.type = Tread;
630 m->tx.count = msize - IOHDRSZ;
631 m->tx.fid = fid->fid;
632 m->tx.tag = m->tag;
633 m->tx.offset = tot;
634 m->fid = fid;
635 fid->ref++;
636 msgincref(m);
637 sendomsg(m);
638 recvp(c->internal);
639 if(m->rx.type == Rerror){
640 /* fprint(2, "%T read error: %s\n", m->rx.ename); */
641 break;
643 if(m->rx.count == 0)
644 break;
645 tot += m->rx.count;
646 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
647 /* fprint(2, "%T pipe write error: %r\n"); */
648 break;
650 msgput(m);
651 msgput(m);
652 m = nil;
654 }else{
655 for(;;){
656 if(verbose) fprint(2, "%T twrite...");
657 n = sizeof buf;
658 if(n > msize)
659 n = msize;
660 if((n=ioread(io, c->fd, buf, n)) <= 0){
661 if(n < 0)
662 fprint(2, "%T pipe read error: %r\n");
663 break;
665 m = msgnew(0);
666 m->internal = 1;
667 m->c = c;
668 m->tx.type = Twrite;
669 m->tx.fid = fid->fid;
670 m->tx.data = buf;
671 m->tx.count = n;
672 m->tx.tag = m->tag;
673 m->tx.offset = tot;
674 m->fid = fid;
675 fid->ref++;
676 msgincref(m);
677 sendomsg(m);
678 recvp(c->internal);
679 if(m->rx.type == Rerror){
680 /* fprint(2, "%T write error: %s\n", m->rx.ename); */
682 tot += n;
683 msgput(m);
684 msgput(m);
685 m = nil;
688 if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
689 close(c->fd);
690 closeioproc(io);
691 if(m){
692 msgput(m);
693 msgput(m);
695 if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
696 if(--fid->openfd == 0){
697 m = msgnew(0);
698 m->internal = 1;
699 m->c = c;
700 m->tx.type = Tclunk;
701 m->tx.tag = m->tag;
702 m->tx.fid = fid->fid;
703 m->fid = fid;
704 fid->ref++;
705 msgincref(m);
706 sendomsg(m);
707 recvp(c->internal);
708 msgput(m);
709 msgput(m);
711 fidput(fid);
712 c->fdfid = nil;
713 chanfree(c->internal);
714 c->internal = 0;
715 free(c);
718 int
719 xopenfd(Msg *m)
721 char errs[ERRMAX];
722 int n, p[2];
723 Conn *nc;
725 if(pipe(p) < 0){
726 rerrstr(errs, sizeof errs);
727 err(m, errs);
728 /* XXX return here? */
730 if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
732 /* now we're committed. */
734 /* a new connection for this fid */
735 nc = emalloc(sizeof(Conn));
736 nc->internal = chancreate(sizeof(void*), 0);
738 /* a ref for us */
739 nc->fdfid = m->fid;
740 m->fid->ref++;
741 nc->fdfid->openfd++;
742 nc->fdmode = m->tx.mode;
743 nc->fd = p[0];
745 /* a thread to tend the pipe */
746 threadcreate(openfdthread, nc, STACK);
748 /* if mode is ORDWR, that openfdthread will write; start a reader */
749 if((m->tx.mode&3) == ORDWR){
750 nc = emalloc(sizeof(Conn));
751 nc->internal = chancreate(sizeof(void*), 0);
752 nc->fdfid = m->fid;
753 m->fid->ref++;
754 nc->fdfid->openfd++;
755 nc->fdmode = OREAD;
756 nc->fd = dup(p[0], -1);
757 threadcreate(openfdthread, nc, STACK);
760 /* steal fid from other connection */
761 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
762 fidput(m->fid);
764 /* rewrite as Ropenfd */
765 m->rx.type = Ropenfd;
766 n = GBIT32(m->rpkt);
767 m->rpkt = erealloc(m->rpkt, n+4);
768 PBIT32(m->rpkt+n, p[1]);
769 n += 4;
770 PBIT32(m->rpkt, n);
771 m->rpkt[4] = Ropenfd;
772 m->rx.unixfd = p[1];
773 return 0;
776 void
777 connoutthread(void *arg)
779 int err;
780 Conn *c;
781 Msg *m, *om;
782 Ioproc *io;
784 c = arg;
785 io = ioproc();
786 threadsetname("connout %s", c->dir);
787 while((m = recvq(c->outq)) != nil){
788 err = m->tx.type+1 != m->rx.type;
789 if(!err && m->isopenfd)
790 if(xopenfd(m) < 0)
791 continue;
792 switch(m->tx.type){
793 case Tflush:
794 om = m->oldm;
795 if(om)
796 if(delhash(om->c->tag, om->ctag, om) == 0)
797 msgput(om);
798 break;
799 case Tclunk:
800 case Tremove:
801 if(m->fid)
802 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
803 fidput(m->fid);
804 break;
805 case Tauth:
806 if(err && m->afid){
807 if(verbose) fprint(2, "%T auth error\n");
808 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
809 fidput(m->afid);
811 break;
812 case Tattach:
813 if(err && m->fid)
814 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
815 fidput(m->fid);
816 break;
817 case Twalk:
818 if(err || m->rx.nwqid < m->tx.nwname)
819 if(m->tx.fid != m->tx.newfid && m->newfid)
820 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
821 fidput(m->newfid);
822 break;
823 case Tread:
824 break;
825 case Tstat:
826 break;
827 case Topen:
828 case Tcreate:
829 m->fid->isdir = (m->rx.qid.type & QTDIR);
830 break;
832 if(delhash(m->c->tag, m->ctag, m) == 0)
833 msgput(m);
834 if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
835 rewritehdr(&m->rx, m->rpkt);
836 if(mwrite9p(io, c->fd, m->rpkt) < 0)
837 if(verbose) fprint(2, "%T write error: %r\n");
838 msgput(m);
839 if(c->inputstalled && c->nmsg < MAXMSG)
840 nbsendp(c->inc, 0);
842 closeioproc(io);
843 free(c->outq);
844 c->outq = nil;
845 sendp(c->outqdead, nil);
848 void
849 outputthread(void *arg)
851 Msg *m;
852 Ioproc *io;
854 USED(arg);
855 io = ioproc();
856 threadsetname("output");
857 while((m = recvq(outq)) != nil){
858 if(m->sync){
859 sendp(m->c->outqdead, nil);
860 continue;
862 if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
863 rewritehdr(&m->tx, m->tpkt);
864 if(mwrite9p(io, 1, m->tpkt) < 0)
865 sysfatal("output error: %r");
866 msgput(m);
868 closeioproc(io);
869 fprint(2, "%T output eof\n");
870 threadexitsall(0);
873 void
874 inputthread(void *arg)
876 uchar *pkt;
877 int n, nn, tag;
878 Msg *m;
879 Ioproc *io;
881 threadsetname("input");
882 if(verbose) fprint(2, "%T input thread\n");
883 io = ioproc();
884 USED(arg);
885 while((pkt = read9ppkt(io, 0)) != nil){
886 n = GBIT32(pkt);
887 if(n < 7){
888 fprint(2, "%T short 9P packet from server\n");
889 free(pkt);
890 continue;
892 if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
893 tag = GBIT16(pkt+5);
894 if((m = msgget(tag)) == nil){
895 fprint(2, "%T unexpected 9P response tag %d\n", tag);
896 free(pkt);
897 continue;
899 if((nn = convM2S(pkt, n, &m->rx)) != n){
900 fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
901 free(pkt);
902 msgput(m);
903 continue;
905 if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
906 m->internal ? " (internal)" : "");
907 m->rpkt = pkt;
908 m->rx.tag = m->ctag;
909 if(m->internal)
910 sendp(m->c->internal, m);
911 else if(m->c->outq)
912 sendq(m->c->outq, m);
913 else
914 msgput(m);
916 closeioproc(io);
917 /*fprint(2, "%T input eof\n"); */
918 threadexitsall(0);
921 void*
922 gethash(Hash **ht, uint n)
924 Hash *h;
926 for(h=ht[n%NHASH]; h; h=h->next)
927 if(h->n == n)
928 return h->v;
929 return nil;
932 int
933 delhash(Hash **ht, uint n, void *v)
935 Hash *h, **l;
937 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
938 if(h->n == n){
939 if(h->v != v){
940 if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
941 return -1;
943 *l = h->next;
944 free(h);
945 return 0;
947 return -1;
950 int
951 puthash(Hash **ht, uint n, void *v)
953 Hash *h;
955 if(gethash(ht, n))
956 return -1;
957 h = emalloc(sizeof(Hash));
958 h->next = ht[n%NHASH];
959 h->n = n;
960 h->v = v;
961 ht[n%NHASH] = h;
962 return 0;
965 Fid **fidtab;
966 int nfidtab;
967 Fid *freefid;
969 Fid*
970 fidnew(int cfid)
972 Fid *f;
974 if(freefid == nil){
975 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
976 if(nfidtab == xafid){
977 fidtab[nfidtab++] = nil;
978 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
980 fidtab[nfidtab] = emalloc(sizeof(Fid));
981 freefid = fidtab[nfidtab];
982 freefid->fid = nfidtab++;
984 f = freefid;
985 freefid = f->next;
986 f->cfid = cfid;
987 f->ref = 1;
988 f->offset = 0;
989 f->coffset = 0;
990 f->isdir = -1;
991 return f;
994 void
995 fidput(Fid *f)
997 if(f == nil)
998 return;
999 assert(f->ref > 0);
1000 if(--f->ref > 0)
1001 return;
1002 f->next = freefid;
1003 f->cfid = -1;
1004 freefid = f;
1007 Msg **msgtab;
1008 int nmsgtab;
1009 int nmsg;
1010 Msg *freemsg;
1012 void
1013 msgincref(Msg *m)
1015 if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
1016 getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
1017 m->ref++;
1020 Msg*
1021 msgnew(int x)
1023 Msg *m;
1025 if(freemsg == nil){
1026 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
1027 msgtab[nmsgtab] = emalloc(sizeof(Msg));
1028 freemsg = msgtab[nmsgtab];
1029 freemsg->tag = nmsgtab++;
1031 m = freemsg;
1032 freemsg = m->next;
1033 m->ref = 1;
1034 if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
1035 getcallerpc(&x), m, m->tag, m->ref);
1036 nmsg++;
1037 return m;
1041 * Clear data associated with connections, so that
1042 * if all msgs have been msgcleared, the connection
1043 * can be freed. Note that this does *not* free the tpkt
1044 * and rpkt; they are freed in msgput with the msg itself.
1045 * The io write thread might still be holding a ref to msg
1046 * even once the connection has finished with it.
1048 void
1049 msgclear(Msg *m)
1051 if(m->c){
1052 m->c->nmsg--;
1053 m->c = nil;
1055 if(m->oldm){
1056 msgput(m->oldm);
1057 m->oldm = nil;
1059 if(m->fid){
1060 fidput(m->fid);
1061 m->fid = nil;
1063 if(m->afid){
1064 fidput(m->afid);
1065 m->afid = nil;
1067 if(m->newfid){
1068 fidput(m->newfid);
1069 m->newfid = nil;
1071 if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
1072 close(m->rx.unixfd);
1073 m->rx.unixfd = -1;
1077 void
1078 msgput(Msg *m)
1080 if(m == nil)
1081 return;
1083 if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
1084 getcallerpc(&m), m, m->tag, m->ctag, m->ref);
1085 assert(m->ref > 0);
1086 if(--m->ref > 0)
1087 return;
1088 nmsg--;
1089 msgclear(m);
1090 if(m->tpkt){
1091 free(m->tpkt);
1092 m->tpkt = nil;
1094 if(m->rpkt){
1095 free(m->rpkt);
1096 m->rpkt = nil;
1098 m->isopenfd = 0;
1099 m->internal = 0;
1100 m->next = freemsg;
1101 freemsg = m;
1104 Msg*
1105 msgget(int n)
1107 Msg *m;
1109 if(n < 0 || n >= nmsgtab)
1110 return nil;
1111 m = msgtab[n];
1112 if(m->ref == 0)
1113 return nil;
1114 if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
1115 msgincref(m);
1116 return m;
1120 void*
1121 emalloc(int n)
1123 void *v;
1125 v = mallocz(n, 1);
1126 if(v == nil){
1127 abort();
1128 sysfatal("out of memory allocating %d", n);
1130 return v;
1133 void*
1134 erealloc(void *v, int n)
1136 v = realloc(v, n);
1137 if(v == nil){
1138 abort();
1139 sysfatal("out of memory reallocating %d", n);
1141 return v;
/* One element of a Queue: a link and the queued pointer. */
typedef struct Qel Qel;
struct Qel
{
	Qel *next;
	void *p;
};
1151 struct Queue
1153 QLock lk;
1154 Rendez r;
1155 Qel *head;
1156 Qel *tail;
1159 Queue*
1160 qalloc(void)
1162 Queue *q;
1164 q = mallocz(sizeof(Queue), 1);
1165 if(q == nil)
1166 return nil;
1167 q->r.l = &q->lk;
1168 return q;
1171 int
1172 sendq(Queue *q, void *p)
1174 Qel *e;
1176 e = emalloc(sizeof(Qel));
1177 qlock(&q->lk);
1178 e->p = p;
1179 e->next = nil;
1180 if(q->head == nil)
1181 q->head = e;
1182 else
1183 q->tail->next = e;
1184 q->tail = e;
1185 rwakeup(&q->r);
1186 qunlock(&q->lk);
1187 return 0;
1190 void*
1191 recvq(Queue *q)
1193 void *p;
1194 Qel *e;
1196 qlock(&q->lk);
1197 while(q->head == nil)
1198 rsleep(&q->r);
1199 e = q->head;
1200 q->head = e->next;
1201 qunlock(&q->lk);
1202 p = e->p;
1203 free(e);
1204 return p;
1207 uchar*
1208 read9ppkt(Ioproc *io, int fd)
1210 uchar buf[4], *pkt;
1211 int n, nn;
1213 n = ioreadn(io, fd, buf, 4);
1214 if(n != 4)
1215 return nil;
1216 n = GBIT32(buf);
1217 if(n > MAXMSGSIZE)
1218 return nil;
1219 pkt = emalloc(n);
1220 PBIT32(pkt, n);
1221 nn = ioreadn(io, fd, pkt+4, n-4);
1222 if(nn != n-4){
1223 free(pkt);
1224 return nil;
1226 /* would do this if we ever got one of these, but we only generate them
1227 if(pkt[4] == Ropenfd){
1228 newfd = iorecvfd(io, fd);
1229 PBIT32(pkt+n-4, newfd);
1232 return pkt;
1235 Msg*
1236 mread9p(Ioproc *io, int fd)
1238 int n, nn;
1239 uchar *pkt;
1240 Msg *m;
1242 if((pkt = read9ppkt(io, fd)) == nil)
1243 return nil;
1245 m = msgnew(0);
1246 m->tpkt = pkt;
1247 n = GBIT32(pkt);
1248 nn = convM2S(pkt, n, &m->tx);
1249 if(nn != n){
1250 fprint(2, "%T read bad packet from %d\n", fd);
1251 free(m->tpkt);
1252 free(m);
1253 return nil;
1255 return m;
1258 int
1259 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1261 int n, nfd;
1263 n = GBIT32(pkt);
1264 if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
1265 if(verbose > 1) fprint(2, "%T before iowrite\n");
1266 if(iowrite(io, fd, pkt, n) != n){
1267 fprint(2, "%T write error: %r\n");
1268 return -1;
1270 if(verbose > 1) fprint(2, "%T after iowrite\n");
1271 if(pkt[4] == Ropenfd){
1272 nfd = GBIT32(pkt+n-4);
1273 if(iosendfd(io, fd, nfd) < 0){
1274 fprint(2, "%T send fd error: %r\n");
1275 return -1;
1278 return 0;
1281 void
1282 restring(uchar *pkt, int pn, char *s)
1284 int n;
1286 if(s < (char*)pkt || s >= (char*)pkt+pn)
1287 return;
1289 n = strlen(s);
1290 memmove(s+1, s, n);
1291 PBIT16((uchar*)s-1, n);
1294 void
1295 repack(Fcall *f, uchar **ppkt)
1297 uint n, nn;
1298 uchar *pkt;
1300 pkt = *ppkt;
1301 n = GBIT32(pkt);
1302 nn = sizeS2M(f);
1303 if(nn > n){
1304 free(pkt);
1305 pkt = emalloc(nn);
1306 *ppkt = pkt;
1308 n = convS2M(f, pkt, nn);
1309 if(n <= BIT16SZ)
1310 sysfatal("convS2M conversion error");
1311 if(n != nn)
1312 sysfatal("convS2M and sizeS2M disagree");
1315 void
1316 rewritehdr(Fcall *f, uchar *pkt)
1318 int i, n;
1320 n = GBIT32(pkt);
1321 PBIT16(pkt+5, f->tag);
1322 switch(f->type){
1323 case Tversion:
1324 case Rversion:
1325 restring(pkt, n, f->version);
1326 break;
1327 case Tauth:
1328 PBIT32(pkt+7, f->afid);
1329 restring(pkt, n, f->uname);
1330 restring(pkt, n, f->aname);
1331 break;
1332 case Tflush:
1333 PBIT16(pkt+7, f->oldtag);
1334 break;
1335 case Tattach:
1336 restring(pkt, n, f->uname);
1337 restring(pkt, n, f->aname);
1338 PBIT32(pkt+7, f->fid);
1339 PBIT32(pkt+11, f->afid);
1340 break;
1341 case Twalk:
1342 PBIT32(pkt+7, f->fid);
1343 PBIT32(pkt+11, f->newfid);
1344 for(i=0; i<f->nwname; i++)
1345 restring(pkt, n, f->wname[i]);
1346 break;
1347 case Tcreate:
1348 restring(pkt, n, f->name);
1349 /* fall through */
1350 case Topen:
1351 case Tclunk:
1352 case Tremove:
1353 case Tstat:
1354 case Twstat:
1355 case Twrite:
1356 PBIT32(pkt+7, f->fid);
1357 break;
1358 case Tread:
1359 PBIT32(pkt+7, f->fid);
1360 PBIT64(pkt+11, f->offset);
1361 break;
1362 case Rerror:
1363 restring(pkt, n, f->ename);
1364 break;
/*
 * iocall trampoline: unpack two char* arguments and call listen.
 */
static long
_iolisten(va_list *arg)
{
	char *a, *b;

	a = va_arg(*arg, char*);
	b = va_arg(*arg, char*);
	return listen(a, b);
}
1378 int
1379 iolisten(Ioproc *io, char *a, char *b)
1381 return iocall(io, _iolisten, a, b);
/*
 * iocall trampoline: unpack (int fd, char *dir) and call accept.
 */
static long
_ioaccept(va_list *arg)
{
	int fd;
	char *dir;

	fd = va_arg(*arg, int);
	dir = va_arg(*arg, char*);
	return accept(fd, dir);
}
1395 int
1396 ioaccept(Ioproc *io, int fd, char *dir)
1398 return iocall(io, _ioaccept, fd, dir);
1401 int
1402 timefmt(Fmt *fmt)
1404 static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1405 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1406 vlong ns;
1407 Tm tm;
1408 ns = nsec();
1409 tm = *localtime(time(0));
1410 return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
1411 mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
1412 (int)(ns%1000000000)/1000000);