Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <errno.h>
7 #define err err9pserve /* Darwin x86 */
9 enum
10 {
11 STACK = 32768,
12 NHASH = 31,
13 MAXMSG = 64, /* per connection */
14 MAXMSGSIZE = 4*1024*1024
15 };
17 typedef struct Hash Hash;
18 typedef struct Fid Fid;
19 typedef struct Msg Msg;
20 typedef struct Conn Conn;
21 typedef struct Queue Queue;
23 struct Hash
24 {
25 Hash *next;
26 uint n;
27 void *v;
28 };
30 struct Fid
31 {
32 int fid;
33 int ref;
34 int cfid;
35 int openfd;
36 int offset;
37 int coffset;
38 int isdir;
39 Fid *next;
40 };
42 struct Msg
43 {
44 Conn *c;
45 int internal;
46 int sync;
47 int ref;
48 int ctag;
49 int tag;
50 int isopenfd;
51 Fcall tx;
52 Fcall rx;
53 Fid *fid;
54 Fid *newfid;
55 Fid *afid;
56 Msg *oldm;
57 Msg *next;
58 uchar *tpkt;
59 uchar *rpkt;
60 };
62 struct Conn
63 {
64 int fd;
65 int fdmode;
66 Fid *fdfid;
67 int nmsg;
68 int nfid;
69 Channel *inc;
70 Channel *internal;
71 int inputstalled;
72 char dir[40];
73 Hash *tag[NHASH];
74 Hash *fid[NHASH];
75 Queue *outq;
76 Queue *inq;
77 Channel *outqdead;
78 };
80 char *xaname;
81 char *addr;
82 int afd;
83 char adir[40];
84 int isunix;
85 Queue *outq;
86 Queue *inq;
87 int verbose = 0;
88 int logging = 0;
89 int msize = 8192;
90 u32int xafid = NOFID;
91 int attached;
92 int versioned;
93 int noauth;
95 void *gethash(Hash**, uint);
96 int puthash(Hash**, uint, void*);
97 int delhash(Hash**, uint, void*);
98 Msg *mread9p(Ioproc*, int);
99 int mwrite9p(Ioproc*, int, uchar*);
100 uchar *read9ppkt(Ioproc*, int);
101 int write9ppkt(int, uchar*);
102 Msg *msgnew(int);
103 void msgput(Msg*);
104 void msgclear(Msg*);
105 Msg *msgget(int);
106 void msgincref(Msg*);
107 Fid *fidnew(int);
108 void fidput(Fid*);
109 void *emalloc(int);
110 void *erealloc(void*, int);
111 Queue *qalloc(void);
112 int sendq(Queue*, void*);
113 void *recvq(Queue*);
114 void connthread(void*);
115 void connoutthread(void*);
116 void listenthread(void*);
117 void outputthread(void*);
118 void inputthread(void*);
119 void rewritehdr(Fcall*, uchar*);
120 void repack(Fcall*, uchar**);
121 int tlisten(char*, char*);
122 int taccept(int, char*);
123 int iolisten(Ioproc*, char*, char*);
124 int ioaccept(Ioproc*, int, char*);
125 int iorecvfd(Ioproc*, int);
126 int iosendfd(Ioproc*, int, int);
127 void mainproc(void*);
128 int ignorepipe(void*, char*);
129 int timefmt(Fmt*);
130 void dorootstat(void);
132 void
133 usage(void)
135 fprint(2, "usage: 9pserve [-lnv] [-A aname afid] [-c addr] [-M msize] address\n");
136 fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
137 threadexitsall("usage");
140 int
141 threadmaybackground(void)
143 return 1;
146 uchar vbuf[128];
147 extern int _threaddebuglevel;
148 void
149 threadmain(int argc, char **argv)
151 char *file, *x, *addr;
152 int fd;
154 rfork(RFNOTEG);
155 x = getenv("verbose9pserve");
156 if(x){
157 verbose = atoi(x);
158 fprint(2, "verbose9pserve %s => %d\n", x, verbose);
160 ARGBEGIN{
161 default:
162 usage();
163 case 'A':
164 attached = 1;
165 xaname = EARGF(usage());
166 xafid = atoi(EARGF(usage()));
167 break;
168 case 'M':
169 versioned = 1;
170 msize = atoi(EARGF(usage()));
171 break;
172 case 'c':
173 addr = netmkaddr(EARGF(usage()), "net", "9fs");
174 if((fd = dial(addr, nil, nil, nil)) < 0)
175 sysfatal("dial %s: %r", addr);
176 dup(fd, 0);
177 dup(fd, 1);
178 if(fd > 1)
179 close(fd);
180 break;
181 case 'n':
182 noauth = 1;
183 break;
184 case 'v':
185 verbose++;
186 break;
187 case 'u':
188 isunix++;
189 break;
190 case 'l':
191 logging++;
192 break;
193 }ARGEND
195 if(attached && !versioned){
196 fprint(2, "-A must be used with -M\n");
197 usage();
200 if(argc != 1)
201 usage();
202 addr = argv[0];
204 fmtinstall('T', timefmt);
206 if((afd = announce(addr, adir)) < 0)
207 sysfatal("announce %s: %r", addr);
208 if(logging){
209 if(strncmp(addr, "unix!", 5) == 0)
210 addr += 5;
211 file = smprint("%s.log", addr);
212 if(file == nil)
213 sysfatal("smprint log: %r");
214 if((fd = create(file, OWRITE, 0666)) < 0)
215 sysfatal("create %s: %r", file);
216 dup(fd, 2);
217 if(fd > 2)
218 close(fd);
220 if(verbose) fprint(2, "%T 9pserve running\n");
221 proccreate(mainproc, nil, STACK);
224 void
225 mainproc(void *v)
227 int n, nn;
228 Fcall f;
229 USED(v);
231 atnotify(ignorepipe, 1);
232 fmtinstall('D', dirfmt);
233 fmtinstall('M', dirmodefmt);
234 fmtinstall('F', fcallfmt);
235 fmtinstall('H', encodefmt);
237 outq = qalloc();
238 inq = qalloc();
240 if(!versioned){
241 f.type = Tversion;
242 f.version = "9P2000";
243 f.msize = msize;
244 f.tag = NOTAG;
245 n = convS2M(&f, vbuf, sizeof vbuf);
246 if(n <= BIT16SZ)
247 sysfatal("convS2M conversion error");
248 if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
249 nn = write(1, vbuf, n);
250 if(n != nn)
251 sysfatal("error writing Tversion: %r\n");
252 n = read9pmsg(0, vbuf, sizeof vbuf);
253 if(n < 0)
254 sysfatal("read9pmsg failure");
255 if(convM2S(vbuf, n, &f) != n)
256 sysfatal("convM2S failure");
257 if(f.msize < msize)
258 msize = f.msize;
259 if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
262 threadcreate(inputthread, nil, STACK);
263 threadcreate(outputthread, nil, STACK);
265 /* if(rootfid) */
266 /* dorootstat(); */
268 threadcreate(listenthread, nil, STACK);
269 threadexits(0);
272 int
273 ignorepipe(void *v, char *s)
275 USED(v);
276 if(strcmp(s, "sys: write on closed pipe") == 0)
277 return 1;
278 if(strcmp(s, "sys: tstp") == 0)
279 return 1;
280 if(strcmp(s, "sys: window size change") == 0)
281 return 1;
282 fprint(2, "9pserve %s: %T note: %s\n", addr, s);
283 return 0;
286 void
287 listenthread(void *arg)
289 Conn *c;
290 Ioproc *io;
292 io = ioproc();
293 USED(arg);
294 threadsetname("listen %s", adir);
295 for(;;){
296 c = emalloc(sizeof(Conn));
297 c->fd = iolisten(io, adir, c->dir);
298 if(c->fd < 0){
299 if(verbose) fprint(2, "%T listen: %r\n");
300 close(afd);
301 free(c);
302 return;
304 c->inc = chancreate(sizeof(void*), 0);
305 c->internal = chancreate(sizeof(void*), 0);
306 c->inq = qalloc();
307 c->outq = qalloc();
308 c->outqdead = chancreate(sizeof(void*), 0);
309 if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
310 threadcreate(connthread, c, STACK);
314 void
315 send9pmsg(Msg *m)
317 int n, nn;
319 n = sizeS2M(&m->rx);
320 m->rpkt = emalloc(n);
321 nn = convS2M(&m->rx, m->rpkt, n);
322 if(nn <= BIT16SZ)
323 sysfatal("convS2M conversion error");
324 if(nn != n)
325 sysfatal("sizeS2M and convS2M disagree");
326 sendq(m->c->outq, m);
329 void
330 sendomsg(Msg *m)
332 int n, nn;
334 n = sizeS2M(&m->tx);
335 m->tpkt = emalloc(n);
336 nn = convS2M(&m->tx, m->tpkt, n);
337 if(nn <= BIT16SZ)
338 sysfatal("convS2M conversion error");
339 if(nn != n)
340 sysfatal("sizeS2M and convS2M disagree");
341 sendq(outq, m);
344 void
345 err(Msg *m, char *ename)
347 m->rx.type = Rerror;
348 m->rx.ename = ename;
349 m->rx.tag = m->tx.tag;
350 send9pmsg(m);
353 char*
354 estrdup(char *s)
356 char *t;
358 t = emalloc(strlen(s)+1);
359 strcpy(t, s);
360 return t;
363 void
364 connthread(void *arg)
366 int i, fd;
367 Conn *c;
368 Hash *h, *hnext;
369 Msg *m, *om, *mm, sync;
370 Fid *f;
371 Ioproc *io;
373 c = arg;
374 threadsetname("conn %s", c->dir);
375 io = ioproc();
376 fd = ioaccept(io, c->fd, c->dir);
377 if(fd < 0){
378 if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
379 goto out;
381 close(c->fd);
382 c->fd = fd;
383 threadcreate(connoutthread, c, STACK);
384 while((m = mread9p(io, c->fd)) != nil){
385 if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
386 m->c = c;
387 m->ctag = m->tx.tag;
388 c->nmsg++;
389 if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
390 if(puthash(c->tag, m->tx.tag, m) < 0){
391 err(m, "duplicate tag");
392 continue;
394 msgincref(m);
395 switch(m->tx.type){
396 case Tversion:
397 m->rx.tag = m->tx.tag;
398 m->rx.msize = m->tx.msize;
399 if(m->rx.msize > msize)
400 m->rx.msize = msize;
401 m->rx.version = "9P2000";
402 m->rx.type = Rversion;
403 send9pmsg(m);
404 continue;
405 case Tflush:
406 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
407 m->rx.tag = m->tx.tag;
408 m->rx.type = Rflush;
409 send9pmsg(m);
410 continue;
412 msgincref(m->oldm);
413 break;
414 case Tattach:
415 m->afid = nil;
416 if(m->tx.afid != NOFID
417 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
418 err(m, "unknown fid");
419 continue;
421 if(m->afid)
422 m->afid->ref++;
423 m->fid = fidnew(m->tx.fid);
424 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
425 err(m, "duplicate fid");
426 continue;
428 m->fid->ref++;
429 if(attached && m->afid==nil){
430 if(m->tx.aname[0] && strcmp(xaname, m->tx.aname) != 0){
431 err(m, "invalid attach name");
432 continue;
434 m->tx.afid = xafid;
435 m->tx.aname = xaname;
436 m->tx.uname = getuser(); /* what srv.c used */
437 repack(&m->tx, &m->tpkt);
439 break;
440 case Twalk:
441 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
442 err(m, "unknown fid");
443 continue;
445 m->fid->ref++;
446 if(m->tx.newfid == m->tx.fid){
447 m->fid->ref++;
448 m->newfid = m->fid;
449 }else{
450 m->newfid = fidnew(m->tx.newfid);
451 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
452 err(m, "duplicate fid");
453 continue;
455 m->newfid->ref++;
457 break;
458 case Tauth:
459 if(attached){
460 err(m, "authentication not required");
461 continue;
463 if(noauth){
464 err(m, "authentication rejected");
465 continue;
467 m->afid = fidnew(m->tx.afid);
468 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
469 err(m, "duplicate fid");
470 continue;
472 m->afid->ref++;
473 break;
474 case Tcreate:
475 if(m->tx.perm&(DMSYMLINK|DMDEVICE|DMNAMEDPIPE|DMSOCKET)){
476 err(m, "unsupported file type");
477 continue;
479 goto caseTopen;
480 case Topenfd:
481 if(m->tx.mode&~(OTRUNC|3)){
482 err(m, "bad openfd mode");
483 continue;
485 m->isopenfd = 1;
486 m->tx.type = Topen;
487 m->tpkt[4] = Topen;
488 /* fall through */
489 caseTopen:
490 case Topen:
491 case Tclunk:
492 case Tread:
493 case Twrite:
494 case Tremove:
495 case Tstat:
496 case Twstat:
497 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
498 err(m, "unknown fid");
499 continue;
501 m->fid->ref++;
502 break;
505 /* have everything - translate and send */
506 m->c = c;
507 m->ctag = m->tx.tag;
508 m->tx.tag = m->tag;
509 if(m->fid)
510 m->tx.fid = m->fid->fid;
511 if(m->newfid)
512 m->tx.newfid = m->newfid->fid;
513 if(m->afid)
514 m->tx.afid = m->afid->fid;
515 if(m->oldm)
516 m->tx.oldtag = m->oldm->tag;
517 /* reference passes to outq */
518 sendq(outq, m);
519 while(c->nmsg >= MAXMSG){
520 c->inputstalled = 1;
521 recvp(c->inc);
525 if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
527 /* flush all outstanding messages */
528 for(i=0; i<NHASH; i++){
529 while((h = c->tag[i]) != nil){
530 om = h->v;
531 msgincref(om); /* for us */
532 m = msgnew(0);
533 m->internal = 1;
534 m->c = c;
535 c->nmsg++;
536 m->tx.type = Tflush;
537 m->tx.tag = m->tag;
538 m->tx.oldtag = om->tag;
539 m->oldm = om;
540 msgincref(om);
541 msgincref(m); /* for outq */
542 sendomsg(m);
543 mm = recvp(c->internal);
544 assert(mm == m);
545 msgput(m); /* got from recvp */
546 msgput(m); /* got from msgnew */
547 if(delhash(c->tag, om->ctag, om) == 0)
548 msgput(om); /* got from hash table */
549 msgput(om); /* got from msgincref */
553 /*
554 * outputthread has written all its messages
555 * to the remote connection (because we've gotten all the replies!),
556 * but it might not have gotten a chance to msgput
557 * the very last one. sync up to make sure.
558 */
559 memset(&sync, 0, sizeof sync);
560 sync.sync = 1;
561 sync.c = c;
562 sendq(outq, &sync);
563 recvp(c->outqdead);
565 /* everything is quiet; can close the local output queue. */
566 sendq(c->outq, nil);
567 recvp(c->outqdead);
569 /* should be no messages left anywhere. */
570 assert(c->nmsg == 0);
572 /* clunk all outstanding fids */
573 for(i=0; i<NHASH; i++){
574 for(h=c->fid[i]; h; h=hnext){
575 f = h->v;
576 m = msgnew(0);
577 m->internal = 1;
578 m->c = c;
579 c->nmsg++;
580 m->tx.type = Tclunk;
581 m->tx.tag = m->tag;
582 m->tx.fid = f->fid;
583 m->fid = f;
584 f->ref++;
585 msgincref(m);
586 sendomsg(m);
587 mm = recvp(c->internal);
588 assert(mm == m);
589 msgclear(m);
590 msgput(m); /* got from recvp */
591 msgput(m); /* got from msgnew */
592 fidput(f); /* got from hash table */
593 hnext = h->next;
594 free(h);
598 out:
599 closeioproc(io);
600 assert(c->nmsg == 0);
601 assert(c->nfid == 0);
602 close(c->fd);
603 chanfree(c->internal);
604 c->internal = 0;
605 chanfree(c->inc);
606 c->inc = 0;
607 free(c->inq);
608 c->inq = 0;
609 free(c);
612 static void
613 openfdthread(void *v)
615 Conn *c;
616 Fid *fid;
617 Msg *m;
618 int n;
619 vlong tot;
620 Ioproc *io;
621 char buf[1024];
623 c = v;
624 fid = c->fdfid;
625 io = ioproc();
626 threadsetname("openfd %s", c->fdfid);
627 tot = 0;
628 m = nil;
629 if(c->fdmode == OREAD){
630 for(;;){
631 if(verbose) fprint(2, "%T tread...");
632 m = msgnew(0);
633 m->internal = 1;
634 m->c = c;
635 m->tx.type = Tread;
636 m->tx.count = msize - IOHDRSZ;
637 m->tx.fid = fid->fid;
638 m->tx.tag = m->tag;
639 m->tx.offset = tot;
640 m->fid = fid;
641 fid->ref++;
642 msgincref(m);
643 sendomsg(m);
644 recvp(c->internal);
645 if(m->rx.type == Rerror){
646 /* fprint(2, "%T read error: %s\n", m->rx.ename); */
647 break;
649 if(m->rx.count == 0)
650 break;
651 tot += m->rx.count;
652 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
653 /* fprint(2, "%T pipe write error: %r\n"); */
654 break;
656 msgput(m);
657 msgput(m);
658 m = nil;
660 }else{
661 for(;;){
662 if(verbose) fprint(2, "%T twrite...");
663 n = sizeof buf;
664 if(n > msize)
665 n = msize;
666 if((n=ioread(io, c->fd, buf, n)) <= 0){
667 if(n < 0)
668 fprint(2, "%T pipe read error: %r\n");
669 break;
671 m = msgnew(0);
672 m->internal = 1;
673 m->c = c;
674 m->tx.type = Twrite;
675 m->tx.fid = fid->fid;
676 m->tx.data = buf;
677 m->tx.count = n;
678 m->tx.tag = m->tag;
679 m->tx.offset = tot;
680 m->fid = fid;
681 fid->ref++;
682 msgincref(m);
683 sendomsg(m);
684 recvp(c->internal);
685 if(m->rx.type == Rerror){
686 /* fprint(2, "%T write error: %s\n", m->rx.ename); */
688 tot += n;
689 msgput(m);
690 msgput(m);
691 m = nil;
694 if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
695 close(c->fd);
696 closeioproc(io);
697 if(m){
698 msgput(m);
699 msgput(m);
701 if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
702 if(--fid->openfd == 0){
703 m = msgnew(0);
704 m->internal = 1;
705 m->c = c;
706 m->tx.type = Tclunk;
707 m->tx.tag = m->tag;
708 m->tx.fid = fid->fid;
709 m->fid = fid;
710 fid->ref++;
711 msgincref(m);
712 sendomsg(m);
713 recvp(c->internal);
714 msgput(m);
715 msgput(m);
717 fidput(fid);
718 c->fdfid = nil;
719 chanfree(c->internal);
720 c->internal = 0;
721 free(c);
724 int
725 xopenfd(Msg *m)
727 char errs[ERRMAX];
728 int n, p[2];
729 Conn *nc;
731 if(pipe(p) < 0){
732 rerrstr(errs, sizeof errs);
733 err(m, errs);
734 /* XXX return here? */
736 if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
738 /* now we're committed. */
740 /* a new connection for this fid */
741 nc = emalloc(sizeof(Conn));
742 nc->internal = chancreate(sizeof(void*), 0);
744 /* a ref for us */
745 nc->fdfid = m->fid;
746 m->fid->ref++;
747 nc->fdfid->openfd++;
748 nc->fdmode = m->tx.mode;
749 nc->fd = p[0];
751 /* a thread to tend the pipe */
752 threadcreate(openfdthread, nc, STACK);
754 /* if mode is ORDWR, that openfdthread will write; start a reader */
755 if((m->tx.mode&3) == ORDWR){
756 nc = emalloc(sizeof(Conn));
757 nc->internal = chancreate(sizeof(void*), 0);
758 nc->fdfid = m->fid;
759 m->fid->ref++;
760 nc->fdfid->openfd++;
761 nc->fdmode = OREAD;
762 nc->fd = dup(p[0], -1);
763 threadcreate(openfdthread, nc, STACK);
766 /* steal fid from other connection */
767 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
768 fidput(m->fid);
770 /* rewrite as Ropenfd */
771 m->rx.type = Ropenfd;
772 n = GBIT32(m->rpkt);
773 m->rpkt = erealloc(m->rpkt, n+4);
774 PBIT32(m->rpkt+n, p[1]);
775 n += 4;
776 PBIT32(m->rpkt, n);
777 m->rpkt[4] = Ropenfd;
778 m->rx.unixfd = p[1];
779 return 0;
782 void
783 connoutthread(void *arg)
785 int err;
786 Conn *c;
787 Msg *m, *om;
788 Ioproc *io;
790 c = arg;
791 io = ioproc();
792 threadsetname("connout %s", c->dir);
793 while((m = recvq(c->outq)) != nil){
794 err = m->tx.type+1 != m->rx.type;
795 if(!err && m->isopenfd)
796 if(xopenfd(m) < 0)
797 continue;
798 switch(m->tx.type){
799 case Tflush:
800 om = m->oldm;
801 if(om)
802 if(delhash(om->c->tag, om->ctag, om) == 0)
803 msgput(om);
804 break;
805 case Tclunk:
806 case Tremove:
807 if(m->fid)
808 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
809 fidput(m->fid);
810 break;
811 case Tauth:
812 if(err && m->afid){
813 if(verbose) fprint(2, "%T auth error\n");
814 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
815 fidput(m->afid);
817 break;
818 case Tattach:
819 if(err && m->fid)
820 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
821 fidput(m->fid);
822 break;
823 case Twalk:
824 if(err || m->rx.nwqid < m->tx.nwname)
825 if(m->tx.fid != m->tx.newfid && m->newfid)
826 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
827 fidput(m->newfid);
828 break;
829 case Tread:
830 break;
831 case Tstat:
832 break;
833 case Topen:
834 case Tcreate:
835 m->fid->isdir = (m->rx.qid.type & QTDIR);
836 break;
838 if(delhash(m->c->tag, m->ctag, m) == 0)
839 msgput(m);
840 if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
841 rewritehdr(&m->rx, m->rpkt);
842 if(mwrite9p(io, c->fd, m->rpkt) < 0)
843 if(verbose) fprint(2, "%T write error: %r\n");
844 msgput(m);
845 if(c->inputstalled && c->nmsg < MAXMSG)
846 nbsendp(c->inc, 0);
848 closeioproc(io);
849 free(c->outq);
850 c->outq = nil;
851 sendp(c->outqdead, nil);
854 void
855 outputthread(void *arg)
857 Msg *m;
858 Ioproc *io;
860 USED(arg);
861 io = ioproc();
862 threadsetname("output");
863 while((m = recvq(outq)) != nil){
864 if(m->sync){
865 sendp(m->c->outqdead, nil);
866 continue;
868 if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
869 rewritehdr(&m->tx, m->tpkt);
870 if(mwrite9p(io, 1, m->tpkt) < 0)
871 sysfatal("output error: %r");
872 msgput(m);
874 closeioproc(io);
875 fprint(2, "%T output eof\n");
876 threadexitsall(0);
879 void
880 inputthread(void *arg)
882 uchar *pkt;
883 int n, nn, tag;
884 Msg *m;
885 Ioproc *io;
887 threadsetname("input");
888 if(verbose) fprint(2, "%T input thread\n");
889 io = ioproc();
890 USED(arg);
891 while((pkt = read9ppkt(io, 0)) != nil){
892 n = GBIT32(pkt);
893 if(n < 7){
894 fprint(2, "%T short 9P packet from server\n");
895 free(pkt);
896 continue;
898 if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
899 tag = GBIT16(pkt+5);
900 if((m = msgget(tag)) == nil){
901 fprint(2, "%T unexpected 9P response tag %d\n", tag);
902 free(pkt);
903 continue;
905 if((nn = convM2S(pkt, n, &m->rx)) != n){
906 fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
907 free(pkt);
908 msgput(m);
909 continue;
911 if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
912 m->internal ? " (internal)" : "");
913 m->rpkt = pkt;
914 m->rx.tag = m->ctag;
915 if(m->internal)
916 sendp(m->c->internal, m);
917 else if(m->c->outq)
918 sendq(m->c->outq, m);
919 else
920 msgput(m);
922 closeioproc(io);
923 /*fprint(2, "%T input eof\n"); */
924 threadexitsall(0);
927 void*
928 gethash(Hash **ht, uint n)
930 Hash *h;
932 for(h=ht[n%NHASH]; h; h=h->next)
933 if(h->n == n)
934 return h->v;
935 return nil;
938 int
939 delhash(Hash **ht, uint n, void *v)
941 Hash *h, **l;
943 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
944 if(h->n == n){
945 if(h->v != v){
946 if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
947 return -1;
949 *l = h->next;
950 free(h);
951 return 0;
953 return -1;
956 int
957 puthash(Hash **ht, uint n, void *v)
959 Hash *h;
961 if(gethash(ht, n))
962 return -1;
963 h = emalloc(sizeof(Hash));
964 h->next = ht[n%NHASH];
965 h->n = n;
966 h->v = v;
967 ht[n%NHASH] = h;
968 return 0;
971 Fid **fidtab;
972 int nfidtab;
973 Fid *freefid;
975 Fid*
976 fidnew(int cfid)
978 Fid *f;
980 if(freefid == nil){
981 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
982 if(nfidtab == xafid){
983 fidtab[nfidtab++] = nil;
984 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
986 fidtab[nfidtab] = emalloc(sizeof(Fid));
987 freefid = fidtab[nfidtab];
988 freefid->fid = nfidtab++;
990 f = freefid;
991 freefid = f->next;
992 f->cfid = cfid;
993 f->ref = 1;
994 f->offset = 0;
995 f->coffset = 0;
996 f->isdir = -1;
997 return f;
1000 void
1001 fidput(Fid *f)
1003 if(f == nil)
1004 return;
1005 assert(f->ref > 0);
1006 if(--f->ref > 0)
1007 return;
1008 f->next = freefid;
1009 f->cfid = -1;
1010 freefid = f;
1013 Msg **msgtab;
1014 int nmsgtab;
1015 int nmsg;
1016 Msg *freemsg;
1018 void
1019 msgincref(Msg *m)
1021 if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
1022 getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
1023 m->ref++;
1026 Msg*
1027 msgnew(int x)
1029 Msg *m;
1031 if(freemsg == nil){
1032 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
1033 msgtab[nmsgtab] = emalloc(sizeof(Msg));
1034 freemsg = msgtab[nmsgtab];
1035 freemsg->tag = nmsgtab++;
1037 m = freemsg;
1038 freemsg = m->next;
1039 m->ref = 1;
1040 if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
1041 getcallerpc(&x), m, m->tag, m->ref);
1042 nmsg++;
1043 return m;
1047 * Clear data associated with connections, so that
1048 * if all msgs have been msgcleared, the connection
1049 * can be freed. Note that this does *not* free the tpkt
1050 * and rpkt; they are freed in msgput with the msg itself.
1051 * The io write thread might still be holding a ref to msg
1052 * even once the connection has finished with it.
1054 void
1055 msgclear(Msg *m)
1057 if(m->c){
1058 m->c->nmsg--;
1059 m->c = nil;
1061 if(m->oldm){
1062 msgput(m->oldm);
1063 m->oldm = nil;
1065 if(m->fid){
1066 fidput(m->fid);
1067 m->fid = nil;
1069 if(m->afid){
1070 fidput(m->afid);
1071 m->afid = nil;
1073 if(m->newfid){
1074 fidput(m->newfid);
1075 m->newfid = nil;
1077 if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
1078 close(m->rx.unixfd);
1079 m->rx.unixfd = -1;
1083 void
1084 msgput(Msg *m)
1086 if(m == nil)
1087 return;
1089 if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
1090 getcallerpc(&m), m, m->tag, m->ctag, m->ref);
1091 assert(m->ref > 0);
1092 if(--m->ref > 0)
1093 return;
1094 nmsg--;
1095 msgclear(m);
1096 if(m->tpkt){
1097 free(m->tpkt);
1098 m->tpkt = nil;
1100 if(m->rpkt){
1101 free(m->rpkt);
1102 m->rpkt = nil;
1104 m->isopenfd = 0;
1105 m->internal = 0;
1106 m->next = freemsg;
1107 freemsg = m;
1110 Msg*
1111 msgget(int n)
1113 Msg *m;
1115 if(n < 0 || n >= nmsgtab)
1116 return nil;
1117 m = msgtab[n];
1118 if(m->ref == 0)
1119 return nil;
1120 if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
1121 msgincref(m);
1122 return m;
1126 void*
1127 emalloc(int n)
1129 void *v;
1131 v = mallocz(n, 1);
1132 if(v == nil){
1133 abort();
1134 sysfatal("out of memory allocating %d", n);
1136 return v;
1139 void*
1140 erealloc(void *v, int n)
1142 v = realloc(v, n);
1143 if(v == nil){
1144 abort();
1145 sysfatal("out of memory reallocating %d", n);
1147 return v;
1150 typedef struct Qel Qel;
1151 struct Qel
1153 Qel *next;
1154 void *p;
1157 struct Queue
1159 QLock lk;
1160 Rendez r;
1161 Qel *head;
1162 Qel *tail;
1165 Queue*
1166 qalloc(void)
1168 Queue *q;
1170 q = mallocz(sizeof(Queue), 1);
1171 if(q == nil)
1172 return nil;
1173 q->r.l = &q->lk;
1174 return q;
1177 int
1178 sendq(Queue *q, void *p)
1180 Qel *e;
1182 e = emalloc(sizeof(Qel));
1183 qlock(&q->lk);
1184 e->p = p;
1185 e->next = nil;
1186 if(q->head == nil)
1187 q->head = e;
1188 else
1189 q->tail->next = e;
1190 q->tail = e;
1191 rwakeup(&q->r);
1192 qunlock(&q->lk);
1193 return 0;
1196 void*
1197 recvq(Queue *q)
1199 void *p;
1200 Qel *e;
1202 qlock(&q->lk);
1203 while(q->head == nil)
1204 rsleep(&q->r);
1205 e = q->head;
1206 q->head = e->next;
1207 qunlock(&q->lk);
1208 p = e->p;
1209 free(e);
1210 return p;
1213 uchar*
1214 read9ppkt(Ioproc *io, int fd)
1216 uchar buf[4], *pkt;
1217 int n, nn;
1219 n = ioreadn(io, fd, buf, 4);
1220 if(n != 4)
1221 return nil;
1222 n = GBIT32(buf);
1223 if(n > MAXMSGSIZE)
1224 return nil;
1225 pkt = emalloc(n);
1226 PBIT32(pkt, n);
1227 nn = ioreadn(io, fd, pkt+4, n-4);
1228 if(nn != n-4){
1229 free(pkt);
1230 return nil;
1232 /* would do this if we ever got one of these, but we only generate them
1233 if(pkt[4] == Ropenfd){
1234 newfd = iorecvfd(io, fd);
1235 PBIT32(pkt+n-4, newfd);
1238 return pkt;
1241 Msg*
1242 mread9p(Ioproc *io, int fd)
1244 int n, nn;
1245 uchar *pkt;
1246 Msg *m;
1248 if((pkt = read9ppkt(io, fd)) == nil)
1249 return nil;
1251 m = msgnew(0);
1252 m->tpkt = pkt;
1253 n = GBIT32(pkt);
1254 nn = convM2S(pkt, n, &m->tx);
1255 if(nn != n){
1256 fprint(2, "%T read bad packet from %d\n", fd);
1257 free(m->tpkt);
1258 free(m);
1259 return nil;
1261 return m;
1264 int
1265 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1267 int n, nfd;
1269 n = GBIT32(pkt);
1270 if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
1271 if(verbose > 1) fprint(2, "%T before iowrite\n");
1272 if(iowrite(io, fd, pkt, n) != n){
1273 fprint(2, "%T write error: %r\n");
1274 return -1;
1276 if(verbose > 1) fprint(2, "%T after iowrite\n");
1277 if(pkt[4] == Ropenfd){
1278 nfd = GBIT32(pkt+n-4);
1279 if(iosendfd(io, fd, nfd) < 0){
1280 fprint(2, "%T send fd error: %r\n");
1281 return -1;
1284 return 0;
1287 void
1288 restring(uchar *pkt, int pn, char *s)
1290 int n;
1292 if(s < (char*)pkt || s >= (char*)pkt+pn)
1293 return;
1295 n = strlen(s);
1296 memmove(s+1, s, n);
1297 PBIT16((uchar*)s-1, n);
1300 void
1301 repack(Fcall *f, uchar **ppkt)
1303 uint n, nn;
1304 uchar *pkt;
1306 pkt = *ppkt;
1307 n = GBIT32(pkt);
1308 nn = sizeS2M(f);
1309 if(nn > n){
1310 free(pkt);
1311 pkt = emalloc(nn);
1312 *ppkt = pkt;
1314 n = convS2M(f, pkt, nn);
1315 if(n <= BIT16SZ)
1316 sysfatal("convS2M conversion error");
1317 if(n != nn)
1318 sysfatal("convS2M and sizeS2M disagree");
1321 void
1322 rewritehdr(Fcall *f, uchar *pkt)
1324 int i, n;
1326 n = GBIT32(pkt);
1327 PBIT16(pkt+5, f->tag);
1328 switch(f->type){
1329 case Tversion:
1330 case Rversion:
1331 restring(pkt, n, f->version);
1332 break;
1333 case Tauth:
1334 PBIT32(pkt+7, f->afid);
1335 restring(pkt, n, f->uname);
1336 restring(pkt, n, f->aname);
1337 break;
1338 case Tflush:
1339 PBIT16(pkt+7, f->oldtag);
1340 break;
1341 case Tattach:
1342 restring(pkt, n, f->uname);
1343 restring(pkt, n, f->aname);
1344 PBIT32(pkt+7, f->fid);
1345 PBIT32(pkt+11, f->afid);
1346 break;
1347 case Twalk:
1348 PBIT32(pkt+7, f->fid);
1349 PBIT32(pkt+11, f->newfid);
1350 for(i=0; i<f->nwname; i++)
1351 restring(pkt, n, f->wname[i]);
1352 break;
1353 case Tcreate:
1354 restring(pkt, n, f->name);
1355 /* fall through */
1356 case Topen:
1357 case Tclunk:
1358 case Tremove:
1359 case Tstat:
1360 case Twstat:
1361 case Twrite:
1362 PBIT32(pkt+7, f->fid);
1363 break;
1364 case Tread:
1365 PBIT32(pkt+7, f->fid);
1366 PBIT64(pkt+11, f->offset);
1367 break;
1368 case Rerror:
1369 restring(pkt, n, f->ename);
1370 break;
1374 static long
1375 _iolisten(va_list *arg)
1377 char *a, *b;
1379 a = va_arg(*arg, char*);
1380 b = va_arg(*arg, char*);
1381 return listen(a, b);
1384 int
1385 iolisten(Ioproc *io, char *a, char *b)
1387 return iocall(io, _iolisten, a, b);
1390 static long
1391 _ioaccept(va_list *arg)
1393 int fd;
1394 char *dir;
1396 fd = va_arg(*arg, int);
1397 dir = va_arg(*arg, char*);
1398 return accept(fd, dir);
1401 int
1402 ioaccept(Ioproc *io, int fd, char *dir)
1404 return iocall(io, _ioaccept, fd, dir);
1407 int
1408 timefmt(Fmt *fmt)
1410 static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1411 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1412 vlong ns;
1413 Tm tm;
1414 ns = nsec();
1415 tm = *localtime(time(0));
1416 return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
1417 mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
1418 (int)(ns%1000000000)/1000000);