Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <errno.h>
7 #define err err9pserve /* Darwin x86 */
9 enum
10 {
11 STACK = 32768,
12 NHASH = 31,
13 MAXMSG = 64, /* per connection */
14 MAXMSGSIZE = 4*1024*1024
15 };
17 typedef struct Hash Hash;
18 typedef struct Fid Fid;
19 typedef struct Msg Msg;
20 typedef struct Conn Conn;
21 typedef struct Queue Queue;
23 struct Hash
24 {
25 Hash *next;
26 uint n;
27 void *v;
28 };
30 struct Fid
31 {
32 int fid;
33 int ref;
34 int cfid;
35 int openfd;
36 int offset;
37 int coffset;
38 int isdir;
39 Fid *next;
40 };
42 struct Msg
43 {
44 Conn *c;
45 int internal;
46 int sync;
47 int ref;
48 int ctag;
49 int tag;
50 int isopenfd;
51 Fcall tx;
52 Fcall rx;
53 Fid *fid;
54 Fid *newfid;
55 Fid *afid;
56 Msg *oldm;
57 Msg *next;
58 uchar *tpkt;
59 uchar *rpkt;
60 };
62 struct Conn
63 {
64 int fd;
65 int fdmode;
66 Fid *fdfid;
67 int nmsg;
68 int nfid;
69 Channel *inc;
70 Channel *internal;
71 int inputstalled;
72 char dir[40];
73 Hash *tag[NHASH];
74 Hash *fid[NHASH];
75 Queue *outq;
76 Queue *inq;
77 Channel *outqdead;
78 int dotu;
79 };
81 char *xaname;
82 char *addr;
83 int afd;
84 char adir[40];
85 int isunix;
86 Queue *outq;
87 Queue *inq;
88 int verbose = 0;
89 int logging = 0;
90 int msize = 8192;
91 u32int xafid = NOFID;
92 int attached;
93 int versioned;
94 int dotu;
95 int noauth;
97 void *gethash(Hash**, uint);
98 int puthash(Hash**, uint, void*);
99 int delhash(Hash**, uint, void*);
100 Msg *mread9p(Ioproc*, int, int);
101 int mwrite9p(Ioproc*, int, uchar*);
102 uchar *read9ppkt(Ioproc*, int);
103 int write9ppkt(int, uchar*);
104 Msg *msgnew(int);
105 void msgput(Msg*);
106 void msgclear(Msg*);
107 Msg *msgget(int);
108 void msgincref(Msg*);
109 Fid *fidnew(int);
110 void fidput(Fid*);
111 void *emalloc(int);
112 void *erealloc(void*, int);
113 Queue *qalloc(void);
114 int sendq(Queue*, void*);
115 void *recvq(Queue*);
116 void connthread(void*);
117 void connoutthread(void*);
118 void listenthread(void*);
119 void outputthread(void*);
120 void inputthread(void*);
121 void rewritehdr(Fcall*, uchar*);
122 void repack(Fcall*, uchar**, int);
123 int tlisten(char*, char*);
124 int taccept(int, char*);
125 int iolisten(Ioproc*, char*, char*);
126 int ioaccept(Ioproc*, int, char*);
127 int iorecvfd(Ioproc*, int);
128 int iosendfd(Ioproc*, int, int);
129 void mainproc(void*);
130 int ignorepipe(void*, char*);
131 int timefmt(Fmt*);
132 void dorootstat(void);
133 int stripudirread(Msg*);
134 int cvtustat(Fcall*, uchar**, int);
136 void
137 usage(void)
139 fprint(2, "usage: 9pserve [-lnv] [-A aname afid] [-c addr] [-M msize] address\n");
140 fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
141 threadexitsall("usage");
144 uchar vbuf[128];
145 extern int _threaddebuglevel;
146 void
147 threadmain(int argc, char **argv)
149 char *file, *x, *addr;
150 int fd;
152 rfork(RFNOTEG);
153 x = getenv("verbose9pserve");
154 if(x){
155 verbose = atoi(x);
156 fprint(2, "verbose9pserve %s => %d\n", x, verbose);
158 ARGBEGIN{
159 default:
160 usage();
161 case 'A':
162 attached = 1;
163 xaname = EARGF(usage());
164 xafid = atoi(EARGF(usage()));
165 break;
166 case 'M':
167 versioned = 1;
168 msize = atoi(EARGF(usage()));
169 break;
170 case 'c':
171 addr = netmkaddr(EARGF(usage()), "net", "9fs");
172 if((fd = dial(addr, nil, nil, nil)) < 0)
173 sysfatal("dial %s: %r", addr);
174 dup(fd, 0);
175 dup(fd, 1);
176 if(fd > 1)
177 close(fd);
178 break;
179 case 'n':
180 noauth = 1;
181 break;
182 case 'v':
183 verbose++;
184 break;
185 case 'u':
186 isunix++;
187 break;
188 case 'l':
189 logging++;
190 break;
191 }ARGEND
193 if(attached && !versioned){
194 fprint(2, "-A must be used with -M\n");
195 usage();
198 if(argc != 1)
199 usage();
200 addr = argv[0];
202 fmtinstall('T', timefmt);
204 if((afd = announce(addr, adir)) < 0)
205 sysfatal("announce %s: %r", addr);
206 if(logging){
207 if(strncmp(addr, "unix!", 5) == 0)
208 addr += 5;
209 file = smprint("%s.log", addr);
210 if(file == nil)
211 sysfatal("smprint log: %r");
212 if((fd = create(file, OWRITE, 0666)) < 0)
213 sysfatal("create %s: %r", file);
214 dup(fd, 2);
215 if(fd > 2)
216 close(fd);
218 if(verbose) fprint(2, "%T 9pserve running\n");
219 proccreate(mainproc, nil, STACK);
222 void
223 mainproc(void *v)
225 int n, nn;
226 Fcall f;
227 USED(v);
229 atnotify(ignorepipe, 1);
230 fmtinstall('D', dirfmt);
231 fmtinstall('M', dirmodefmt);
232 fmtinstall('F', fcallfmt);
233 fmtinstall('H', encodefmt);
235 outq = qalloc();
236 inq = qalloc();
238 if(!versioned){
239 f.type = Tversion;
240 f.version = "9P2000.u";
241 f.msize = msize;
242 f.tag = NOTAG;
243 n = convS2M(&f, vbuf, sizeof vbuf);
244 if(n <= BIT16SZ)
245 sysfatal("convS2M conversion error");
246 if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
247 nn = write(1, vbuf, n);
248 if(n != nn)
249 sysfatal("error writing Tversion: %r\n");
250 n = read9pmsg(0, vbuf, sizeof vbuf);
251 if(n < 0)
252 sysfatal("read9pmsg failure");
253 if(convM2S(vbuf, n, &f) != n)
254 sysfatal("convM2S failure");
255 if(f.msize < msize)
256 msize = f.msize;
257 if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
258 dotu = strncmp(f.version, "9P2000.u", 8) == 0;
261 threadcreate(inputthread, nil, STACK);
262 threadcreate(outputthread, nil, STACK);
264 /* if(rootfid) */
265 /* dorootstat(); */
267 threadcreate(listenthread, nil, STACK);
268 threadexits(0);
271 int
272 ignorepipe(void *v, char *s)
274 USED(v);
275 if(strcmp(s, "sys: write on closed pipe") == 0)
276 return 1;
277 if(strcmp(s, "sys: tstp") == 0)
278 return 1;
279 if(strcmp(s, "sys: window size change") == 0)
280 return 1;
281 fprint(2, "9pserve %s: %T note: %s\n", addr, s);
282 return 0;
285 void
286 listenthread(void *arg)
288 Conn *c;
289 Ioproc *io;
291 io = ioproc();
292 USED(arg);
293 threadsetname("listen %s", adir);
294 for(;;){
295 c = emalloc(sizeof(Conn));
296 c->fd = iolisten(io, adir, c->dir);
297 if(c->fd < 0){
298 if(verbose) fprint(2, "%T listen: %r\n");
299 close(afd);
300 free(c);
301 return;
303 c->inc = chancreate(sizeof(void*), 0);
304 c->internal = chancreate(sizeof(void*), 0);
305 c->inq = qalloc();
306 c->outq = qalloc();
307 c->outqdead = chancreate(sizeof(void*), 0);
308 if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
309 threadcreate(connthread, c, STACK);
313 void
314 send9pmsg(Msg *m)
316 int n, nn;
318 n = sizeS2Mu(&m->rx, m->c->dotu);
319 m->rpkt = emalloc(n);
320 nn = convS2Mu(&m->rx, m->rpkt, n, m->c->dotu);
321 if(nn <= BIT16SZ)
322 sysfatal("convS2Mu conversion error");
323 if(nn != n)
324 sysfatal("sizeS2Mu and convS2Mu disagree");
325 sendq(m->c->outq, m);
328 void
329 sendomsg(Msg *m)
331 int n, nn;
333 n = sizeS2Mu(&m->tx, m->c->dotu);
334 m->tpkt = emalloc(n);
335 nn = convS2Mu(&m->tx, m->tpkt, n, m->c->dotu);
336 if(nn <= BIT16SZ)
337 sysfatal("convS2Mu conversion error");
338 if(nn != n)
339 sysfatal("sizeS2Mu and convS2Mu disagree");
340 sendq(outq, m);
343 void
344 err(Msg *m, char *ename)
346 m->rx.type = Rerror;
347 m->rx.ename = ename;
348 m->rx.tag = m->tx.tag;
349 send9pmsg(m);
352 char*
353 estrdup(char *s)
355 char *t;
357 t = emalloc(strlen(s)+1);
358 strcpy(t, s);
359 return t;
362 void
363 connthread(void *arg)
365 int i, fd;
366 Conn *c;
367 Hash *h, *hnext;
368 Msg *m, *om, *mm, sync;
369 Fid *f;
370 Ioproc *io;
372 c = arg;
373 threadsetname("conn %s", c->dir);
374 io = ioproc();
375 fd = ioaccept(io, c->fd, c->dir);
376 if(fd < 0){
377 if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
378 goto out;
380 close(c->fd);
381 c->fd = fd;
382 threadcreate(connoutthread, c, STACK);
383 while((m = mread9p(io, c->fd, c->dotu)) != nil){
384 if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
385 m->c = c;
386 m->ctag = m->tx.tag;
387 c->nmsg++;
388 if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
389 if(puthash(c->tag, m->tx.tag, m) < 0){
390 err(m, "duplicate tag");
391 continue;
393 msgincref(m);
394 switch(m->tx.type){
395 case Tversion:
396 m->rx.tag = m->tx.tag;
397 m->rx.msize = m->tx.msize;
398 if(m->rx.msize > msize)
399 m->rx.msize = msize;
400 m->rx.version = "9P2000";
401 c->dotu = 0;
402 if(dotu && strncmp(m->tx.version, "9P2000.u", 8) == 0){
403 m->rx.version = "9P2000.u";
404 c->dotu = 1;
406 m->rx.type = Rversion;
407 send9pmsg(m);
408 continue;
409 case Tflush:
410 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
411 m->rx.tag = m->tx.tag;
412 m->rx.type = Rflush;
413 send9pmsg(m);
414 continue;
416 msgincref(m->oldm);
417 break;
418 case Tattach:
419 m->afid = nil;
420 if(m->tx.afid != NOFID
421 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
422 err(m, "unknown fid");
423 continue;
425 if(m->afid)
426 m->afid->ref++;
427 m->fid = fidnew(m->tx.fid);
428 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
429 err(m, "duplicate fid");
430 continue;
432 m->fid->ref++;
433 if(attached && m->afid==nil){
434 if(m->tx.aname[0] && strcmp(xaname, m->tx.aname) != 0){
435 err(m, "invalid attach name");
436 continue;
438 m->tx.afid = xafid;
439 m->tx.aname = xaname;
440 m->tx.uname = getuser(); /* what srv.c used */
441 repack(&m->tx, &m->tpkt, c->dotu);
443 break;
444 case Twalk:
445 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
446 err(m, "unknown fid");
447 continue;
449 m->fid->ref++;
450 if(m->tx.newfid == m->tx.fid){
451 m->fid->ref++;
452 m->newfid = m->fid;
453 }else{
454 m->newfid = fidnew(m->tx.newfid);
455 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
456 err(m, "duplicate fid");
457 continue;
459 m->newfid->ref++;
461 break;
462 case Tauth:
463 if(attached){
464 err(m, "authentication not required");
465 continue;
467 if(noauth){
468 err(m, "authentication rejected");
469 continue;
471 m->afid = fidnew(m->tx.afid);
472 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
473 err(m, "duplicate fid");
474 continue;
476 m->afid->ref++;
477 break;
478 case Tcreate:
479 if(dotu && !c->dotu && (m->tx.perm&(DMSYMLINK|DMDEVICE|DMNAMEDPIPE|DMSOCKET))){
480 err(m, "unsupported file type");
481 continue;
483 goto caseTopen;
484 case Topenfd:
485 if(m->tx.mode&~(OTRUNC|3)){
486 err(m, "bad openfd mode");
487 continue;
489 m->isopenfd = 1;
490 m->tx.type = Topen;
491 m->tpkt[4] = Topen;
492 /* fall through */
493 caseTopen:
494 case Topen:
495 case Tclunk:
496 case Tread:
497 case Twrite:
498 case Tremove:
499 case Tstat:
500 case Twstat:
501 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
502 err(m, "unknown fid");
503 continue;
505 m->fid->ref++;
506 if(m->tx.type==Twstat && dotu && !c->dotu){
507 if(cvtustat(&m->tx, &m->tpkt, 1) < 0){
508 err(m, "cannot convert stat buffer");
509 continue;
512 if(m->tx.type==Tread && m->fid->isdir && dotu && !c->dotu){
513 if(m->tx.offset = m->fid->coffset)
514 m->tx.offset = m->fid->offset;
515 else
516 m->fid->offset = m->fid->coffset;
518 break;
521 /* have everything - translate and send */
522 m->c = c;
523 m->ctag = m->tx.tag;
524 m->tx.tag = m->tag;
525 if(m->fid)
526 m->tx.fid = m->fid->fid;
527 if(m->newfid)
528 m->tx.newfid = m->newfid->fid;
529 if(m->afid)
530 m->tx.afid = m->afid->fid;
531 if(m->oldm)
532 m->tx.oldtag = m->oldm->tag;
533 /* reference passes to outq */
534 sendq(outq, m);
535 while(c->nmsg >= MAXMSG){
536 c->inputstalled = 1;
537 recvp(c->inc);
541 if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
543 /* flush all outstanding messages */
544 for(i=0; i<NHASH; i++){
545 while((h = c->tag[i]) != nil){
546 om = h->v;
547 msgincref(om); /* for us */
548 m = msgnew(0);
549 m->internal = 1;
550 m->c = c;
551 c->nmsg++;
552 m->tx.type = Tflush;
553 m->tx.tag = m->tag;
554 m->tx.oldtag = om->tag;
555 m->oldm = om;
556 msgincref(om);
557 msgincref(m); /* for outq */
558 sendomsg(m);
559 mm = recvp(c->internal);
560 assert(mm == m);
561 msgput(m); /* got from recvp */
562 msgput(m); /* got from msgnew */
563 if(delhash(c->tag, om->ctag, om) == 0)
564 msgput(om); /* got from hash table */
565 msgput(om); /* got from msgincref */
569 /*
570 * outputthread has written all its messages
571 * to the remote connection (because we've gotten all the replies!),
572 * but it might not have gotten a chance to msgput
573 * the very last one. sync up to make sure.
574 */
575 memset(&sync, 0, sizeof sync);
576 sync.sync = 1;
577 sync.c = c;
578 sendq(outq, &sync);
579 recvp(c->outqdead);
581 /* everything is quiet; can close the local output queue. */
582 sendq(c->outq, nil);
583 recvp(c->outqdead);
585 /* should be no messages left anywhere. */
586 assert(c->nmsg == 0);
588 /* clunk all outstanding fids */
589 for(i=0; i<NHASH; i++){
590 for(h=c->fid[i]; h; h=hnext){
591 f = h->v;
592 m = msgnew(0);
593 m->internal = 1;
594 m->c = c;
595 c->nmsg++;
596 m->tx.type = Tclunk;
597 m->tx.tag = m->tag;
598 m->tx.fid = f->fid;
599 m->fid = f;
600 f->ref++;
601 msgincref(m);
602 sendomsg(m);
603 mm = recvp(c->internal);
604 assert(mm == m);
605 msgclear(m);
606 msgput(m); /* got from recvp */
607 msgput(m); /* got from msgnew */
608 fidput(f); /* got from hash table */
609 hnext = h->next;
610 free(h);
614 out:
615 closeioproc(io);
616 assert(c->nmsg == 0);
617 assert(c->nfid == 0);
618 close(c->fd);
619 chanfree(c->internal);
620 c->internal = 0;
621 chanfree(c->inc);
622 c->inc = 0;
623 free(c->inq);
624 c->inq = 0;
625 free(c);
628 static void
629 openfdthread(void *v)
631 Conn *c;
632 Fid *fid;
633 Msg *m;
634 int n;
635 vlong tot;
636 Ioproc *io;
637 char buf[1024];
639 c = v;
640 fid = c->fdfid;
641 io = ioproc();
642 threadsetname("openfd %s", c->fdfid);
643 tot = 0;
644 m = nil;
645 if(c->fdmode == OREAD){
646 for(;;){
647 if(verbose) fprint(2, "%T tread...");
648 m = msgnew(0);
649 m->internal = 1;
650 m->c = c;
651 m->tx.type = Tread;
652 m->tx.count = msize - IOHDRSZ;
653 m->tx.fid = fid->fid;
654 m->tx.tag = m->tag;
655 m->tx.offset = tot;
656 m->fid = fid;
657 fid->ref++;
658 msgincref(m);
659 sendomsg(m);
660 recvp(c->internal);
661 if(m->rx.type == Rerror){
662 /* fprint(2, "%T read error: %s\n", m->rx.ename); */
663 break;
665 if(m->rx.count == 0)
666 break;
667 tot += m->rx.count;
668 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
669 /* fprint(2, "%T pipe write error: %r\n"); */
670 break;
672 msgput(m);
673 msgput(m);
674 m = nil;
676 }else{
677 for(;;){
678 if(verbose) fprint(2, "%T twrite...");
679 n = sizeof buf;
680 if(n > msize)
681 n = msize;
682 if((n=ioread(io, c->fd, buf, n)) <= 0){
683 if(n < 0)
684 fprint(2, "%T pipe read error: %r\n");
685 break;
687 m = msgnew(0);
688 m->internal = 1;
689 m->c = c;
690 m->tx.type = Twrite;
691 m->tx.fid = fid->fid;
692 m->tx.data = buf;
693 m->tx.count = n;
694 m->tx.tag = m->tag;
695 m->tx.offset = tot;
696 m->fid = fid;
697 fid->ref++;
698 msgincref(m);
699 sendomsg(m);
700 recvp(c->internal);
701 if(m->rx.type == Rerror){
702 /* fprint(2, "%T write error: %s\n", m->rx.ename); */
704 tot += n;
705 msgput(m);
706 msgput(m);
707 m = nil;
710 if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
711 close(c->fd);
712 closeioproc(io);
713 if(m){
714 msgput(m);
715 msgput(m);
717 if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
718 if(--fid->openfd == 0){
719 m = msgnew(0);
720 m->internal = 1;
721 m->c = c;
722 m->tx.type = Tclunk;
723 m->tx.tag = m->tag;
724 m->tx.fid = fid->fid;
725 m->fid = fid;
726 fid->ref++;
727 msgincref(m);
728 sendomsg(m);
729 recvp(c->internal);
730 msgput(m);
731 msgput(m);
733 fidput(fid);
734 c->fdfid = nil;
735 chanfree(c->internal);
736 c->internal = 0;
737 free(c);
740 int
741 xopenfd(Msg *m)
743 char errs[ERRMAX];
744 int n, p[2];
745 Conn *nc;
747 if(pipe(p) < 0){
748 rerrstr(errs, sizeof errs);
749 err(m, errs);
750 /* XXX return here? */
752 if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
754 /* now we're committed. */
756 /* a new connection for this fid */
757 nc = emalloc(sizeof(Conn));
758 nc->internal = chancreate(sizeof(void*), 0);
760 /* a ref for us */
761 nc->fdfid = m->fid;
762 m->fid->ref++;
763 nc->fdfid->openfd++;
764 nc->fdmode = m->tx.mode;
765 nc->fd = p[0];
767 /* a thread to tend the pipe */
768 threadcreate(openfdthread, nc, STACK);
770 /* if mode is ORDWR, that openfdthread will write; start a reader */
771 if((m->tx.mode&3) == ORDWR){
772 nc = emalloc(sizeof(Conn));
773 nc->internal = chancreate(sizeof(void*), 0);
774 nc->fdfid = m->fid;
775 m->fid->ref++;
776 nc->fdfid->openfd++;
777 nc->fdmode = OREAD;
778 nc->fd = dup(p[0], -1);
779 threadcreate(openfdthread, nc, STACK);
782 /* steal fid from other connection */
783 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
784 fidput(m->fid);
786 /* rewrite as Ropenfd */
787 m->rx.type = Ropenfd;
788 n = GBIT32(m->rpkt);
789 m->rpkt = erealloc(m->rpkt, n+4);
790 PBIT32(m->rpkt+n, p[1]);
791 n += 4;
792 PBIT32(m->rpkt, n);
793 m->rpkt[4] = Ropenfd;
794 m->rx.unixfd = p[1];
795 return 0;
798 void
799 connoutthread(void *arg)
801 char *ename;
802 int err;
803 Conn *c;
804 Msg *m, *om;
805 Ioproc *io;
807 c = arg;
808 io = ioproc();
809 threadsetname("connout %s", c->dir);
810 while((m = recvq(c->outq)) != nil){
811 err = m->tx.type+1 != m->rx.type;
812 if(!err && m->isopenfd)
813 if(xopenfd(m) < 0)
814 continue;
815 switch(m->tx.type){
816 case Tflush:
817 om = m->oldm;
818 if(om)
819 if(delhash(om->c->tag, om->ctag, om) == 0)
820 msgput(om);
821 break;
822 case Tclunk:
823 case Tremove:
824 if(m->fid)
825 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
826 fidput(m->fid);
827 break;
828 case Tauth:
829 if(err && m->afid){
830 if(verbose) fprint(2, "%T auth error\n");
831 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
832 fidput(m->afid);
834 break;
835 case Tattach:
836 if(err && m->fid)
837 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
838 fidput(m->fid);
839 break;
840 case Twalk:
841 if(err || m->rx.nwqid < m->tx.nwname)
842 if(m->tx.fid != m->tx.newfid && m->newfid)
843 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
844 fidput(m->newfid);
845 break;
846 case Tread:
847 if(!err && m->fid->isdir && dotu && !m->c->dotu){
848 m->fid->offset += m->rx.count;
849 stripudirread(m);
850 m->fid->coffset += m->rx.count;
852 break;
853 case Tstat:
854 if(!err && dotu && !m->c->dotu)
855 cvtustat(&m->rx, &m->rpkt, 0);
856 break;
857 case Topen:
858 case Tcreate:
859 m->fid->isdir = (m->rx.qid.type & QTDIR);
860 break;
862 if(m->rx.type==Rerror && dotu && !c->dotu){
863 ename = estrdup(m->rx.ename);
864 m->rx.ename = ename;
865 repack(&m->rx, &m->rpkt, c->dotu);
866 free(ename);
867 m->rx.ename = "XXX";
869 if(delhash(m->c->tag, m->ctag, m) == 0)
870 msgput(m);
871 if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
872 rewritehdr(&m->rx, m->rpkt);
873 if(mwrite9p(io, c->fd, m->rpkt) < 0)
874 if(verbose) fprint(2, "%T write error: %r\n");
875 msgput(m);
876 if(c->inputstalled && c->nmsg < MAXMSG)
877 nbsendp(c->inc, 0);
879 closeioproc(io);
880 free(c->outq);
881 c->outq = nil;
882 sendp(c->outqdead, nil);
885 void
886 outputthread(void *arg)
888 Msg *m;
889 Ioproc *io;
891 USED(arg);
892 io = ioproc();
893 threadsetname("output");
894 while((m = recvq(outq)) != nil){
895 if(m->sync){
896 sendp(m->c->outqdead, nil);
897 continue;
899 if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
900 rewritehdr(&m->tx, m->tpkt);
901 if(mwrite9p(io, 1, m->tpkt) < 0)
902 sysfatal("output error: %r");
903 msgput(m);
905 closeioproc(io);
906 fprint(2, "%T output eof\n");
907 threadexitsall(0);
910 void
911 inputthread(void *arg)
913 uchar *pkt;
914 int n, nn, tag;
915 Msg *m;
916 Ioproc *io;
918 threadsetname("input");
919 if(verbose) fprint(2, "%T input thread\n");
920 io = ioproc();
921 USED(arg);
922 while((pkt = read9ppkt(io, 0)) != nil){
923 n = GBIT32(pkt);
924 if(n < 7){
925 fprint(2, "%T short 9P packet from server\n");
926 free(pkt);
927 continue;
929 if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
930 tag = GBIT16(pkt+5);
931 if((m = msgget(tag)) == nil){
932 fprint(2, "%T unexpected 9P response tag %d\n", tag);
933 free(pkt);
934 continue;
936 if((nn = convM2Su(pkt, n, &m->rx, dotu)) != n){
937 fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
938 free(pkt);
939 msgput(m);
940 continue;
942 if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
943 m->internal ? " (internal)" : "");
944 m->rpkt = pkt;
945 m->rx.tag = m->ctag;
946 if(m->internal)
947 sendp(m->c->internal, m);
948 else if(m->c->outq)
949 sendq(m->c->outq, m);
950 else
951 msgput(m);
953 closeioproc(io);
954 /*fprint(2, "%T input eof\n"); */
955 threadexitsall(0);
958 void*
959 gethash(Hash **ht, uint n)
961 Hash *h;
963 for(h=ht[n%NHASH]; h; h=h->next)
964 if(h->n == n)
965 return h->v;
966 return nil;
969 int
970 delhash(Hash **ht, uint n, void *v)
972 Hash *h, **l;
974 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
975 if(h->n == n){
976 if(h->v != v){
977 if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
978 return -1;
980 *l = h->next;
981 free(h);
982 return 0;
984 return -1;
987 int
988 puthash(Hash **ht, uint n, void *v)
990 Hash *h;
992 if(gethash(ht, n))
993 return -1;
994 h = emalloc(sizeof(Hash));
995 h->next = ht[n%NHASH];
996 h->n = n;
997 h->v = v;
998 ht[n%NHASH] = h;
999 return 0;
1002 Fid **fidtab;
1003 int nfidtab;
1004 Fid *freefid;
1006 Fid*
1007 fidnew(int cfid)
1009 Fid *f;
1011 if(freefid == nil){
1012 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
1013 if(nfidtab == xafid){
1014 fidtab[nfidtab++] = nil;
1015 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
1017 fidtab[nfidtab] = emalloc(sizeof(Fid));
1018 freefid = fidtab[nfidtab];
1019 freefid->fid = nfidtab++;
1021 f = freefid;
1022 freefid = f->next;
1023 f->cfid = cfid;
1024 f->ref = 1;
1025 f->offset = 0;
1026 f->coffset = 0;
1027 f->isdir = -1;
1028 return f;
1031 void
1032 fidput(Fid *f)
1034 if(f == nil)
1035 return;
1036 assert(f->ref > 0);
1037 if(--f->ref > 0)
1038 return;
1039 f->next = freefid;
1040 f->cfid = -1;
1041 freefid = f;
1044 Msg **msgtab;
1045 int nmsgtab;
1046 int nmsg;
1047 Msg *freemsg;
1049 void
1050 msgincref(Msg *m)
1052 if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
1053 getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
1054 m->ref++;
1057 Msg*
1058 msgnew(int x)
1060 Msg *m;
1062 if(freemsg == nil){
1063 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
1064 msgtab[nmsgtab] = emalloc(sizeof(Msg));
1065 freemsg = msgtab[nmsgtab];
1066 freemsg->tag = nmsgtab++;
1068 m = freemsg;
1069 freemsg = m->next;
1070 m->ref = 1;
1071 if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
1072 getcallerpc(&x), m, m->tag, m->ref);
1073 nmsg++;
1074 return m;
1078 * Clear data associated with connections, so that
1079 * if all msgs have been msgcleared, the connection
1080 * can be freed. Note that this does *not* free the tpkt
1081 * and rpkt; they are freed in msgput with the msg itself.
1082 * The io write thread might still be holding a ref to msg
1083 * even once the connection has finished with it.
1085 void
1086 msgclear(Msg *m)
1088 if(m->c){
1089 m->c->nmsg--;
1090 m->c = nil;
1092 if(m->oldm){
1093 msgput(m->oldm);
1094 m->oldm = nil;
1096 if(m->fid){
1097 fidput(m->fid);
1098 m->fid = nil;
1100 if(m->afid){
1101 fidput(m->afid);
1102 m->afid = nil;
1104 if(m->newfid){
1105 fidput(m->newfid);
1106 m->newfid = nil;
1108 if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
1109 close(m->rx.unixfd);
1110 m->rx.unixfd = -1;
1114 void
1115 msgput(Msg *m)
1117 if(m == nil)
1118 return;
1120 if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
1121 getcallerpc(&m), m, m->tag, m->ctag, m->ref);
1122 assert(m->ref > 0);
1123 if(--m->ref > 0)
1124 return;
1125 nmsg--;
1126 msgclear(m);
1127 if(m->tpkt){
1128 free(m->tpkt);
1129 m->tpkt = nil;
1131 if(m->rpkt){
1132 free(m->rpkt);
1133 m->rpkt = nil;
1135 m->isopenfd = 0;
1136 m->internal = 0;
1137 m->next = freemsg;
1138 freemsg = m;
1141 Msg*
1142 msgget(int n)
1144 Msg *m;
1146 if(n < 0 || n >= nmsgtab)
1147 return nil;
1148 m = msgtab[n];
1149 if(m->ref == 0)
1150 return nil;
1151 if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
1152 msgincref(m);
1153 return m;
1157 void*
1158 emalloc(int n)
1160 void *v;
1162 v = mallocz(n, 1);
1163 if(v == nil){
1164 abort();
1165 sysfatal("out of memory allocating %d", n);
1167 return v;
1170 void*
1171 erealloc(void *v, int n)
1173 v = realloc(v, n);
1174 if(v == nil){
1175 abort();
1176 sysfatal("out of memory reallocating %d", n);
1178 return v;
1181 typedef struct Qel Qel;
1182 struct Qel
1184 Qel *next;
1185 void *p;
1188 struct Queue
1190 QLock lk;
1191 Rendez r;
1192 Qel *head;
1193 Qel *tail;
1196 Queue*
1197 qalloc(void)
1199 Queue *q;
1201 q = mallocz(sizeof(Queue), 1);
1202 if(q == nil)
1203 return nil;
1204 q->r.l = &q->lk;
1205 return q;
1208 int
1209 sendq(Queue *q, void *p)
1211 Qel *e;
1213 e = emalloc(sizeof(Qel));
1214 qlock(&q->lk);
1215 e->p = p;
1216 e->next = nil;
1217 if(q->head == nil)
1218 q->head = e;
1219 else
1220 q->tail->next = e;
1221 q->tail = e;
1222 rwakeup(&q->r);
1223 qunlock(&q->lk);
1224 return 0;
1227 void*
1228 recvq(Queue *q)
1230 void *p;
1231 Qel *e;
1233 qlock(&q->lk);
1234 while(q->head == nil)
1235 rsleep(&q->r);
1236 e = q->head;
1237 q->head = e->next;
1238 qunlock(&q->lk);
1239 p = e->p;
1240 free(e);
1241 return p;
1244 uchar*
1245 read9ppkt(Ioproc *io, int fd)
1247 uchar buf[4], *pkt;
1248 int n, nn;
1250 n = ioreadn(io, fd, buf, 4);
1251 if(n != 4)
1252 return nil;
1253 n = GBIT32(buf);
1254 if(n > MAXMSGSIZE)
1255 return nil;
1256 pkt = emalloc(n);
1257 PBIT32(pkt, n);
1258 nn = ioreadn(io, fd, pkt+4, n-4);
1259 if(nn != n-4){
1260 free(pkt);
1261 return nil;
1263 /* would do this if we ever got one of these, but we only generate them
1264 if(pkt[4] == Ropenfd){
1265 newfd = iorecvfd(io, fd);
1266 PBIT32(pkt+n-4, newfd);
1269 return pkt;
1272 Msg*
1273 mread9p(Ioproc *io, int fd, int dotu)
1275 int n, nn;
1276 uchar *pkt;
1277 Msg *m;
1279 if((pkt = read9ppkt(io, fd)) == nil)
1280 return nil;
1282 m = msgnew(0);
1283 m->tpkt = pkt;
1284 n = GBIT32(pkt);
1285 nn = convM2Su(pkt, n, &m->tx, dotu);
1286 if(nn != n){
1287 fprint(2, "%T read bad packet from %d\n", fd);
1288 return nil;
1290 return m;
1293 int
1294 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1296 int n, nfd;
1298 n = GBIT32(pkt);
1299 if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
1300 if(verbose > 1) fprint(2, "%T before iowrite\n");
1301 if(iowrite(io, fd, pkt, n) != n){
1302 fprint(2, "%T write error: %r\n");
1303 return -1;
1305 if(verbose > 1) fprint(2, "%T after iowrite\n");
1306 if(pkt[4] == Ropenfd){
1307 nfd = GBIT32(pkt+n-4);
1308 if(iosendfd(io, fd, nfd) < 0){
1309 fprint(2, "%T send fd error: %r\n");
1310 return -1;
1313 return 0;
1316 void
1317 restring(uchar *pkt, int pn, char *s)
1319 int n;
1321 if(s < (char*)pkt || s >= (char*)pkt+pn)
1322 return;
1324 n = strlen(s);
1325 memmove(s+1, s, n);
1326 PBIT16((uchar*)s-1, n);
1329 void
1330 repack(Fcall *f, uchar **ppkt, int dotu)
1332 uint n, nn;
1333 uchar *pkt;
1335 pkt = *ppkt;
1336 n = GBIT32(pkt);
1337 nn = sizeS2Mu(f, dotu);
1338 if(nn > n){
1339 free(pkt);
1340 pkt = emalloc(nn);
1341 *ppkt = pkt;
1343 n = convS2Mu(f, pkt, nn, dotu);
1344 if(n <= BIT16SZ)
1345 sysfatal("convS2M conversion error");
1346 if(n != nn)
1347 sysfatal("convS2Mu and sizeS2Mu disagree");
1350 void
1351 rewritehdr(Fcall *f, uchar *pkt)
1353 int i, n;
1355 n = GBIT32(pkt);
1356 PBIT16(pkt+5, f->tag);
1357 switch(f->type){
1358 case Tversion:
1359 case Rversion:
1360 restring(pkt, n, f->version);
1361 break;
1362 case Tauth:
1363 PBIT32(pkt+7, f->afid);
1364 restring(pkt, n, f->uname);
1365 restring(pkt, n, f->aname);
1366 break;
1367 case Tflush:
1368 PBIT16(pkt+7, f->oldtag);
1369 break;
1370 case Tattach:
1371 restring(pkt, n, f->uname);
1372 restring(pkt, n, f->aname);
1373 PBIT32(pkt+7, f->fid);
1374 PBIT32(pkt+11, f->afid);
1375 break;
1376 case Twalk:
1377 PBIT32(pkt+7, f->fid);
1378 PBIT32(pkt+11, f->newfid);
1379 for(i=0; i<f->nwname; i++)
1380 restring(pkt, n, f->wname[i]);
1381 break;
1382 case Tcreate:
1383 restring(pkt, n, f->name);
1384 /* fall through */
1385 case Topen:
1386 case Tclunk:
1387 case Tremove:
1388 case Tstat:
1389 case Twstat:
1390 case Twrite:
1391 PBIT32(pkt+7, f->fid);
1392 break;
1393 case Tread:
1394 PBIT32(pkt+7, f->fid);
1395 PBIT64(pkt+11, f->offset);
1396 break;
1397 case Rerror:
1398 restring(pkt, n, f->ename);
1399 break;
1403 static long
1404 _iolisten(va_list *arg)
1406 char *a, *b;
1408 a = va_arg(*arg, char*);
1409 b = va_arg(*arg, char*);
1410 return listen(a, b);
1413 int
1414 iolisten(Ioproc *io, char *a, char *b)
1416 return iocall(io, _iolisten, a, b);
1419 static long
1420 _ioaccept(va_list *arg)
1422 int fd;
1423 char *dir;
1425 fd = va_arg(*arg, int);
1426 dir = va_arg(*arg, char*);
1427 return accept(fd, dir);
1430 int
1431 ioaccept(Ioproc *io, int fd, char *dir)
1433 return iocall(io, _ioaccept, fd, dir);
1436 int
1437 timefmt(Fmt *fmt)
1439 static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1440 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1441 vlong ns;
1442 Tm tm;
1443 ns = nsec();
1444 tm = *localtime(time(0));
1445 return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
1446 mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
1447 (int)(ns%1000000000)/1000000);
1450 int
1451 cvtustat(Fcall *f, uchar **fpkt, int tounix)
1453 int n;
1454 uchar *buf;
1455 char *str;
1456 Dir dir;
1458 str = emalloc(f->nstat);
1459 n = convM2Du(f->stat, f->nstat, &dir, str, !tounix);
1460 if(n <= BIT16SZ){
1461 free(str);
1462 return -1;
1465 n = sizeD2Mu(&dir, tounix);
1466 buf = emalloc(n);
1467 if(convD2Mu(&dir, buf, n, tounix) != n)
1468 sysfatal("convD2Mu conversion error");
1469 f->nstat = n;
1470 f->stat = buf;
1472 repack(f, fpkt, dotu);
1473 free(buf);
1474 f->stat = nil; /* is this okay ??? */
1475 free(str);
1477 return 0;
1480 int
1481 stripudirread(Msg* msg)
1483 char *str;
1484 int i, m, n, nn;
1485 uchar *buf;
1486 Dir d;
1487 Fcall* rx;
1489 buf = nil;
1490 str = nil;
1491 rx = &msg->rx;
1492 n = 0;
1493 nn = 0;
1494 for(i = 0; i < rx->count; i += m){
1495 m = BIT16SZ + GBIT16(&rx->data[i]);
1496 if(statchecku((uchar*)&rx->data[i], m, 1) < 0)
1497 return -1;
1498 if(nn < m)
1499 nn = m;
1500 n++;
1503 str = emalloc(nn);
1504 buf = emalloc(rx->count);
1506 nn = 0;
1507 for(i = 0; i < rx->count; i += m){
1508 m = BIT16SZ + GBIT16(&rx->data[i]);
1509 if(convM2Du((uchar*)&rx->data[i], m, &d, str, 1) != m){
1510 free(buf);
1511 free(str);
1512 return -1;
1515 n = convD2M(&d, &buf[nn], rx->count - nn);
1516 if(n <= BIT16SZ){
1517 free(buf);
1518 free(str);
1519 return -1;
1522 nn += n;
1525 rx->count = nn;
1526 rx->data = (char*)buf;
1528 repack(&msg->rx, &msg->rpkt, 0);
1529 free(str);
1530 free(buf);
1531 rx->data = nil; /* is this okay ??? */
1533 return 0;