Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <errno.h>
7 #define err err9pserve /* Darwin x86 */
/* Sizing limits used throughout this file. */
enum
{
	STACK = 32*1024,	/* stack size handed to threadcreate/proccreate */
	NHASH = 31,		/* buckets in each per-connection tag/fid table */
	MAXMSG = 64,		/* per connection */
	MAXMSGSIZE = 4<<20	/* largest packet read9ppkt accepts */
};
/* Forward type names for the structures defined in this file. */
typedef struct Hash	Hash;
typedef struct Fid	Fid;
typedef struct Msg	Msg;
typedef struct Conn	Conn;
typedef struct Queue	Queue;
23 struct Hash
24 {
25 Hash *next;
26 uint n;
27 void *v;
28 };
30 struct Fid
31 {
32 int fid;
33 int ref;
34 int cfid;
35 int openfd;
36 int offset;
37 int coffset;
38 int isdir;
39 Fid *next;
40 };
42 struct Msg
43 {
44 Conn *c;
45 int internal;
46 int sync;
47 int ref;
48 int ctag;
49 int tag;
50 int isopenfd;
51 Fcall tx;
52 Fcall rx;
53 Fid *fid;
54 Fid *newfid;
55 Fid *afid;
56 Msg *oldm;
57 Msg *next;
58 uchar *tpkt;
59 uchar *rpkt;
60 };
62 struct Conn
63 {
64 int fd;
65 int fdmode;
66 Fid *fdfid;
67 int nmsg;
68 int nfid;
69 Channel *inc;
70 Channel *internal;
71 int inputstalled;
72 char dir[40];
73 Hash *tag[NHASH];
74 Hash *fid[NHASH];
75 Queue *outq;
76 Queue *inq;
77 Channel *outqdead;
78 int dotu;
79 };
81 char *xaname;
82 char *addr;
83 int afd;
84 char adir[40];
85 int isunix;
86 Queue *outq;
87 Queue *inq;
88 int verbose = 0;
89 int logging = 0;
90 int msize = 8192;
91 u32int xafid = NOFID;
92 int attached;
93 int versioned;
94 int dotu;
95 int noauth;
97 void *gethash(Hash**, uint);
98 int puthash(Hash**, uint, void*);
99 int delhash(Hash**, uint, void*);
100 Msg *mread9p(Ioproc*, int, int);
101 int mwrite9p(Ioproc*, int, uchar*);
102 uchar *read9ppkt(Ioproc*, int);
103 int write9ppkt(int, uchar*);
104 Msg *msgnew(int);
105 void msgput(Msg*);
106 void msgclear(Msg*);
107 Msg *msgget(int);
108 void msgincref(Msg*);
109 Fid *fidnew(int);
110 void fidput(Fid*);
111 void *emalloc(int);
112 void *erealloc(void*, int);
113 Queue *qalloc(void);
114 int sendq(Queue*, void*);
115 void *recvq(Queue*);
116 void connthread(void*);
117 void connoutthread(void*);
118 void listenthread(void*);
119 void outputthread(void*);
120 void inputthread(void*);
121 void rewritehdr(Fcall*, uchar*);
122 void repack(Fcall*, uchar**, int);
123 int tlisten(char*, char*);
124 int taccept(int, char*);
125 int iolisten(Ioproc*, char*, char*);
126 int ioaccept(Ioproc*, int, char*);
127 int iorecvfd(Ioproc*, int);
128 int iosendfd(Ioproc*, int, int);
129 void mainproc(void*);
130 int ignorepipe(void*, char*);
131 int timefmt(Fmt*);
132 void dorootstat(void);
133 int stripudirread(Msg*);
134 int cvtustat(Fcall*, uchar**, int);
/* Print the command synopsis on standard error and exit every thread. */
void
usage(void)
{
	static char *text[] = {
		"usage: 9pserve [-lnv] [-A aname afid] [-c addr] [-M msize] address\n",
		"\treads/writes 9P messages on stdin/stdout\n",
	};
	int i;

	for(i = 0; i < (int)(sizeof text/sizeof text[0]); i++)
		fprint(2, text[i]);
	threadexitsall("usage");
}
144 uchar vbuf[128];
145 extern int _threaddebuglevel;
146 void
147 threadmain(int argc, char **argv)
149 char *file, *x, *addr;
150 int fd;
152 x = getenv("verbose9pserve");
153 if(x){
154 verbose = atoi(x);
155 fprint(2, "verbose9pserve %s => %d\n", x, verbose);
157 ARGBEGIN{
158 default:
159 usage();
160 case 'A':
161 attached = 1;
162 xaname = EARGF(usage());
163 xafid = atoi(EARGF(usage()));
164 break;
165 case 'M':
166 versioned = 1;
167 msize = atoi(EARGF(usage()));
168 break;
169 case 'c':
170 addr = netmkaddr(EARGF(usage()), "net", "9fs");
171 if((fd = dial(addr, nil, nil, nil)) < 0)
172 sysfatal("dial %s: %r", addr);
173 dup(fd, 0);
174 dup(fd, 1);
175 if(fd > 1)
176 close(fd);
177 break;
178 case 'n':
179 noauth = 1;
180 break;
181 case 'v':
182 verbose++;
183 break;
184 case 'u':
185 isunix++;
186 break;
187 case 'l':
188 logging++;
189 break;
190 }ARGEND
192 if(attached && !versioned){
193 fprint(2, "-A must be used with -M\n");
194 usage();
197 if(argc != 1)
198 usage();
199 addr = argv[0];
201 fmtinstall('T', timefmt);
203 if((afd = announce(addr, adir)) < 0)
204 sysfatal("announce %s: %r", addr);
205 if(logging){
206 if(strncmp(addr, "unix!", 5) == 0)
207 addr += 5;
208 file = smprint("%s.log", addr);
209 if(file == nil)
210 sysfatal("smprint log: %r");
211 if((fd = create(file, OWRITE, 0666)) < 0)
212 sysfatal("create %s: %r", file);
213 dup(fd, 2);
214 if(fd > 2)
215 close(fd);
217 if(verbose) fprint(2, "%T 9pserve running\n");
218 proccreate(mainproc, nil, STACK);
221 void
222 mainproc(void *v)
224 int n, nn;
225 Fcall f;
226 USED(v);
228 atnotify(ignorepipe, 1);
229 fmtinstall('D', dirfmt);
230 fmtinstall('M', dirmodefmt);
231 fmtinstall('F', fcallfmt);
232 fmtinstall('H', encodefmt);
234 outq = qalloc();
235 inq = qalloc();
237 if(!versioned){
238 f.type = Tversion;
239 f.version = "9P2000.u";
240 f.msize = msize;
241 f.tag = NOTAG;
242 n = convS2M(&f, vbuf, sizeof vbuf);
243 if(n <= BIT16SZ)
244 sysfatal("convS2M conversion error");
245 if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
246 nn = write(1, vbuf, n);
247 if(n != nn)
248 sysfatal("error writing Tversion: %r\n");
249 n = read9pmsg(0, vbuf, sizeof vbuf);
250 if(n < 0)
251 sysfatal("read9pmsg failure");
252 if(convM2S(vbuf, n, &f) != n)
253 sysfatal("convM2S failure");
254 if(f.msize < msize)
255 msize = f.msize;
256 if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
257 dotu = strncmp(f.version, "9P2000.u", 8) == 0;
260 threadcreate(inputthread, nil, STACK);
261 threadcreate(outputthread, nil, STACK);
263 /* if(rootfid) */
264 /* dorootstat(); */
266 threadcreate(listenthread, nil, STACK);
267 threadexits(0);
270 int
271 ignorepipe(void *v, char *s)
273 USED(v);
274 if(strcmp(s, "sys: write on closed pipe") == 0)
275 return 1;
276 if(strcmp(s, "sys: tstp") == 0)
277 return 1;
278 if(strcmp(s, "sys: window size change") == 0)
279 return 1;
280 fprint(2, "9pserve %s: %T note: %s\n", addr, s);
281 return 0;
284 void
285 listenthread(void *arg)
287 Conn *c;
288 Ioproc *io;
290 io = ioproc();
291 USED(arg);
292 threadsetname("listen %s", adir);
293 for(;;){
294 c = emalloc(sizeof(Conn));
295 c->fd = iolisten(io, adir, c->dir);
296 if(c->fd < 0){
297 if(verbose) fprint(2, "%T listen: %r\n");
298 close(afd);
299 free(c);
300 return;
302 c->inc = chancreate(sizeof(void*), 0);
303 c->internal = chancreate(sizeof(void*), 0);
304 c->inq = qalloc();
305 c->outq = qalloc();
306 c->outqdead = chancreate(sizeof(void*), 0);
307 if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
308 threadcreate(connthread, c, STACK);
312 void
313 send9pmsg(Msg *m)
315 int n, nn;
317 n = sizeS2Mu(&m->rx, m->c->dotu);
318 m->rpkt = emalloc(n);
319 nn = convS2Mu(&m->rx, m->rpkt, n, m->c->dotu);
320 if(nn <= BIT16SZ)
321 sysfatal("convS2Mu conversion error");
322 if(nn != n)
323 sysfatal("sizeS2Mu and convS2Mu disagree");
324 sendq(m->c->outq, m);
327 void
328 sendomsg(Msg *m)
330 int n, nn;
332 n = sizeS2Mu(&m->tx, m->c->dotu);
333 m->tpkt = emalloc(n);
334 nn = convS2Mu(&m->tx, m->tpkt, n, m->c->dotu);
335 if(nn <= BIT16SZ)
336 sysfatal("convS2Mu conversion error");
337 if(nn != n)
338 sysfatal("sizeS2Mu and convS2Mu disagree");
339 sendq(outq, m);
342 void
343 err(Msg *m, char *ename)
345 m->rx.type = Rerror;
346 m->rx.ename = ename;
347 m->rx.tag = m->tx.tag;
348 send9pmsg(m);
/* Return a copy of s in memory obtained from emalloc. */
char*
estrdup(char *s)
{
	char *copy;
	int len;

	len = strlen(s);
	copy = emalloc(len+1);
	memmove(copy, s, len+1);	/* includes the terminating NUL */
	return copy;
}
361 void
362 connthread(void *arg)
364 int i, fd;
365 Conn *c;
366 Hash *h, *hnext;
367 Msg *m, *om, *mm, sync;
368 Fid *f;
369 Ioproc *io;
371 c = arg;
372 threadsetname("conn %s", c->dir);
373 io = ioproc();
374 fd = ioaccept(io, c->fd, c->dir);
375 if(fd < 0){
376 if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
377 goto out;
379 close(c->fd);
380 c->fd = fd;
381 threadcreate(connoutthread, c, STACK);
382 while((m = mread9p(io, c->fd, c->dotu)) != nil){
383 if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
384 m->c = c;
385 m->ctag = m->tx.tag;
386 c->nmsg++;
387 if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
388 if(puthash(c->tag, m->tx.tag, m) < 0){
389 err(m, "duplicate tag");
390 continue;
392 msgincref(m);
393 switch(m->tx.type){
394 case Tversion:
395 m->rx.tag = m->tx.tag;
396 m->rx.msize = m->tx.msize;
397 if(m->rx.msize > msize)
398 m->rx.msize = msize;
399 m->rx.version = "9P2000";
400 c->dotu = 0;
401 if(dotu && strncmp(m->tx.version, "9P2000.u", 8) == 0){
402 m->rx.version = "9P2000.u";
403 c->dotu = 1;
405 m->rx.type = Rversion;
406 send9pmsg(m);
407 continue;
408 case Tflush:
409 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
410 m->rx.tag = m->tx.tag;
411 m->rx.type = Rflush;
412 send9pmsg(m);
413 continue;
415 msgincref(m->oldm);
416 break;
417 case Tattach:
418 m->afid = nil;
419 if(m->tx.afid != NOFID
420 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
421 err(m, "unknown fid");
422 continue;
424 if(m->afid)
425 m->afid->ref++;
426 m->fid = fidnew(m->tx.fid);
427 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
428 err(m, "duplicate fid");
429 continue;
431 m->fid->ref++;
432 if(attached && m->afid==nil){
433 if(m->tx.aname[0] && strcmp(xaname, m->tx.aname) != 0){
434 err(m, "invalid attach name");
435 continue;
437 m->tx.afid = xafid;
438 m->tx.aname = xaname;
439 m->tx.uname = getuser(); /* what srv.c used */
440 repack(&m->tx, &m->tpkt, c->dotu);
442 break;
443 case Twalk:
444 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
445 err(m, "unknown fid");
446 continue;
448 m->fid->ref++;
449 if(m->tx.newfid == m->tx.fid){
450 m->fid->ref++;
451 m->newfid = m->fid;
452 }else{
453 m->newfid = fidnew(m->tx.newfid);
454 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
455 err(m, "duplicate fid");
456 continue;
458 m->newfid->ref++;
460 break;
461 case Tauth:
462 if(attached){
463 err(m, "authentication not required");
464 continue;
466 if(noauth){
467 err(m, "authentication rejected");
468 continue;
470 m->afid = fidnew(m->tx.afid);
471 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
472 err(m, "duplicate fid");
473 continue;
475 m->afid->ref++;
476 break;
477 case Tcreate:
478 if(dotu && !c->dotu && (m->tx.perm&(DMSYMLINK|DMDEVICE|DMNAMEDPIPE|DMSOCKET))){
479 err(m, "unsupported file type");
480 continue;
482 goto caseTopen;
483 case Topenfd:
484 if(m->tx.mode&~(OTRUNC|3)){
485 err(m, "bad openfd mode");
486 continue;
488 m->isopenfd = 1;
489 m->tx.type = Topen;
490 m->tpkt[4] = Topen;
491 /* fall through */
492 caseTopen:
493 case Topen:
494 case Tclunk:
495 case Tread:
496 case Twrite:
497 case Tremove:
498 case Tstat:
499 case Twstat:
500 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
501 err(m, "unknown fid");
502 continue;
504 m->fid->ref++;
505 if(m->tx.type==Twstat && dotu && !c->dotu){
506 if(cvtustat(&m->tx, &m->tpkt, 1) < 0){
507 err(m, "cannot convert stat buffer");
508 continue;
511 if(m->tx.type==Tread && m->fid->isdir && dotu && !c->dotu){
512 if(m->tx.offset = m->fid->coffset)
513 m->tx.offset = m->fid->offset;
514 else
515 m->fid->offset = m->fid->coffset;
517 break;
520 /* have everything - translate and send */
521 m->c = c;
522 m->ctag = m->tx.tag;
523 m->tx.tag = m->tag;
524 if(m->fid)
525 m->tx.fid = m->fid->fid;
526 if(m->newfid)
527 m->tx.newfid = m->newfid->fid;
528 if(m->afid)
529 m->tx.afid = m->afid->fid;
530 if(m->oldm)
531 m->tx.oldtag = m->oldm->tag;
532 /* reference passes to outq */
533 sendq(outq, m);
534 while(c->nmsg >= MAXMSG){
535 c->inputstalled = 1;
536 recvp(c->inc);
540 if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
542 /* flush all outstanding messages */
543 for(i=0; i<NHASH; i++){
544 while((h = c->tag[i]) != nil){
545 om = h->v;
546 msgincref(om); /* for us */
547 m = msgnew(0);
548 m->internal = 1;
549 m->c = c;
550 c->nmsg++;
551 m->tx.type = Tflush;
552 m->tx.tag = m->tag;
553 m->tx.oldtag = om->tag;
554 m->oldm = om;
555 msgincref(om);
556 msgincref(m); /* for outq */
557 sendomsg(m);
558 mm = recvp(c->internal);
559 assert(mm == m);
560 msgput(m); /* got from recvp */
561 msgput(m); /* got from msgnew */
562 if(delhash(c->tag, om->ctag, om) == 0)
563 msgput(om); /* got from hash table */
564 msgput(om); /* got from msgincref */
568 /*
569 * outputthread has written all its messages
570 * to the remote connection (because we've gotten all the replies!),
571 * but it might not have gotten a chance to msgput
572 * the very last one. sync up to make sure.
573 */
574 memset(&sync, 0, sizeof sync);
575 sync.sync = 1;
576 sync.c = c;
577 sendq(outq, &sync);
578 recvp(c->outqdead);
580 /* everything is quiet; can close the local output queue. */
581 sendq(c->outq, nil);
582 recvp(c->outqdead);
584 /* should be no messages left anywhere. */
585 assert(c->nmsg == 0);
587 /* clunk all outstanding fids */
588 for(i=0; i<NHASH; i++){
589 for(h=c->fid[i]; h; h=hnext){
590 f = h->v;
591 m = msgnew(0);
592 m->internal = 1;
593 m->c = c;
594 c->nmsg++;
595 m->tx.type = Tclunk;
596 m->tx.tag = m->tag;
597 m->tx.fid = f->fid;
598 m->fid = f;
599 f->ref++;
600 msgincref(m);
601 sendomsg(m);
602 mm = recvp(c->internal);
603 assert(mm == m);
604 msgclear(m);
605 msgput(m); /* got from recvp */
606 msgput(m); /* got from msgnew */
607 fidput(f); /* got from hash table */
608 hnext = h->next;
609 free(h);
613 out:
614 closeioproc(io);
615 assert(c->nmsg == 0);
616 assert(c->nfid == 0);
617 close(c->fd);
618 chanfree(c->internal);
619 c->internal = 0;
620 chanfree(c->inc);
621 c->inc = 0;
622 free(c->inq);
623 c->inq = 0;
624 free(c);
627 static void
628 openfdthread(void *v)
630 Conn *c;
631 Fid *fid;
632 Msg *m;
633 int n;
634 vlong tot;
635 Ioproc *io;
636 char buf[1024];
638 c = v;
639 fid = c->fdfid;
640 io = ioproc();
641 threadsetname("openfd %s", c->fdfid);
642 tot = 0;
643 m = nil;
644 if(c->fdmode == OREAD){
645 for(;;){
646 if(verbose) fprint(2, "%T tread...");
647 m = msgnew(0);
648 m->internal = 1;
649 m->c = c;
650 m->tx.type = Tread;
651 m->tx.count = msize - IOHDRSZ;
652 m->tx.fid = fid->fid;
653 m->tx.tag = m->tag;
654 m->tx.offset = tot;
655 m->fid = fid;
656 fid->ref++;
657 msgincref(m);
658 sendomsg(m);
659 recvp(c->internal);
660 if(m->rx.type == Rerror){
661 /* fprint(2, "%T read error: %s\n", m->rx.ename); */
662 break;
664 if(m->rx.count == 0)
665 break;
666 tot += m->rx.count;
667 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
668 /* fprint(2, "%T pipe write error: %r\n"); */
669 break;
671 msgput(m);
672 msgput(m);
673 m = nil;
675 }else{
676 for(;;){
677 if(verbose) fprint(2, "%T twrite...");
678 n = sizeof buf;
679 if(n > msize)
680 n = msize;
681 if((n=ioread(io, c->fd, buf, n)) <= 0){
682 if(n < 0)
683 fprint(2, "%T pipe read error: %r\n");
684 break;
686 m = msgnew(0);
687 m->internal = 1;
688 m->c = c;
689 m->tx.type = Twrite;
690 m->tx.fid = fid->fid;
691 m->tx.data = buf;
692 m->tx.count = n;
693 m->tx.tag = m->tag;
694 m->tx.offset = tot;
695 m->fid = fid;
696 fid->ref++;
697 msgincref(m);
698 sendomsg(m);
699 recvp(c->internal);
700 if(m->rx.type == Rerror){
701 /* fprint(2, "%T write error: %s\n", m->rx.ename); */
703 tot += n;
704 msgput(m);
705 msgput(m);
706 m = nil;
709 if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
710 close(c->fd);
711 closeioproc(io);
712 if(m){
713 msgput(m);
714 msgput(m);
716 if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
717 if(--fid->openfd == 0){
718 m = msgnew(0);
719 m->internal = 1;
720 m->c = c;
721 m->tx.type = Tclunk;
722 m->tx.tag = m->tag;
723 m->tx.fid = fid->fid;
724 m->fid = fid;
725 fid->ref++;
726 msgincref(m);
727 sendomsg(m);
728 recvp(c->internal);
729 msgput(m);
730 msgput(m);
732 fidput(fid);
733 c->fdfid = nil;
734 chanfree(c->internal);
735 c->internal = 0;
736 free(c);
739 int
740 xopenfd(Msg *m)
742 char errs[ERRMAX];
743 int n, p[2];
744 Conn *nc;
746 if(pipe(p) < 0){
747 rerrstr(errs, sizeof errs);
748 err(m, errs);
749 /* XXX return here? */
751 if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
753 /* now we're committed. */
755 /* a new connection for this fid */
756 nc = emalloc(sizeof(Conn));
757 nc->internal = chancreate(sizeof(void*), 0);
759 /* a ref for us */
760 nc->fdfid = m->fid;
761 m->fid->ref++;
762 nc->fdfid->openfd++;
763 nc->fdmode = m->tx.mode;
764 nc->fd = p[0];
766 /* a thread to tend the pipe */
767 threadcreate(openfdthread, nc, STACK);
769 /* if mode is ORDWR, that openfdthread will write; start a reader */
770 if((m->tx.mode&3) == ORDWR){
771 nc = emalloc(sizeof(Conn));
772 nc->internal = chancreate(sizeof(void*), 0);
773 nc->fdfid = m->fid;
774 m->fid->ref++;
775 nc->fdfid->openfd++;
776 nc->fdmode = OREAD;
777 nc->fd = dup(p[0], -1);
778 threadcreate(openfdthread, nc, STACK);
781 /* steal fid from other connection */
782 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
783 fidput(m->fid);
785 /* rewrite as Ropenfd */
786 m->rx.type = Ropenfd;
787 n = GBIT32(m->rpkt);
788 m->rpkt = erealloc(m->rpkt, n+4);
789 PBIT32(m->rpkt+n, p[1]);
790 n += 4;
791 PBIT32(m->rpkt, n);
792 m->rpkt[4] = Ropenfd;
793 m->rx.unixfd = p[1];
794 return 0;
797 void
798 connoutthread(void *arg)
800 char *ename;
801 int err;
802 Conn *c;
803 Msg *m, *om;
804 Ioproc *io;
806 c = arg;
807 io = ioproc();
808 threadsetname("connout %s", c->dir);
809 while((m = recvq(c->outq)) != nil){
810 err = m->tx.type+1 != m->rx.type;
811 if(!err && m->isopenfd)
812 if(xopenfd(m) < 0)
813 continue;
814 switch(m->tx.type){
815 case Tflush:
816 om = m->oldm;
817 if(om)
818 if(delhash(om->c->tag, om->ctag, om) == 0)
819 msgput(om);
820 break;
821 case Tclunk:
822 case Tremove:
823 if(m->fid)
824 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
825 fidput(m->fid);
826 break;
827 case Tauth:
828 if(err && m->afid){
829 if(verbose) fprint(2, "%T auth error\n");
830 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
831 fidput(m->afid);
833 break;
834 case Tattach:
835 if(err && m->fid)
836 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
837 fidput(m->fid);
838 break;
839 case Twalk:
840 if(err || m->rx.nwqid < m->tx.nwname)
841 if(m->tx.fid != m->tx.newfid && m->newfid)
842 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
843 fidput(m->newfid);
844 break;
845 case Tread:
846 if(!err && m->fid->isdir && dotu && !m->c->dotu){
847 m->fid->offset += m->rx.count;
848 stripudirread(m);
849 m->fid->coffset += m->rx.count;
851 break;
852 case Tstat:
853 if(!err && dotu && !m->c->dotu)
854 cvtustat(&m->rx, &m->rpkt, 0);
855 break;
856 case Topen:
857 case Tcreate:
858 m->fid->isdir = (m->rx.qid.type & QTDIR);
859 break;
861 if(m->rx.type==Rerror && dotu && !c->dotu){
862 ename = estrdup(m->rx.ename);
863 m->rx.ename = ename;
864 repack(&m->rx, &m->rpkt, c->dotu);
865 free(ename);
866 m->rx.ename = "XXX";
868 if(delhash(m->c->tag, m->ctag, m) == 0)
869 msgput(m);
870 if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
871 rewritehdr(&m->rx, m->rpkt);
872 if(mwrite9p(io, c->fd, m->rpkt) < 0)
873 if(verbose) fprint(2, "%T write error: %r\n");
874 msgput(m);
875 if(c->inputstalled && c->nmsg < MAXMSG)
876 nbsendp(c->inc, 0);
878 closeioproc(io);
879 free(c->outq);
880 c->outq = nil;
881 sendp(c->outqdead, nil);
884 void
885 outputthread(void *arg)
887 Msg *m;
888 Ioproc *io;
890 USED(arg);
891 io = ioproc();
892 threadsetname("output");
893 while((m = recvq(outq)) != nil){
894 if(m->sync){
895 sendp(m->c->outqdead, nil);
896 continue;
898 if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
899 rewritehdr(&m->tx, m->tpkt);
900 if(mwrite9p(io, 1, m->tpkt) < 0)
901 sysfatal("output error: %r");
902 msgput(m);
904 closeioproc(io);
905 fprint(2, "%T output eof\n");
906 threadexitsall(0);
909 void
910 inputthread(void *arg)
912 uchar *pkt;
913 int n, nn, tag;
914 Msg *m;
915 Ioproc *io;
917 threadsetname("input");
918 if(verbose) fprint(2, "%T input thread\n");
919 io = ioproc();
920 USED(arg);
921 while((pkt = read9ppkt(io, 0)) != nil){
922 n = GBIT32(pkt);
923 if(n < 7){
924 fprint(2, "%T short 9P packet from server\n");
925 free(pkt);
926 continue;
928 if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
929 tag = GBIT16(pkt+5);
930 if((m = msgget(tag)) == nil){
931 fprint(2, "%T unexpected 9P response tag %d\n", tag);
932 free(pkt);
933 continue;
935 if((nn = convM2Su(pkt, n, &m->rx, dotu)) != n){
936 fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
937 free(pkt);
938 msgput(m);
939 continue;
941 if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
942 m->internal ? " (internal)" : "");
943 m->rpkt = pkt;
944 m->rx.tag = m->ctag;
945 if(m->internal)
946 sendp(m->c->internal, m);
947 else if(m->c->outq)
948 sendq(m->c->outq, m);
949 else
950 msgput(m);
952 closeioproc(io);
953 /*fprint(2, "%T input eof\n"); */
954 threadexitsall(0);
957 void*
958 gethash(Hash **ht, uint n)
960 Hash *h;
962 for(h=ht[n%NHASH]; h; h=h->next)
963 if(h->n == n)
964 return h->v;
965 return nil;
968 int
969 delhash(Hash **ht, uint n, void *v)
971 Hash *h, **l;
973 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
974 if(h->n == n){
975 if(h->v != v){
976 if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
977 return -1;
979 *l = h->next;
980 free(h);
981 return 0;
983 return -1;
986 int
987 puthash(Hash **ht, uint n, void *v)
989 Hash *h;
991 if(gethash(ht, n))
992 return -1;
993 h = emalloc(sizeof(Hash));
994 h->next = ht[n%NHASH];
995 h->n = n;
996 h->v = v;
997 ht[n%NHASH] = h;
998 return 0;
1001 Fid **fidtab;
1002 int nfidtab;
1003 Fid *freefid;
1005 Fid*
1006 fidnew(int cfid)
1008 Fid *f;
1010 if(freefid == nil){
1011 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
1012 if(nfidtab == xafid){
1013 fidtab[nfidtab++] = nil;
1014 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
1016 fidtab[nfidtab] = emalloc(sizeof(Fid));
1017 freefid = fidtab[nfidtab];
1018 freefid->fid = nfidtab++;
1020 f = freefid;
1021 freefid = f->next;
1022 f->cfid = cfid;
1023 f->ref = 1;
1024 f->offset = 0;
1025 f->coffset = 0;
1026 f->isdir = -1;
1027 return f;
1030 void
1031 fidput(Fid *f)
1033 if(f == nil)
1034 return;
1035 assert(f->ref > 0);
1036 if(--f->ref > 0)
1037 return;
1038 f->next = freefid;
1039 f->cfid = -1;
1040 freefid = f;
1043 Msg **msgtab;
1044 int nmsgtab;
1045 int nmsg;
1046 Msg *freemsg;
1048 void
1049 msgincref(Msg *m)
1051 if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
1052 getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
1053 m->ref++;
1056 Msg*
1057 msgnew(int x)
1059 Msg *m;
1061 if(freemsg == nil){
1062 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
1063 msgtab[nmsgtab] = emalloc(sizeof(Msg));
1064 freemsg = msgtab[nmsgtab];
1065 freemsg->tag = nmsgtab++;
1067 m = freemsg;
1068 freemsg = m->next;
1069 m->ref = 1;
1070 if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
1071 getcallerpc(&x), m, m->tag, m->ref);
1072 nmsg++;
1073 return m;
1077 * Clear data associated with connections, so that
1078 * if all msgs have been msgcleared, the connection
1079 * can be freed. Note that this does *not* free the tpkt
1080 * and rpkt; they are freed in msgput with the msg itself.
1081 * The io write thread might still be holding a ref to msg
1082 * even once the connection has finished with it.
1084 void
1085 msgclear(Msg *m)
1087 if(m->c){
1088 m->c->nmsg--;
1089 m->c = nil;
1091 if(m->oldm){
1092 msgput(m->oldm);
1093 m->oldm = nil;
1095 if(m->fid){
1096 fidput(m->fid);
1097 m->fid = nil;
1099 if(m->afid){
1100 fidput(m->afid);
1101 m->afid = nil;
1103 if(m->newfid){
1104 fidput(m->newfid);
1105 m->newfid = nil;
1107 if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
1108 close(m->rx.unixfd);
1109 m->rx.unixfd = -1;
1113 void
1114 msgput(Msg *m)
1116 if(m == nil)
1117 return;
1119 if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
1120 getcallerpc(&m), m, m->tag, m->ctag, m->ref);
1121 assert(m->ref > 0);
1122 if(--m->ref > 0)
1123 return;
1124 nmsg--;
1125 msgclear(m);
1126 if(m->tpkt){
1127 free(m->tpkt);
1128 m->tpkt = nil;
1130 if(m->rpkt){
1131 free(m->rpkt);
1132 m->rpkt = nil;
1134 m->isopenfd = 0;
1135 m->internal = 0;
1136 m->next = freemsg;
1137 freemsg = m;
1140 Msg*
1141 msgget(int n)
1143 Msg *m;
1145 if(n < 0 || n >= nmsgtab)
1146 return nil;
1147 m = msgtab[n];
1148 if(m->ref == 0)
1149 return nil;
1150 if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
1151 msgincref(m);
1152 return m;
1156 void*
1157 emalloc(int n)
1159 void *v;
1161 v = mallocz(n, 1);
1162 if(v == nil){
1163 abort();
1164 sysfatal("out of memory allocating %d", n);
1166 return v;
1169 void*
1170 erealloc(void *v, int n)
1172 v = realloc(v, n);
1173 if(v == nil){
1174 abort();
1175 sysfatal("out of memory reallocating %d", n);
1177 return v;
/* One element of a Queue. */
typedef struct Qel Qel;
struct Qel
{
	Qel *next;	/* following element, nil at the tail */
	void *p;	/* queued pointer */
};
1187 struct Queue
1189 QLock lk;
1190 Rendez r;
1191 Qel *head;
1192 Qel *tail;
1195 Queue*
1196 qalloc(void)
1198 Queue *q;
1200 q = mallocz(sizeof(Queue), 1);
1201 if(q == nil)
1202 return nil;
1203 q->r.l = &q->lk;
1204 return q;
1207 int
1208 sendq(Queue *q, void *p)
1210 Qel *e;
1212 e = emalloc(sizeof(Qel));
1213 qlock(&q->lk);
1214 e->p = p;
1215 e->next = nil;
1216 if(q->head == nil)
1217 q->head = e;
1218 else
1219 q->tail->next = e;
1220 q->tail = e;
1221 rwakeup(&q->r);
1222 qunlock(&q->lk);
1223 return 0;
1226 void*
1227 recvq(Queue *q)
1229 void *p;
1230 Qel *e;
1232 qlock(&q->lk);
1233 while(q->head == nil)
1234 rsleep(&q->r);
1235 e = q->head;
1236 q->head = e->next;
1237 qunlock(&q->lk);
1238 p = e->p;
1239 free(e);
1240 return p;
1243 uchar*
1244 read9ppkt(Ioproc *io, int fd)
1246 uchar buf[4], *pkt;
1247 int n, nn;
1249 n = ioreadn(io, fd, buf, 4);
1250 if(n != 4)
1251 return nil;
1252 n = GBIT32(buf);
1253 if(n > MAXMSGSIZE)
1254 return nil;
1255 pkt = emalloc(n);
1256 PBIT32(pkt, n);
1257 nn = ioreadn(io, fd, pkt+4, n-4);
1258 if(nn != n-4){
1259 free(pkt);
1260 return nil;
1262 /* would do this if we ever got one of these, but we only generate them
1263 if(pkt[4] == Ropenfd){
1264 newfd = iorecvfd(io, fd);
1265 PBIT32(pkt+n-4, newfd);
1268 return pkt;
1271 Msg*
1272 mread9p(Ioproc *io, int fd, int dotu)
1274 int n, nn;
1275 uchar *pkt;
1276 Msg *m;
1278 if((pkt = read9ppkt(io, fd)) == nil)
1279 return nil;
1281 m = msgnew(0);
1282 m->tpkt = pkt;
1283 n = GBIT32(pkt);
1284 nn = convM2Su(pkt, n, &m->tx, dotu);
1285 if(nn != n){
1286 fprint(2, "%T read bad packet from %d\n", fd);
1287 return nil;
1289 return m;
1292 int
1293 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1295 int n, nfd;
1297 n = GBIT32(pkt);
1298 if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
1299 if(verbose > 1) fprint(2, "%T before iowrite\n");
1300 if(iowrite(io, fd, pkt, n) != n){
1301 fprint(2, "%T write error: %r\n");
1302 return -1;
1304 if(verbose > 1) fprint(2, "%T after iowrite\n");
1305 if(pkt[4] == Ropenfd){
1306 nfd = GBIT32(pkt+n-4);
1307 if(iosendfd(io, fd, nfd) < 0){
1308 fprint(2, "%T send fd error: %r\n");
1309 return -1;
1312 return 0;
1315 void
1316 restring(uchar *pkt, int pn, char *s)
1318 int n;
1320 if(s < (char*)pkt || s >= (char*)pkt+pn)
1321 return;
1323 n = strlen(s);
1324 memmove(s+1, s, n);
1325 PBIT16((uchar*)s-1, n);
1328 void
1329 repack(Fcall *f, uchar **ppkt, int dotu)
1331 uint n, nn;
1332 uchar *pkt;
1334 pkt = *ppkt;
1335 n = GBIT32(pkt);
1336 nn = sizeS2Mu(f, dotu);
1337 if(nn > n){
1338 free(pkt);
1339 pkt = emalloc(nn);
1340 *ppkt = pkt;
1342 n = convS2Mu(f, pkt, nn, dotu);
1343 if(n <= BIT16SZ)
1344 sysfatal("convS2M conversion error");
1345 if(n != nn)
1346 sysfatal("convS2Mu and sizeS2Mu disagree");
1349 void
1350 rewritehdr(Fcall *f, uchar *pkt)
1352 int i, n;
1354 n = GBIT32(pkt);
1355 PBIT16(pkt+5, f->tag);
1356 switch(f->type){
1357 case Tversion:
1358 case Rversion:
1359 restring(pkt, n, f->version);
1360 break;
1361 case Tauth:
1362 PBIT32(pkt+7, f->afid);
1363 restring(pkt, n, f->uname);
1364 restring(pkt, n, f->aname);
1365 break;
1366 case Tflush:
1367 PBIT16(pkt+7, f->oldtag);
1368 break;
1369 case Tattach:
1370 restring(pkt, n, f->uname);
1371 restring(pkt, n, f->aname);
1372 PBIT32(pkt+7, f->fid);
1373 PBIT32(pkt+11, f->afid);
1374 break;
1375 case Twalk:
1376 PBIT32(pkt+7, f->fid);
1377 PBIT32(pkt+11, f->newfid);
1378 for(i=0; i<f->nwname; i++)
1379 restring(pkt, n, f->wname[i]);
1380 break;
1381 case Tcreate:
1382 restring(pkt, n, f->name);
1383 /* fall through */
1384 case Topen:
1385 case Tclunk:
1386 case Tremove:
1387 case Tstat:
1388 case Twstat:
1389 case Twrite:
1390 PBIT32(pkt+7, f->fid);
1391 break;
1392 case Tread:
1393 PBIT32(pkt+7, f->fid);
1394 PBIT64(pkt+11, f->offset);
1395 break;
1396 case Rerror:
1397 restring(pkt, n, f->ename);
1398 break;
1402 static long
1403 _iolisten(va_list *arg)
1405 char *a, *b;
1407 a = va_arg(*arg, char*);
1408 b = va_arg(*arg, char*);
1409 return listen(a, b);
1412 int
1413 iolisten(Ioproc *io, char *a, char *b)
1415 return iocall(io, _iolisten, a, b);
1418 static long
1419 _ioaccept(va_list *arg)
1421 int fd;
1422 char *dir;
1424 fd = va_arg(*arg, int);
1425 dir = va_arg(*arg, char*);
1426 return accept(fd, dir);
1429 int
1430 ioaccept(Ioproc *io, int fd, char *dir)
1432 return iocall(io, _ioaccept, fd, dir);
1435 int
1436 timefmt(Fmt *fmt)
1438 static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1439 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1440 vlong ns;
1441 Tm tm;
1442 ns = nsec();
1443 tm = *localtime(time(0));
1444 return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
1445 mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
1446 (int)(ns%1000000000)/1000000);
1449 int
1450 cvtustat(Fcall *f, uchar **fpkt, int tounix)
1452 int n;
1453 uchar *buf;
1454 char *str;
1455 Dir dir;
1457 str = emalloc(f->nstat);
1458 n = convM2Du(f->stat, f->nstat, &dir, str, !tounix);
1459 if(n <= BIT16SZ){
1460 free(str);
1461 return -1;
1464 n = sizeD2Mu(&dir, tounix);
1465 buf = emalloc(n);
1466 if(convD2Mu(&dir, buf, n, tounix) != n)
1467 sysfatal("convD2Mu conversion error");
1468 f->nstat = n;
1469 f->stat = buf;
1471 repack(f, fpkt, dotu);
1472 free(buf);
1473 f->stat = nil; /* is this okay ??? */
1474 free(str);
1476 return 0;
1479 int
1480 stripudirread(Msg* msg)
1482 char *str;
1483 int i, m, n, nn;
1484 uchar *buf;
1485 Dir d;
1486 Fcall* rx;
1488 buf = nil;
1489 str = nil;
1490 rx = &msg->rx;
1491 n = 0;
1492 nn = 0;
1493 for(i = 0; i < rx->count; i += m){
1494 m = BIT16SZ + GBIT16(&rx->data[i]);
1495 if(statchecku((uchar*)&rx->data[i], m, 1) < 0)
1496 return -1;
1497 if(nn < m)
1498 nn = m;
1499 n++;
1502 str = emalloc(nn);
1503 buf = emalloc(rx->count);
1505 nn = 0;
1506 for(i = 0; i < rx->count; i += m){
1507 m = BIT16SZ + GBIT16(&rx->data[i]);
1508 if(convM2Du((uchar*)&rx->data[i], m, &d, str, 1) != m){
1509 free(buf);
1510 free(str);
1511 return -1;
1514 n = convD2M(&d, &buf[nn], rx->count - nn);
1515 if(n <= BIT16SZ){
1516 free(buf);
1517 free(str);
1518 return -1;
1521 nn += n;
1524 rx->count = nn;
1525 rx->data = (char*)buf;
1527 repack(&msg->rx, &msg->rpkt, 0);
1528 free(str);
1529 free(buf);
1530 rx->data = nil; /* is this okay ??? */
1532 return 0;