Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <errno.h>
7 #define err err9pserve /* Darwin x86 */
9 enum
10 {
11 STACK = 32768,
12 NHASH = 31,
13 MAXMSG = 64, /* per connection */
14 MAXMSGSIZE = 4*1024*1024
15 };
17 typedef struct Hash Hash;
18 typedef struct Fid Fid;
19 typedef struct Msg Msg;
20 typedef struct Conn Conn;
21 typedef struct Queue Queue;
23 struct Hash
24 {
25 Hash *next;
26 uint n;
27 void *v;
28 };
30 struct Fid
31 {
32 int fid;
33 int ref;
34 int cfid;
35 int openfd;
36 int offset;
37 int coffset;
38 int isdir;
39 Fid *next;
40 };
42 struct Msg
43 {
44 Conn *c;
45 int internal;
46 int ref;
47 int ctag;
48 int tag;
49 int isopenfd;
50 Fcall tx;
51 Fcall rx;
52 Fid *fid;
53 Fid *newfid;
54 Fid *afid;
55 Msg *oldm;
56 Msg *next;
57 uchar *tpkt;
58 uchar *rpkt;
59 };
61 struct Conn
62 {
63 int fd;
64 int fdmode;
65 Fid *fdfid;
66 int nmsg;
67 int nfid;
68 Channel *inc;
69 Channel *internal;
70 int inputstalled;
71 char dir[40];
72 Hash *tag[NHASH];
73 Hash *fid[NHASH];
74 Queue *outq;
75 Queue *inq;
76 int dotu;
77 };
79 char *xaname;
80 char *addr;
81 int afd;
82 char adir[40];
83 int isunix;
84 Queue *outq;
85 Queue *inq;
86 int verbose = 0;
87 int logging = 0;
88 int msize = 8192;
89 u32int xafid = NOFID;
90 int attached;
91 int versioned;
92 int dotu;
94 void *gethash(Hash**, uint);
95 int puthash(Hash**, uint, void*);
96 int delhash(Hash**, uint, void*);
97 Msg *mread9p(Ioproc*, int, int);
98 int mwrite9p(Ioproc*, int, uchar*);
99 uchar *read9ppkt(Ioproc*, int);
100 int write9ppkt(int, uchar*);
101 Msg *msgnew(int);
102 void msgput(Msg*);
103 void msgclear(Msg*);
104 Msg *msgget(int);
105 void msgincref(Msg*);
106 Fid *fidnew(int);
107 void fidput(Fid*);
108 void *emalloc(int);
109 void *erealloc(void*, int);
110 Queue *qalloc(void);
111 int sendq(Queue*, void*);
112 void *recvq(Queue*);
113 void connthread(void*);
114 void connoutthread(void*);
115 void listenthread(void*);
116 void outputthread(void*);
117 void inputthread(void*);
118 void rewritehdr(Fcall*, uchar*);
119 void repack(Fcall*, uchar**, int);
120 int tlisten(char*, char*);
121 int taccept(int, char*);
122 int iolisten(Ioproc*, char*, char*);
123 int ioaccept(Ioproc*, int, char*);
124 int iorecvfd(Ioproc*, int);
125 int iosendfd(Ioproc*, int, int);
126 void mainproc(void*);
127 int ignorepipe(void*, char*);
128 int timefmt(Fmt*);
129 void dorootstat(void);
130 int stripudirread(Msg*);
131 int cvtustat(Fcall*, uchar**, int);
133 void
134 usage(void)
136 fprint(2, "usage: 9pserve [-lv] [-A aname afid] [-M msize] address\n");
137 fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
138 threadexitsall("usage");
141 uchar vbuf[128];
142 extern int _threaddebuglevel;
143 void
144 threadmain(int argc, char **argv)
146 char *file, *x;
147 int fd;
149 x = getenv("verbose9pserve");
150 if(x){
151 verbose = atoi(x);
152 fprint(2, "verbose9pserve %s => %d\n", x, verbose);
154 ARGBEGIN{
155 default:
156 usage();
157 case 'A':
158 attached = 1;
159 xaname = EARGF(usage());
160 xafid = atoi(EARGF(usage()));
161 break;
162 case 'M':
163 versioned = 1;
164 msize = atoi(EARGF(usage()));
165 break;
166 case 'v':
167 verbose++;
168 break;
169 case 'u':
170 isunix++;
171 break;
172 case 'l':
173 logging++;
174 break;
175 }ARGEND
177 if(attached && !versioned){
178 fprint(2, "-A must be used with -M\n");
179 usage();
182 if(argc != 1)
183 usage();
184 addr = argv[0];
186 fmtinstall('T', timefmt);
188 if((afd = announce(addr, adir)) < 0)
189 sysfatal("announce %s: %r", addr);
190 if(logging){
191 if(strncmp(addr, "unix!", 5) == 0)
192 addr += 5;
193 file = smprint("%s.log", addr);
194 if(file == nil)
195 sysfatal("smprint log: %r");
196 if((fd = create(file, OWRITE, 0666)) < 0)
197 sysfatal("create %s: %r", file);
198 dup(fd, 2);
199 if(fd > 2)
200 close(fd);
202 if(verbose) fprint(2, "%T 9pserve running\n");
203 proccreate(mainproc, nil, STACK);
206 void
207 mainproc(void *v)
209 int n, nn;
210 Fcall f;
211 USED(v);
213 atnotify(ignorepipe, 1);
214 fmtinstall('D', dirfmt);
215 fmtinstall('M', dirmodefmt);
216 fmtinstall('F', fcallfmt);
217 fmtinstall('H', encodefmt);
219 outq = qalloc();
220 inq = qalloc();
222 if(!versioned){
223 f.type = Tversion;
224 f.version = "9P2000.u";
225 f.msize = msize;
226 f.tag = NOTAG;
227 n = convS2M(&f, vbuf, sizeof vbuf);
228 if(n <= BIT16SZ)
229 sysfatal("convS2M conversion error");
230 if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
231 nn = write(1, vbuf, n);
232 if(n != nn)
233 sysfatal("error writing Tversion: %r\n");
234 n = read9pmsg(0, vbuf, sizeof vbuf);
235 if(n < 0)
236 sysfatal("read9pmsg failure");
237 if(convM2S(vbuf, n, &f) != n)
238 sysfatal("convM2S failure");
239 if(f.msize < msize)
240 msize = f.msize;
241 if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
242 dotu = strncmp(f.version, "9P2000.u", 8) == 0;
245 threadcreate(inputthread, nil, STACK);
246 threadcreate(outputthread, nil, STACK);
248 /* if(rootfid) */
249 /* dorootstat(); */
251 threadcreate(listenthread, nil, STACK);
252 threadexits(0);
255 int
256 ignorepipe(void *v, char *s)
258 USED(v);
259 if(strcmp(s, "sys: write on closed pipe") == 0)
260 return 1;
261 if(strcmp(s, "sys: tstp") == 0)
262 return 1;
263 fprint(2, "9pserve %s: %T note: %s\n", addr, s);
264 return 0;
267 void
268 listenthread(void *arg)
270 Conn *c;
271 Ioproc *io;
273 io = ioproc();
274 USED(arg);
275 threadsetname("listen %s", adir);
276 for(;;){
277 c = emalloc(sizeof(Conn));
278 c->fd = iolisten(io, adir, c->dir);
279 if(c->fd < 0){
280 if(verbose) fprint(2, "%T listen: %r\n");
281 close(afd);
282 free(c);
283 return;
285 c->inc = chancreate(sizeof(void*), 0);
286 c->internal = chancreate(sizeof(void*), 0);
287 c->inq = qalloc();
288 c->outq = qalloc();
289 if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
290 threadcreate(connthread, c, STACK);
294 void
295 send9pmsg(Msg *m)
297 int n, nn;
299 n = sizeS2Mu(&m->rx, m->c->dotu);
300 m->rpkt = emalloc(n);
301 nn = convS2Mu(&m->rx, m->rpkt, n, m->c->dotu);
302 if(nn <= BIT16SZ)
303 sysfatal("convS2Mu conversion error");
304 if(nn != n)
305 sysfatal("sizeS2Mu and convS2Mu disagree");
306 sendq(m->c->outq, m);
309 void
310 sendomsg(Msg *m)
312 int n, nn;
314 n = sizeS2Mu(&m->tx, m->c->dotu);
315 m->tpkt = emalloc(n);
316 nn = convS2Mu(&m->tx, m->tpkt, n, m->c->dotu);
317 if(nn <= BIT16SZ)
318 sysfatal("convS2Mu conversion error");
319 if(nn != n)
320 sysfatal("sizeS2Mu and convS2Mu disagree");
321 sendq(outq, m);
324 void
325 err(Msg *m, char *ename)
327 m->rx.type = Rerror;
328 m->rx.ename = ename;
329 m->rx.tag = m->tx.tag;
330 send9pmsg(m);
333 char*
334 estrdup(char *s)
336 char *t;
338 t = emalloc(strlen(s)+1);
339 strcpy(t, s);
340 return t;
343 void
344 connthread(void *arg)
346 int i, fd;
347 Conn *c;
348 Hash *h, *hnext;
349 Msg *m, *om, *mm;
350 Fid *f;
351 Ioproc *io;
353 c = arg;
354 threadsetname("conn %s", c->dir);
355 io = ioproc();
356 fd = ioaccept(io, c->fd, c->dir);
357 if(fd < 0){
358 if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
359 goto out;
361 close(c->fd);
362 c->fd = fd;
363 threadcreate(connoutthread, c, STACK);
364 while((m = mread9p(io, c->fd, c->dotu)) != nil){
365 if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
366 m->c = c;
367 m->ctag = m->tx.tag;
368 c->nmsg++;
369 if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
370 if(puthash(c->tag, m->tx.tag, m) < 0){
371 err(m, "duplicate tag");
372 continue;
374 msgincref(m);
375 switch(m->tx.type){
376 case Tversion:
377 m->rx.tag = m->tx.tag;
378 m->rx.msize = m->tx.msize;
379 if(m->rx.msize > msize)
380 m->rx.msize = msize;
381 m->rx.version = "9P2000";
382 c->dotu = 0;
383 if(dotu && strncmp(m->tx.version, "9P2000.u", 8) == 0){
384 m->rx.version = "9P2000.u";
385 c->dotu = 1;
387 m->rx.type = Rversion;
388 send9pmsg(m);
389 continue;
390 case Tflush:
391 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
392 m->rx.tag = m->tx.tag;
393 m->rx.type = Rflush;
394 send9pmsg(m);
395 continue;
397 msgincref(m->oldm);
398 break;
399 case Tattach:
400 m->afid = nil;
401 if(m->tx.afid != NOFID
402 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
403 err(m, "unknown fid");
404 continue;
406 if(m->afid)
407 m->afid->ref++;
408 m->fid = fidnew(m->tx.fid);
409 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
410 err(m, "duplicate fid");
411 continue;
413 m->fid->ref++;
414 if(attached && m->afid==nil){
415 if(m->tx.aname[0] && strcmp(xaname, m->tx.aname) != 0){
416 err(m, "invalid attach name");
417 continue;
419 m->tx.afid = xafid;
420 m->tx.aname = xaname;
421 m->tx.uname = estrdup(m->tx.uname);
422 repack(&m->tx, &m->tpkt, c->dotu);
423 free(m->tx.uname);
424 m->tx.uname = "XXX";
426 break;
427 case Twalk:
428 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
429 err(m, "unknown fid");
430 continue;
432 m->fid->ref++;
433 if(m->tx.newfid == m->tx.fid){
434 m->fid->ref++;
435 m->newfid = m->fid;
436 }else{
437 m->newfid = fidnew(m->tx.newfid);
438 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
439 err(m, "duplicate fid");
440 continue;
442 m->newfid->ref++;
444 break;
445 case Tauth:
446 if(attached){
447 err(m, "authentication not required");
448 continue;
450 m->afid = fidnew(m->tx.afid);
451 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
452 err(m, "duplicate fid");
453 continue;
455 m->afid->ref++;
456 break;
457 case Tcreate:
458 if(dotu && !c->dotu && (m->tx.perm&(DMSYMLINK|DMDEVICE|DMNAMEDPIPE|DMSOCKET))){
459 err(m, "unsupported file type");
460 continue;
462 goto caseTopen;
463 case Topenfd:
464 if(m->tx.mode&~(OTRUNC|3)){
465 err(m, "bad openfd mode");
466 continue;
468 m->isopenfd = 1;
469 m->tx.type = Topen;
470 m->tpkt[4] = Topen;
471 /* fall through */
472 caseTopen:
473 case Topen:
474 case Tclunk:
475 case Tread:
476 case Twrite:
477 case Tremove:
478 case Tstat:
479 case Twstat:
480 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
481 err(m, "unknown fid");
482 continue;
484 m->fid->ref++;
485 if(m->tx.type==Twstat && dotu && !c->dotu){
486 if(cvtustat(&m->tx, &m->tpkt, 1) < 0){
487 err(m, "cannot convert stat buffer");
488 continue;
491 if(m->tx.type==Tread && m->fid->isdir && dotu && !c->dotu){
492 if(m->tx.offset = m->fid->coffset)
493 m->tx.offset = m->fid->offset;
494 else
495 m->fid->offset = m->fid->coffset;
497 break;
500 /* have everything - translate and send */
501 m->c = c;
502 m->ctag = m->tx.tag;
503 m->tx.tag = m->tag;
504 if(m->fid)
505 m->tx.fid = m->fid->fid;
506 if(m->newfid)
507 m->tx.newfid = m->newfid->fid;
508 if(m->afid)
509 m->tx.afid = m->afid->fid;
510 if(m->oldm)
511 m->tx.oldtag = m->oldm->tag;
512 /* reference passes to outq */
513 sendq(outq, m);
514 while(c->nmsg >= MAXMSG){
515 c->inputstalled = 1;
516 recvp(c->inc);
520 if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
522 /* flush the output queue */
523 sendq(c->outq, nil);
524 while(c->outq != nil)
525 yield();
527 /* flush all outstanding messages */
528 for(i=0; i<NHASH; i++){
529 for(h=c->tag[i]; h; h=hnext){
530 om = h->v;
531 m = msgnew(0);
532 m->internal = 1;
533 m->c = c;
534 c->nmsg++;
535 m->tx.type = Tflush;
536 m->tx.tag = m->tag;
537 m->tx.oldtag = om->tag;
538 m->oldm = om;
539 msgincref(om);
540 msgincref(m); /* for outq */
541 sendomsg(m);
542 mm = recvp(c->internal);
543 assert(mm == m);
544 msgput(m); /* got from recvp */
545 msgput(m); /* got from msgnew */
546 msgput(om); /* got from hash table */
547 hnext = h->next;
548 free(h);
552 /* clunk all outstanding fids */
553 for(i=0; i<NHASH; i++){
554 for(h=c->fid[i]; h; h=hnext){
555 f = h->v;
556 m = msgnew(0);
557 m->internal = 1;
558 m->c = c;
559 c->nmsg++;
560 m->tx.type = Tclunk;
561 m->tx.tag = m->tag;
562 m->tx.fid = f->fid;
563 m->fid = f;
564 f->ref++;
565 msgincref(m);
566 sendomsg(m);
567 mm = recvp(c->internal);
568 assert(mm == m);
569 msgclear(m);
570 msgput(m); /* got from recvp */
571 msgput(m); /* got from msgnew */
572 fidput(f); /* got from hash table */
573 hnext = h->next;
574 free(h);
578 out:
579 closeioproc(io);
580 assert(c->nmsg == 0);
581 assert(c->nfid == 0);
582 close(c->fd);
583 chanfree(c->internal);
584 c->internal = 0;
585 chanfree(c->inc);
586 c->inc = 0;
587 free(c->inq);
588 c->inq = 0;
589 free(c);
592 static void
593 openfdthread(void *v)
595 Conn *c;
596 Fid *fid;
597 Msg *m;
598 int n;
599 vlong tot;
600 Ioproc *io;
601 char buf[1024];
603 c = v;
604 fid = c->fdfid;
605 io = ioproc();
606 threadsetname("openfd %s", c->fdfid);
607 tot = 0;
608 m = nil;
609 if(c->fdmode == OREAD){
610 for(;;){
611 if(verbose) fprint(2, "%T tread...");
612 m = msgnew(0);
613 m->internal = 1;
614 m->c = c;
615 m->tx.type = Tread;
616 m->tx.count = msize - IOHDRSZ;
617 m->tx.fid = fid->fid;
618 m->tx.tag = m->tag;
619 m->tx.offset = tot;
620 m->fid = fid;
621 fid->ref++;
622 msgincref(m);
623 sendomsg(m);
624 recvp(c->internal);
625 if(m->rx.type == Rerror){
626 /* fprint(2, "%T read error: %s\n", m->rx.ename); */
627 break;
629 if(m->rx.count == 0)
630 break;
631 tot += m->rx.count;
632 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
633 /* fprint(2, "%T pipe write error: %r\n"); */
634 break;
636 msgput(m);
637 msgput(m);
638 m = nil;
640 }else{
641 for(;;){
642 if(verbose) fprint(2, "%T twrite...");
643 n = sizeof buf;
644 if(n > msize)
645 n = msize;
646 if((n=ioread(io, c->fd, buf, n)) <= 0){
647 if(n < 0)
648 fprint(2, "%T pipe read error: %r\n");
649 break;
651 m = msgnew(0);
652 m->internal = 1;
653 m->c = c;
654 m->tx.type = Twrite;
655 m->tx.fid = fid->fid;
656 m->tx.data = buf;
657 m->tx.count = n;
658 m->tx.tag = m->tag;
659 m->tx.offset = tot;
660 m->fid = fid;
661 fid->ref++;
662 msgincref(m);
663 sendomsg(m);
664 recvp(c->internal);
665 if(m->rx.type == Rerror){
666 /* fprint(2, "%T write error: %s\n", m->rx.ename); */
668 tot += n;
669 msgput(m);
670 msgput(m);
671 m = nil;
674 if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
675 close(c->fd);
676 closeioproc(io);
677 if(m){
678 msgput(m);
679 msgput(m);
681 if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
682 if(--fid->openfd == 0){
683 m = msgnew(0);
684 m->internal = 1;
685 m->c = c;
686 m->tx.type = Tclunk;
687 m->tx.tag = m->tag;
688 m->tx.fid = fid->fid;
689 m->fid = fid;
690 fid->ref++;
691 msgincref(m);
692 sendomsg(m);
693 recvp(c->internal);
694 msgput(m);
695 msgput(m);
697 fidput(fid);
698 c->fdfid = nil;
699 chanfree(c->internal);
700 c->internal = 0;
701 free(c);
704 int
705 xopenfd(Msg *m)
707 char errs[ERRMAX];
708 int n, p[2];
709 Conn *nc;
711 if(pipe(p) < 0){
712 rerrstr(errs, sizeof errs);
713 err(m, errs);
714 /* XXX return here? */
716 if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
718 /* now we're committed. */
720 /* a new connection for this fid */
721 nc = emalloc(sizeof(Conn));
722 nc->internal = chancreate(sizeof(void*), 0);
724 /* a ref for us */
725 nc->fdfid = m->fid;
726 m->fid->ref++;
727 nc->fdfid->openfd++;
728 nc->fdmode = m->tx.mode;
729 nc->fd = p[0];
731 /* a thread to tend the pipe */
732 threadcreate(openfdthread, nc, STACK);
734 /* if mode is ORDWR, that openfdthread will write; start a reader */
735 if((m->tx.mode&3) == ORDWR){
736 nc = emalloc(sizeof(Conn));
737 nc->internal = chancreate(sizeof(void*), 0);
738 nc->fdfid = m->fid;
739 m->fid->ref++;
740 nc->fdfid->openfd++;
741 nc->fdmode = OREAD;
742 nc->fd = dup(p[0], -1);
743 threadcreate(openfdthread, nc, STACK);
746 /* steal fid from other connection */
747 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
748 fidput(m->fid);
750 /* rewrite as Ropenfd */
751 m->rx.type = Ropenfd;
752 n = GBIT32(m->rpkt);
753 m->rpkt = erealloc(m->rpkt, n+4);
754 PBIT32(m->rpkt+n, p[1]);
755 n += 4;
756 PBIT32(m->rpkt, n);
757 m->rpkt[4] = Ropenfd;
758 m->rx.unixfd = p[1];
759 return 0;
762 void
763 connoutthread(void *arg)
765 char *ename;
766 int err;
767 Conn *c;
768 Queue *outq;
769 Msg *m, *om;
770 Ioproc *io;
772 c = arg;
773 outq = c->outq;
774 io = ioproc();
775 threadsetname("connout %s", c->dir);
776 while((m = recvq(outq)) != nil){
777 err = m->tx.type+1 != m->rx.type;
778 if(!err && m->isopenfd)
779 if(xopenfd(m) < 0)
780 continue;
781 switch(m->tx.type){
782 case Tflush:
783 om = m->oldm;
784 if(om)
785 if(delhash(om->c->tag, om->ctag, om) == 0)
786 msgput(om);
787 break;
788 case Tclunk:
789 case Tremove:
790 if(m->fid)
791 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
792 fidput(m->fid);
793 break;
794 case Tauth:
795 if(err && m->afid){
796 if(verbose) fprint(2, "%T auth error\n");
797 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
798 fidput(m->afid);
800 break;
801 case Tattach:
802 if(err && m->fid)
803 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
804 fidput(m->fid);
805 break;
806 case Twalk:
807 if(err || m->rx.nwqid < m->tx.nwname)
808 if(m->tx.fid != m->tx.newfid && m->newfid)
809 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
810 fidput(m->newfid);
811 break;
812 case Tread:
813 if(!err && m->fid->isdir && dotu && !m->c->dotu){
814 m->fid->offset += m->rx.count;
815 stripudirread(m);
816 m->fid->coffset += m->rx.count;
818 break;
819 case Tstat:
820 if(!err && dotu && !m->c->dotu)
821 cvtustat(&m->rx, &m->rpkt, 0);
822 break;
823 case Topen:
824 case Tcreate:
825 m->fid->isdir = (m->rx.qid.type & QTDIR);
826 break;
828 if(m->rx.type==Rerror && dotu && !c->dotu){
829 ename = estrdup(m->rx.ename);
830 m->rx.ename = ename;
831 repack(&m->rx, &m->rpkt, c->dotu);
832 free(ename);
833 m->rx.ename = "XXX";
835 if(delhash(m->c->tag, m->ctag, m) == 0)
836 msgput(m);
837 if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
838 rewritehdr(&m->rx, m->rpkt);
839 if(mwrite9p(io, c->fd, m->rpkt) < 0)
840 if(verbose) fprint(2, "%T write error: %r\n");
841 msgput(m);
842 if(c->inputstalled && c->nmsg < MAXMSG)
843 nbsendp(c->inc, 0);
845 closeioproc(io);
846 free(outq);
847 c->outq = nil;
850 void
851 outputthread(void *arg)
853 Msg *m;
854 Ioproc *io;
856 USED(arg);
857 io = ioproc();
858 threadsetname("output");
859 while((m = recvq(outq)) != nil){
860 if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
861 rewritehdr(&m->tx, m->tpkt);
862 if(mwrite9p(io, 1, m->tpkt) < 0)
863 sysfatal("output error: %r");
864 msgput(m);
866 closeioproc(io);
867 fprint(2, "%T output eof\n");
868 threadexitsall(0);
871 void
872 inputthread(void *arg)
874 uchar *pkt;
875 int n, nn, tag;
876 Msg *m;
877 Ioproc *io;
879 threadsetname("input");
880 if(verbose) fprint(2, "%T input thread\n");
881 io = ioproc();
882 USED(arg);
883 while((pkt = read9ppkt(io, 0)) != nil){
884 n = GBIT32(pkt);
885 if(n < 7){
886 fprint(2, "%T short 9P packet from server\n");
887 free(pkt);
888 continue;
890 if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
891 tag = GBIT16(pkt+5);
892 if((m = msgget(tag)) == nil){
893 fprint(2, "%T unexpected 9P response tag %d\n", tag);
894 free(pkt);
895 continue;
897 if((nn = convM2Su(pkt, n, &m->rx, dotu)) != n){
898 fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
899 free(pkt);
900 msgput(m);
901 continue;
903 if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
904 m->internal ? " (internal)" : "");
905 m->rpkt = pkt;
906 m->rx.tag = m->ctag;
907 if(m->internal)
908 sendp(m->c->internal, m);
909 else if(m->c->outq)
910 sendq(m->c->outq, m);
911 else
912 msgput(m);
914 closeioproc(io);
915 /*fprint(2, "%T input eof\n"); */
916 threadexitsall(0);
919 void*
920 gethash(Hash **ht, uint n)
922 Hash *h;
924 for(h=ht[n%NHASH]; h; h=h->next)
925 if(h->n == n)
926 return h->v;
927 return nil;
930 int
931 delhash(Hash **ht, uint n, void *v)
933 Hash *h, **l;
935 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
936 if(h->n == n){
937 if(h->v != v){
938 if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
939 return -1;
941 *l = h->next;
942 free(h);
943 return 0;
945 return -1;
948 int
949 puthash(Hash **ht, uint n, void *v)
951 Hash *h;
953 if(gethash(ht, n))
954 return -1;
955 h = emalloc(sizeof(Hash));
956 h->next = ht[n%NHASH];
957 h->n = n;
958 h->v = v;
959 ht[n%NHASH] = h;
960 return 0;
963 Fid **fidtab;
964 int nfidtab;
965 Fid *freefid;
967 Fid*
968 fidnew(int cfid)
970 Fid *f;
972 if(freefid == nil){
973 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
974 if(nfidtab == xafid){
975 fidtab[nfidtab++] = nil;
976 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
978 fidtab[nfidtab] = emalloc(sizeof(Fid));
979 freefid = fidtab[nfidtab];
980 freefid->fid = nfidtab++;
982 f = freefid;
983 freefid = f->next;
984 f->cfid = cfid;
985 f->ref = 1;
986 f->offset = 0;
987 f->coffset = 0;
988 f->isdir = -1;
989 return f;
992 void
993 fidput(Fid *f)
995 if(f == nil)
996 return;
997 assert(f->ref > 0);
998 if(--f->ref > 0)
999 return;
1000 f->next = freefid;
1001 f->cfid = -1;
1002 freefid = f;
1005 Msg **msgtab;
1006 int nmsgtab;
1007 int nmsg;
1008 Msg *freemsg;
1010 void
1011 msgincref(Msg *m)
1013 if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
1014 getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
1015 m->ref++;
1018 Msg*
1019 msgnew(int x)
1021 Msg *m;
1023 if(freemsg == nil){
1024 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
1025 msgtab[nmsgtab] = emalloc(sizeof(Msg));
1026 freemsg = msgtab[nmsgtab];
1027 freemsg->tag = nmsgtab++;
1029 m = freemsg;
1030 freemsg = m->next;
1031 m->ref = 1;
1032 if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
1033 getcallerpc(&x), m, m->tag, m->ref);
1034 nmsg++;
1035 return m;
1039 * Clear data associated with connections, so that
1040 * if all msgs have been msgcleared, the connection
1041 * can be freed. Note that this does *not* free the tpkt
1042 * and rpkt; they are freed in msgput with the msg itself.
1043 * The io write thread might still be holding a ref to msg
1044 * even once the connection has finished with it.
1046 void
1047 msgclear(Msg *m)
1049 if(m->c){
1050 m->c->nmsg--;
1051 m->c = nil;
1053 if(m->oldm){
1054 msgput(m->oldm);
1055 m->oldm = nil;
1057 if(m->fid){
1058 fidput(m->fid);
1059 m->fid = nil;
1061 if(m->afid){
1062 fidput(m->afid);
1063 m->afid = nil;
1065 if(m->newfid){
1066 fidput(m->newfid);
1067 m->newfid = nil;
1069 if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
1070 close(m->rx.unixfd);
1071 m->rx.unixfd = -1;
1075 void
1076 msgput(Msg *m)
1078 if(m == nil)
1079 return;
1081 if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
1082 getcallerpc(&m), m, m->tag, m->ctag, m->ref);
1083 assert(m->ref > 0);
1084 if(--m->ref > 0)
1085 return;
1086 nmsg--;
1087 msgclear(m);
1088 if(m->tpkt){
1089 free(m->tpkt);
1090 m->tpkt = nil;
1092 if(m->rpkt){
1093 free(m->rpkt);
1094 m->rpkt = nil;
1096 m->isopenfd = 0;
1097 m->internal = 0;
1098 m->next = freemsg;
1099 freemsg = m;
1102 Msg*
1103 msgget(int n)
1105 Msg *m;
1107 if(n < 0 || n >= nmsgtab)
1108 return nil;
1109 m = msgtab[n];
1110 if(m->ref == 0)
1111 return nil;
1112 if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
1113 msgincref(m);
1114 return m;
1118 void*
1119 emalloc(int n)
1121 void *v;
1123 v = mallocz(n, 1);
1124 if(v == nil){
1125 abort();
1126 sysfatal("out of memory allocating %d", n);
1128 return v;
1131 void*
1132 erealloc(void *v, int n)
1134 v = realloc(v, n);
1135 if(v == nil){
1136 abort();
1137 sysfatal("out of memory reallocating %d", n);
1139 return v;
1142 typedef struct Qel Qel;
1143 struct Qel
1145 Qel *next;
1146 void *p;
1149 struct Queue
1151 int hungup;
1152 QLock lk;
1153 Rendez r;
1154 Qel *head;
1155 Qel *tail;
1158 Queue*
1159 qalloc(void)
1161 Queue *q;
1163 q = mallocz(sizeof(Queue), 1);
1164 if(q == nil)
1165 return nil;
1166 q->r.l = &q->lk;
1167 return q;
1170 int
1171 sendq(Queue *q, void *p)
1173 Qel *e;
1175 e = emalloc(sizeof(Qel));
1176 qlock(&q->lk);
1177 if(q->hungup){
1178 free(e);
1179 werrstr("hungup queue");
1180 qunlock(&q->lk);
1181 return -1;
1183 e->p = p;
1184 e->next = nil;
1185 if(q->head == nil)
1186 q->head = e;
1187 else
1188 q->tail->next = e;
1189 q->tail = e;
1190 rwakeup(&q->r);
1191 qunlock(&q->lk);
1192 return 0;
1195 void*
1196 recvq(Queue *q)
1198 void *p;
1199 Qel *e;
1201 qlock(&q->lk);
1202 while(q->head == nil && !q->hungup)
1203 rsleep(&q->r);
1204 if(q->hungup){
1205 qunlock(&q->lk);
1206 return nil;
1208 e = q->head;
1209 q->head = e->next;
1210 qunlock(&q->lk);
1211 p = e->p;
1212 free(e);
1213 return p;
1216 uchar*
1217 read9ppkt(Ioproc *io, int fd)
1219 uchar buf[4], *pkt;
1220 int n, nn;
1222 n = ioreadn(io, fd, buf, 4);
1223 if(n != 4)
1224 return nil;
1225 n = GBIT32(buf);
1226 if(n > MAXMSGSIZE)
1227 return nil;
1228 pkt = emalloc(n);
1229 PBIT32(pkt, n);
1230 nn = ioreadn(io, fd, pkt+4, n-4);
1231 if(nn != n-4){
1232 free(pkt);
1233 return nil;
1235 /* would do this if we ever got one of these, but we only generate them
1236 if(pkt[4] == Ropenfd){
1237 newfd = iorecvfd(io, fd);
1238 PBIT32(pkt+n-4, newfd);
1241 return pkt;
1244 Msg*
1245 mread9p(Ioproc *io, int fd, int dotu)
1247 int n, nn;
1248 uchar *pkt;
1249 Msg *m;
1251 if((pkt = read9ppkt(io, fd)) == nil)
1252 return nil;
1254 m = msgnew(0);
1255 m->tpkt = pkt;
1256 n = GBIT32(pkt);
1257 nn = convM2Su(pkt, n, &m->tx, dotu);
1258 if(nn != n){
1259 fprint(2, "%T read bad packet from %d\n", fd);
1260 return nil;
1262 return m;
1265 int
1266 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1268 int n, nfd;
1270 n = GBIT32(pkt);
1271 if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
1272 if(verbose > 1) fprint(2, "%T before iowrite\n");
1273 if(iowrite(io, fd, pkt, n) != n){
1274 fprint(2, "%T write error: %r\n");
1275 return -1;
1277 if(verbose > 1) fprint(2, "%T after iowrite\n");
1278 if(pkt[4] == Ropenfd){
1279 nfd = GBIT32(pkt+n-4);
1280 if(iosendfd(io, fd, nfd) < 0){
1281 fprint(2, "%T send fd error: %r\n");
1282 return -1;
1285 return 0;
1288 void
1289 restring(uchar *pkt, int pn, char *s)
1291 int n;
1293 if(s < (char*)pkt || s >= (char*)pkt+pn)
1294 return;
1296 n = strlen(s);
1297 memmove(s+1, s, n);
1298 PBIT16((uchar*)s-1, n);
1301 void
1302 repack(Fcall *f, uchar **ppkt, int dotu)
1304 uint n, nn;
1305 uchar *pkt;
1307 pkt = *ppkt;
1308 n = GBIT32(pkt);
1309 nn = sizeS2Mu(f, dotu);
1310 if(nn > n){
1311 free(pkt);
1312 pkt = emalloc(nn);
1313 *ppkt = pkt;
1315 n = convS2Mu(f, pkt, nn, dotu);
1316 if(n <= BIT16SZ)
1317 sysfatal("convS2M conversion error");
1318 if(n != nn)
1319 sysfatal("convS2Mu and sizeS2Mu disagree");
1322 void
1323 rewritehdr(Fcall *f, uchar *pkt)
1325 int i, n;
1327 n = GBIT32(pkt);
1328 PBIT16(pkt+5, f->tag);
1329 switch(f->type){
1330 case Tversion:
1331 case Rversion:
1332 restring(pkt, n, f->version);
1333 break;
1334 case Tauth:
1335 PBIT32(pkt+7, f->afid);
1336 restring(pkt, n, f->uname);
1337 restring(pkt, n, f->aname);
1338 break;
1339 case Tflush:
1340 PBIT16(pkt+7, f->oldtag);
1341 break;
1342 case Tattach:
1343 restring(pkt, n, f->uname);
1344 restring(pkt, n, f->aname);
1345 PBIT32(pkt+7, f->fid);
1346 PBIT32(pkt+11, f->afid);
1347 break;
1348 case Twalk:
1349 PBIT32(pkt+7, f->fid);
1350 PBIT32(pkt+11, f->newfid);
1351 for(i=0; i<f->nwname; i++)
1352 restring(pkt, n, f->wname[i]);
1353 break;
1354 case Tcreate:
1355 restring(pkt, n, f->name);
1356 /* fall through */
1357 case Topen:
1358 case Tclunk:
1359 case Tremove:
1360 case Tstat:
1361 case Twstat:
1362 case Twrite:
1363 PBIT32(pkt+7, f->fid);
1364 break;
1365 case Tread:
1366 PBIT32(pkt+7, f->fid);
1367 PBIT64(pkt+11, f->offset);
1368 break;
1369 case Rerror:
1370 restring(pkt, n, f->ename);
1371 break;
1375 static long
1376 _iolisten(va_list *arg)
1378 char *a, *b;
1380 a = va_arg(*arg, char*);
1381 b = va_arg(*arg, char*);
1382 return listen(a, b);
1385 int
1386 iolisten(Ioproc *io, char *a, char *b)
1388 return iocall(io, _iolisten, a, b);
1391 static long
1392 _ioaccept(va_list *arg)
1394 int fd;
1395 char *dir;
1397 fd = va_arg(*arg, int);
1398 dir = va_arg(*arg, char*);
1399 return accept(fd, dir);
1402 int
1403 ioaccept(Ioproc *io, int fd, char *dir)
1405 return iocall(io, _ioaccept, fd, dir);
1408 int
1409 timefmt(Fmt *fmt)
1411 static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1412 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1413 vlong ns;
1414 Tm tm;
1415 ns = nsec();
1416 tm = *localtime(time(0));
1417 return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
1418 mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
1419 (int)(ns%1000000000)/1000000);
1422 int
1423 cvtustat(Fcall *f, uchar **fpkt, int tounix)
1425 int n;
1426 uchar *buf;
1427 char *str;
1428 Dir dir;
1430 str = emalloc(f->nstat);
1431 n = convM2Du(f->stat, f->nstat, &dir, str, !tounix);
1432 if(n <= BIT16SZ){
1433 free(str);
1434 return -1;
1437 n = sizeD2Mu(&dir, tounix);
1438 buf = emalloc(n);
1439 if(convD2Mu(&dir, buf, n, tounix) != n)
1440 sysfatal("convD2Mu conversion error");
1441 f->nstat = n;
1442 f->stat = buf;
1444 repack(f, fpkt, dotu);
1445 free(buf);
1446 f->stat = nil; /* is this okay ??? */
1447 free(str);
1449 return 0;
1452 int
1453 stripudirread(Msg* msg)
1455 char *str;
1456 int i, m, n, nn;
1457 uchar *buf;
1458 Dir d;
1459 Fcall* rx;
1461 buf = nil;
1462 str = nil;
1463 rx = &msg->rx;
1464 n = 0;
1465 nn = 0;
1466 for(i = 0; i < rx->count; i += m){
1467 m = BIT16SZ + GBIT16(&rx->data[i]);
1468 if(statchecku((uchar*)&rx->data[i], m, 1) < 0)
1469 return -1;
1470 if(nn < m)
1471 nn = m;
1472 n++;
1475 str = emalloc(nn);
1476 buf = emalloc(rx->count);
1478 nn = 0;
1479 for(i = 0; i < rx->count; i += m){
1480 m = BIT16SZ + GBIT16(&rx->data[i]);
1481 if(convM2Du((uchar*)&rx->data[i], m, &d, str, 1) != m){
1482 free(buf);
1483 free(str);
1484 return -1;
1487 n = convD2M(&d, &buf[nn], rx->count - nn);
1488 if(n <= BIT16SZ){
1489 free(buf);
1490 free(str);
1491 return -1;
1494 nn += n;
1497 rx->count = nn;
1498 rx->data = (char*)buf;
1500 repack(&msg->rx, &msg->rpkt, 0);
1501 free(str);
1502 free(buf);
1503 rx->data = nil; /* is this okay ??? */
1505 return 0;