Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <errno.h>
7 #define err err9pserve /* Darwin x86 */
9 enum
10 {
11 STACK = 32768,
12 NHASH = 31,
13 MAXMSG = 64, /* per connection */
14 MAXMSGSIZE = 4*1024*1024
15 };
17 typedef struct Hash Hash;
18 typedef struct Fid Fid;
19 typedef struct Msg Msg;
20 typedef struct Conn Conn;
21 typedef struct Queue Queue;
23 struct Hash
24 {
25 Hash *next;
26 uint n;
27 void *v;
28 };
30 struct Fid
31 {
32 int fid;
33 int ref;
34 int cfid;
35 int openfd;
36 int offset;
37 int coffset;
38 int isdir;
39 Fid *next;
40 };
42 struct Msg
43 {
44 Conn *c;
45 int internal;
46 int sync;
47 int ref;
48 int ctag;
49 int tag;
50 int isopenfd;
51 Fcall tx;
52 Fcall rx;
53 Fid *fid;
54 Fid *newfid;
55 Fid *afid;
56 Msg *oldm;
57 Msg *next;
58 uchar *tpkt;
59 uchar *rpkt;
60 };
62 struct Conn
63 {
64 int fd;
65 int fdmode;
66 Fid *fdfid;
67 int nmsg;
68 int nfid;
69 Channel *inc;
70 Channel *internal;
71 int inputstalled;
72 char dir[40];
73 Hash *tag[NHASH];
74 Hash *fid[NHASH];
75 Queue *outq;
76 Queue *inq;
77 Channel *outqdead;
78 int dotu;
79 };
81 char *xaname;
82 char *addr;
83 int afd;
84 char adir[40];
85 int isunix;
86 Queue *outq;
87 Queue *inq;
88 int verbose = 0;
89 int logging = 0;
90 int msize = 8192;
91 u32int xafid = NOFID;
92 int attached;
93 int versioned;
94 int dotu;
95 int noauth;
97 void *gethash(Hash**, uint);
98 int puthash(Hash**, uint, void*);
99 int delhash(Hash**, uint, void*);
100 Msg *mread9p(Ioproc*, int, int);
101 int mwrite9p(Ioproc*, int, uchar*);
102 uchar *read9ppkt(Ioproc*, int);
103 int write9ppkt(int, uchar*);
104 Msg *msgnew(int);
105 void msgput(Msg*);
106 void msgclear(Msg*);
107 Msg *msgget(int);
108 void msgincref(Msg*);
109 Fid *fidnew(int);
110 void fidput(Fid*);
111 void *emalloc(int);
112 void *erealloc(void*, int);
113 Queue *qalloc(void);
114 int sendq(Queue*, void*);
115 void *recvq(Queue*);
116 void connthread(void*);
117 void connoutthread(void*);
118 void listenthread(void*);
119 void outputthread(void*);
120 void inputthread(void*);
121 void rewritehdr(Fcall*, uchar*);
122 void repack(Fcall*, uchar**, int);
123 int tlisten(char*, char*);
124 int taccept(int, char*);
125 int iolisten(Ioproc*, char*, char*);
126 int ioaccept(Ioproc*, int, char*);
127 int iorecvfd(Ioproc*, int);
128 int iosendfd(Ioproc*, int, int);
129 void mainproc(void*);
130 int ignorepipe(void*, char*);
131 int timefmt(Fmt*);
132 void dorootstat(void);
133 int stripudirread(Msg*);
134 int cvtustat(Fcall*, uchar**, int);
136 void
137 usage(void)
139 fprint(2, "usage: 9pserve [-lnv] [-A aname afid] [-M msize] address\n");
140 fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
141 threadexitsall("usage");
144 uchar vbuf[128];
145 extern int _threaddebuglevel;
146 void
147 threadmain(int argc, char **argv)
149 char *file, *x;
150 int fd;
152 x = getenv("verbose9pserve");
153 if(x){
154 verbose = atoi(x);
155 fprint(2, "verbose9pserve %s => %d\n", x, verbose);
157 ARGBEGIN{
158 default:
159 usage();
160 case 'A':
161 attached = 1;
162 xaname = EARGF(usage());
163 xafid = atoi(EARGF(usage()));
164 break;
165 case 'M':
166 versioned = 1;
167 msize = atoi(EARGF(usage()));
168 break;
169 case 'n':
170 noauth = 1;
171 break;
172 case 'v':
173 verbose++;
174 break;
175 case 'u':
176 isunix++;
177 break;
178 case 'l':
179 logging++;
180 break;
181 }ARGEND
183 if(attached && !versioned){
184 fprint(2, "-A must be used with -M\n");
185 usage();
188 if(argc != 1)
189 usage();
190 addr = argv[0];
192 fmtinstall('T', timefmt);
194 if((afd = announce(addr, adir)) < 0)
195 sysfatal("announce %s: %r", addr);
196 if(logging){
197 if(strncmp(addr, "unix!", 5) == 0)
198 addr += 5;
199 file = smprint("%s.log", addr);
200 if(file == nil)
201 sysfatal("smprint log: %r");
202 if((fd = create(file, OWRITE, 0666)) < 0)
203 sysfatal("create %s: %r", file);
204 dup(fd, 2);
205 if(fd > 2)
206 close(fd);
208 if(verbose) fprint(2, "%T 9pserve running\n");
209 proccreate(mainproc, nil, STACK);
212 void
213 mainproc(void *v)
215 int n, nn;
216 Fcall f;
217 USED(v);
219 atnotify(ignorepipe, 1);
220 fmtinstall('D', dirfmt);
221 fmtinstall('M', dirmodefmt);
222 fmtinstall('F', fcallfmt);
223 fmtinstall('H', encodefmt);
225 outq = qalloc();
226 inq = qalloc();
228 if(!versioned){
229 f.type = Tversion;
230 f.version = "9P2000.u";
231 f.msize = msize;
232 f.tag = NOTAG;
233 n = convS2M(&f, vbuf, sizeof vbuf);
234 if(n <= BIT16SZ)
235 sysfatal("convS2M conversion error");
236 if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
237 nn = write(1, vbuf, n);
238 if(n != nn)
239 sysfatal("error writing Tversion: %r\n");
240 n = read9pmsg(0, vbuf, sizeof vbuf);
241 if(n < 0)
242 sysfatal("read9pmsg failure");
243 if(convM2S(vbuf, n, &f) != n)
244 sysfatal("convM2S failure");
245 if(f.msize < msize)
246 msize = f.msize;
247 if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
248 dotu = strncmp(f.version, "9P2000.u", 8) == 0;
251 threadcreate(inputthread, nil, STACK);
252 threadcreate(outputthread, nil, STACK);
254 /* if(rootfid) */
255 /* dorootstat(); */
257 threadcreate(listenthread, nil, STACK);
258 threadexits(0);
261 int
262 ignorepipe(void *v, char *s)
264 USED(v);
265 if(strcmp(s, "sys: write on closed pipe") == 0)
266 return 1;
267 if(strcmp(s, "sys: tstp") == 0)
268 return 1;
269 if(strcmp(s, "sys: window size change") == 0)
270 return 1;
271 fprint(2, "9pserve %s: %T note: %s\n", addr, s);
272 return 0;
275 void
276 listenthread(void *arg)
278 Conn *c;
279 Ioproc *io;
281 io = ioproc();
282 USED(arg);
283 threadsetname("listen %s", adir);
284 for(;;){
285 c = emalloc(sizeof(Conn));
286 c->fd = iolisten(io, adir, c->dir);
287 if(c->fd < 0){
288 if(verbose) fprint(2, "%T listen: %r\n");
289 close(afd);
290 free(c);
291 return;
293 c->inc = chancreate(sizeof(void*), 0);
294 c->internal = chancreate(sizeof(void*), 0);
295 c->inq = qalloc();
296 c->outq = qalloc();
297 c->outqdead = chancreate(sizeof(void*), 0);
298 if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
299 threadcreate(connthread, c, STACK);
303 void
304 send9pmsg(Msg *m)
306 int n, nn;
308 n = sizeS2Mu(&m->rx, m->c->dotu);
309 m->rpkt = emalloc(n);
310 nn = convS2Mu(&m->rx, m->rpkt, n, m->c->dotu);
311 if(nn <= BIT16SZ)
312 sysfatal("convS2Mu conversion error");
313 if(nn != n)
314 sysfatal("sizeS2Mu and convS2Mu disagree");
315 sendq(m->c->outq, m);
318 void
319 sendomsg(Msg *m)
321 int n, nn;
323 n = sizeS2Mu(&m->tx, m->c->dotu);
324 m->tpkt = emalloc(n);
325 nn = convS2Mu(&m->tx, m->tpkt, n, m->c->dotu);
326 if(nn <= BIT16SZ)
327 sysfatal("convS2Mu conversion error");
328 if(nn != n)
329 sysfatal("sizeS2Mu and convS2Mu disagree");
330 sendq(outq, m);
333 void
334 err(Msg *m, char *ename)
336 m->rx.type = Rerror;
337 m->rx.ename = ename;
338 m->rx.tag = m->tx.tag;
339 send9pmsg(m);
342 char*
343 estrdup(char *s)
345 char *t;
347 t = emalloc(strlen(s)+1);
348 strcpy(t, s);
349 return t;
352 void
353 connthread(void *arg)
355 int i, fd;
356 Conn *c;
357 Hash *h, *hnext;
358 Msg *m, *om, *mm, sync;
359 Fid *f;
360 Ioproc *io;
362 c = arg;
363 threadsetname("conn %s", c->dir);
364 io = ioproc();
365 fd = ioaccept(io, c->fd, c->dir);
366 if(fd < 0){
367 if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
368 goto out;
370 close(c->fd);
371 c->fd = fd;
372 threadcreate(connoutthread, c, STACK);
373 while((m = mread9p(io, c->fd, c->dotu)) != nil){
374 if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
375 m->c = c;
376 m->ctag = m->tx.tag;
377 c->nmsg++;
378 if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
379 if(puthash(c->tag, m->tx.tag, m) < 0){
380 err(m, "duplicate tag");
381 continue;
383 msgincref(m);
384 switch(m->tx.type){
385 case Tversion:
386 m->rx.tag = m->tx.tag;
387 m->rx.msize = m->tx.msize;
388 if(m->rx.msize > msize)
389 m->rx.msize = msize;
390 m->rx.version = "9P2000";
391 c->dotu = 0;
392 if(dotu && strncmp(m->tx.version, "9P2000.u", 8) == 0){
393 m->rx.version = "9P2000.u";
394 c->dotu = 1;
396 m->rx.type = Rversion;
397 send9pmsg(m);
398 continue;
399 case Tflush:
400 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
401 m->rx.tag = m->tx.tag;
402 m->rx.type = Rflush;
403 send9pmsg(m);
404 continue;
406 msgincref(m->oldm);
407 break;
408 case Tattach:
409 m->afid = nil;
410 if(m->tx.afid != NOFID
411 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
412 err(m, "unknown fid");
413 continue;
415 if(m->afid)
416 m->afid->ref++;
417 m->fid = fidnew(m->tx.fid);
418 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
419 err(m, "duplicate fid");
420 continue;
422 m->fid->ref++;
423 if(attached && m->afid==nil){
424 if(m->tx.aname[0] && strcmp(xaname, m->tx.aname) != 0){
425 err(m, "invalid attach name");
426 continue;
428 m->tx.afid = xafid;
429 m->tx.aname = xaname;
430 m->tx.uname = getuser(); /* what srv.c used */
431 repack(&m->tx, &m->tpkt, c->dotu);
433 break;
434 case Twalk:
435 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
436 err(m, "unknown fid");
437 continue;
439 m->fid->ref++;
440 if(m->tx.newfid == m->tx.fid){
441 m->fid->ref++;
442 m->newfid = m->fid;
443 }else{
444 m->newfid = fidnew(m->tx.newfid);
445 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
446 err(m, "duplicate fid");
447 continue;
449 m->newfid->ref++;
451 break;
452 case Tauth:
453 if(attached){
454 err(m, "authentication not required");
455 continue;
457 if(noauth){
458 err(m, "authentication rejected");
459 continue;
461 m->afid = fidnew(m->tx.afid);
462 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
463 err(m, "duplicate fid");
464 continue;
466 m->afid->ref++;
467 break;
468 case Tcreate:
469 if(dotu && !c->dotu && (m->tx.perm&(DMSYMLINK|DMDEVICE|DMNAMEDPIPE|DMSOCKET))){
470 err(m, "unsupported file type");
471 continue;
473 goto caseTopen;
474 case Topenfd:
475 if(m->tx.mode&~(OTRUNC|3)){
476 err(m, "bad openfd mode");
477 continue;
479 m->isopenfd = 1;
480 m->tx.type = Topen;
481 m->tpkt[4] = Topen;
482 /* fall through */
483 caseTopen:
484 case Topen:
485 case Tclunk:
486 case Tread:
487 case Twrite:
488 case Tremove:
489 case Tstat:
490 case Twstat:
491 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
492 err(m, "unknown fid");
493 continue;
495 m->fid->ref++;
496 if(m->tx.type==Twstat && dotu && !c->dotu){
497 if(cvtustat(&m->tx, &m->tpkt, 1) < 0){
498 err(m, "cannot convert stat buffer");
499 continue;
502 if(m->tx.type==Tread && m->fid->isdir && dotu && !c->dotu){
503 if(m->tx.offset = m->fid->coffset)
504 m->tx.offset = m->fid->offset;
505 else
506 m->fid->offset = m->fid->coffset;
508 break;
511 /* have everything - translate and send */
512 m->c = c;
513 m->ctag = m->tx.tag;
514 m->tx.tag = m->tag;
515 if(m->fid)
516 m->tx.fid = m->fid->fid;
517 if(m->newfid)
518 m->tx.newfid = m->newfid->fid;
519 if(m->afid)
520 m->tx.afid = m->afid->fid;
521 if(m->oldm)
522 m->tx.oldtag = m->oldm->tag;
523 /* reference passes to outq */
524 sendq(outq, m);
525 while(c->nmsg >= MAXMSG){
526 c->inputstalled = 1;
527 recvp(c->inc);
531 if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
533 /* flush all outstanding messages */
534 for(i=0; i<NHASH; i++){
535 while((h = c->tag[i]) != nil){
536 om = h->v;
537 msgincref(om); /* for us */
538 m = msgnew(0);
539 m->internal = 1;
540 m->c = c;
541 c->nmsg++;
542 m->tx.type = Tflush;
543 m->tx.tag = m->tag;
544 m->tx.oldtag = om->tag;
545 m->oldm = om;
546 msgincref(om);
547 msgincref(m); /* for outq */
548 sendomsg(m);
549 mm = recvp(c->internal);
550 assert(mm == m);
551 msgput(m); /* got from recvp */
552 msgput(m); /* got from msgnew */
553 if(delhash(c->tag, om->ctag, om) == 0)
554 msgput(om); /* got from hash table */
555 msgput(om); /* got from msgincref */
559 /*
560 * outputthread has written all its messages
561 * to the remote connection (because we've gotten all the replies!),
562 * but it might not have gotten a chance to msgput
563 * the very last one. sync up to make sure.
564 */
565 memset(&sync, 0, sizeof sync);
566 sync.sync = 1;
567 sync.c = c;
568 sendq(outq, &sync);
569 recvp(c->outqdead);
571 /* everything is quiet; can close the local output queue. */
572 sendq(c->outq, nil);
573 recvp(c->outqdead);
575 /* should be no messages left anywhere. */
576 assert(c->nmsg == 0);
578 /* clunk all outstanding fids */
579 for(i=0; i<NHASH; i++){
580 for(h=c->fid[i]; h; h=hnext){
581 f = h->v;
582 m = msgnew(0);
583 m->internal = 1;
584 m->c = c;
585 c->nmsg++;
586 m->tx.type = Tclunk;
587 m->tx.tag = m->tag;
588 m->tx.fid = f->fid;
589 m->fid = f;
590 f->ref++;
591 msgincref(m);
592 sendomsg(m);
593 mm = recvp(c->internal);
594 assert(mm == m);
595 msgclear(m);
596 msgput(m); /* got from recvp */
597 msgput(m); /* got from msgnew */
598 fidput(f); /* got from hash table */
599 hnext = h->next;
600 free(h);
604 out:
605 closeioproc(io);
606 assert(c->nmsg == 0);
607 assert(c->nfid == 0);
608 close(c->fd);
609 chanfree(c->internal);
610 c->internal = 0;
611 chanfree(c->inc);
612 c->inc = 0;
613 free(c->inq);
614 c->inq = 0;
615 free(c);
618 static void
619 openfdthread(void *v)
621 Conn *c;
622 Fid *fid;
623 Msg *m;
624 int n;
625 vlong tot;
626 Ioproc *io;
627 char buf[1024];
629 c = v;
630 fid = c->fdfid;
631 io = ioproc();
632 threadsetname("openfd %s", c->fdfid);
633 tot = 0;
634 m = nil;
635 if(c->fdmode == OREAD){
636 for(;;){
637 if(verbose) fprint(2, "%T tread...");
638 m = msgnew(0);
639 m->internal = 1;
640 m->c = c;
641 m->tx.type = Tread;
642 m->tx.count = msize - IOHDRSZ;
643 m->tx.fid = fid->fid;
644 m->tx.tag = m->tag;
645 m->tx.offset = tot;
646 m->fid = fid;
647 fid->ref++;
648 msgincref(m);
649 sendomsg(m);
650 recvp(c->internal);
651 if(m->rx.type == Rerror){
652 /* fprint(2, "%T read error: %s\n", m->rx.ename); */
653 break;
655 if(m->rx.count == 0)
656 break;
657 tot += m->rx.count;
658 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
659 /* fprint(2, "%T pipe write error: %r\n"); */
660 break;
662 msgput(m);
663 msgput(m);
664 m = nil;
666 }else{
667 for(;;){
668 if(verbose) fprint(2, "%T twrite...");
669 n = sizeof buf;
670 if(n > msize)
671 n = msize;
672 if((n=ioread(io, c->fd, buf, n)) <= 0){
673 if(n < 0)
674 fprint(2, "%T pipe read error: %r\n");
675 break;
677 m = msgnew(0);
678 m->internal = 1;
679 m->c = c;
680 m->tx.type = Twrite;
681 m->tx.fid = fid->fid;
682 m->tx.data = buf;
683 m->tx.count = n;
684 m->tx.tag = m->tag;
685 m->tx.offset = tot;
686 m->fid = fid;
687 fid->ref++;
688 msgincref(m);
689 sendomsg(m);
690 recvp(c->internal);
691 if(m->rx.type == Rerror){
692 /* fprint(2, "%T write error: %s\n", m->rx.ename); */
694 tot += n;
695 msgput(m);
696 msgput(m);
697 m = nil;
700 if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
701 close(c->fd);
702 closeioproc(io);
703 if(m){
704 msgput(m);
705 msgput(m);
707 if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
708 if(--fid->openfd == 0){
709 m = msgnew(0);
710 m->internal = 1;
711 m->c = c;
712 m->tx.type = Tclunk;
713 m->tx.tag = m->tag;
714 m->tx.fid = fid->fid;
715 m->fid = fid;
716 fid->ref++;
717 msgincref(m);
718 sendomsg(m);
719 recvp(c->internal);
720 msgput(m);
721 msgput(m);
723 fidput(fid);
724 c->fdfid = nil;
725 chanfree(c->internal);
726 c->internal = 0;
727 free(c);
730 int
731 xopenfd(Msg *m)
733 char errs[ERRMAX];
734 int n, p[2];
735 Conn *nc;
737 if(pipe(p) < 0){
738 rerrstr(errs, sizeof errs);
739 err(m, errs);
740 /* XXX return here? */
742 if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
744 /* now we're committed. */
746 /* a new connection for this fid */
747 nc = emalloc(sizeof(Conn));
748 nc->internal = chancreate(sizeof(void*), 0);
750 /* a ref for us */
751 nc->fdfid = m->fid;
752 m->fid->ref++;
753 nc->fdfid->openfd++;
754 nc->fdmode = m->tx.mode;
755 nc->fd = p[0];
757 /* a thread to tend the pipe */
758 threadcreate(openfdthread, nc, STACK);
760 /* if mode is ORDWR, that openfdthread will write; start a reader */
761 if((m->tx.mode&3) == ORDWR){
762 nc = emalloc(sizeof(Conn));
763 nc->internal = chancreate(sizeof(void*), 0);
764 nc->fdfid = m->fid;
765 m->fid->ref++;
766 nc->fdfid->openfd++;
767 nc->fdmode = OREAD;
768 nc->fd = dup(p[0], -1);
769 threadcreate(openfdthread, nc, STACK);
772 /* steal fid from other connection */
773 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
774 fidput(m->fid);
776 /* rewrite as Ropenfd */
777 m->rx.type = Ropenfd;
778 n = GBIT32(m->rpkt);
779 m->rpkt = erealloc(m->rpkt, n+4);
780 PBIT32(m->rpkt+n, p[1]);
781 n += 4;
782 PBIT32(m->rpkt, n);
783 m->rpkt[4] = Ropenfd;
784 m->rx.unixfd = p[1];
785 return 0;
788 void
789 connoutthread(void *arg)
791 char *ename;
792 int err;
793 Conn *c;
794 Msg *m, *om;
795 Ioproc *io;
797 c = arg;
798 io = ioproc();
799 threadsetname("connout %s", c->dir);
800 while((m = recvq(c->outq)) != nil){
801 err = m->tx.type+1 != m->rx.type;
802 if(!err && m->isopenfd)
803 if(xopenfd(m) < 0)
804 continue;
805 switch(m->tx.type){
806 case Tflush:
807 om = m->oldm;
808 if(om)
809 if(delhash(om->c->tag, om->ctag, om) == 0)
810 msgput(om);
811 break;
812 case Tclunk:
813 case Tremove:
814 if(m->fid)
815 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
816 fidput(m->fid);
817 break;
818 case Tauth:
819 if(err && m->afid){
820 if(verbose) fprint(2, "%T auth error\n");
821 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
822 fidput(m->afid);
824 break;
825 case Tattach:
826 if(err && m->fid)
827 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
828 fidput(m->fid);
829 break;
830 case Twalk:
831 if(err || m->rx.nwqid < m->tx.nwname)
832 if(m->tx.fid != m->tx.newfid && m->newfid)
833 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
834 fidput(m->newfid);
835 break;
836 case Tread:
837 if(!err && m->fid->isdir && dotu && !m->c->dotu){
838 m->fid->offset += m->rx.count;
839 stripudirread(m);
840 m->fid->coffset += m->rx.count;
842 break;
843 case Tstat:
844 if(!err && dotu && !m->c->dotu)
845 cvtustat(&m->rx, &m->rpkt, 0);
846 break;
847 case Topen:
848 case Tcreate:
849 m->fid->isdir = (m->rx.qid.type & QTDIR);
850 break;
852 if(m->rx.type==Rerror && dotu && !c->dotu){
853 ename = estrdup(m->rx.ename);
854 m->rx.ename = ename;
855 repack(&m->rx, &m->rpkt, c->dotu);
856 free(ename);
857 m->rx.ename = "XXX";
859 if(delhash(m->c->tag, m->ctag, m) == 0)
860 msgput(m);
861 if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
862 rewritehdr(&m->rx, m->rpkt);
863 if(mwrite9p(io, c->fd, m->rpkt) < 0)
864 if(verbose) fprint(2, "%T write error: %r\n");
865 msgput(m);
866 if(c->inputstalled && c->nmsg < MAXMSG)
867 nbsendp(c->inc, 0);
869 closeioproc(io);
870 free(c->outq);
871 c->outq = nil;
872 sendp(c->outqdead, nil);
875 void
876 outputthread(void *arg)
878 Msg *m;
879 Ioproc *io;
881 USED(arg);
882 io = ioproc();
883 threadsetname("output");
884 while((m = recvq(outq)) != nil){
885 if(m->sync){
886 sendp(m->c->outqdead, nil);
887 continue;
889 if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
890 rewritehdr(&m->tx, m->tpkt);
891 if(mwrite9p(io, 1, m->tpkt) < 0)
892 sysfatal("output error: %r");
893 msgput(m);
895 closeioproc(io);
896 fprint(2, "%T output eof\n");
897 threadexitsall(0);
900 void
901 inputthread(void *arg)
903 uchar *pkt;
904 int n, nn, tag;
905 Msg *m;
906 Ioproc *io;
908 threadsetname("input");
909 if(verbose) fprint(2, "%T input thread\n");
910 io = ioproc();
911 USED(arg);
912 while((pkt = read9ppkt(io, 0)) != nil){
913 n = GBIT32(pkt);
914 if(n < 7){
915 fprint(2, "%T short 9P packet from server\n");
916 free(pkt);
917 continue;
919 if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
920 tag = GBIT16(pkt+5);
921 if((m = msgget(tag)) == nil){
922 fprint(2, "%T unexpected 9P response tag %d\n", tag);
923 free(pkt);
924 continue;
926 if((nn = convM2Su(pkt, n, &m->rx, dotu)) != n){
927 fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
928 free(pkt);
929 msgput(m);
930 continue;
932 if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
933 m->internal ? " (internal)" : "");
934 m->rpkt = pkt;
935 m->rx.tag = m->ctag;
936 if(m->internal)
937 sendp(m->c->internal, m);
938 else if(m->c->outq)
939 sendq(m->c->outq, m);
940 else
941 msgput(m);
943 closeioproc(io);
944 /*fprint(2, "%T input eof\n"); */
945 threadexitsall(0);
948 void*
949 gethash(Hash **ht, uint n)
951 Hash *h;
953 for(h=ht[n%NHASH]; h; h=h->next)
954 if(h->n == n)
955 return h->v;
956 return nil;
959 int
960 delhash(Hash **ht, uint n, void *v)
962 Hash *h, **l;
964 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
965 if(h->n == n){
966 if(h->v != v){
967 if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
968 return -1;
970 *l = h->next;
971 free(h);
972 return 0;
974 return -1;
977 int
978 puthash(Hash **ht, uint n, void *v)
980 Hash *h;
982 if(gethash(ht, n))
983 return -1;
984 h = emalloc(sizeof(Hash));
985 h->next = ht[n%NHASH];
986 h->n = n;
987 h->v = v;
988 ht[n%NHASH] = h;
989 return 0;
992 Fid **fidtab;
993 int nfidtab;
994 Fid *freefid;
996 Fid*
997 fidnew(int cfid)
999 Fid *f;
1001 if(freefid == nil){
1002 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
1003 if(nfidtab == xafid){
1004 fidtab[nfidtab++] = nil;
1005 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
1007 fidtab[nfidtab] = emalloc(sizeof(Fid));
1008 freefid = fidtab[nfidtab];
1009 freefid->fid = nfidtab++;
1011 f = freefid;
1012 freefid = f->next;
1013 f->cfid = cfid;
1014 f->ref = 1;
1015 f->offset = 0;
1016 f->coffset = 0;
1017 f->isdir = -1;
1018 return f;
1021 void
1022 fidput(Fid *f)
1024 if(f == nil)
1025 return;
1026 assert(f->ref > 0);
1027 if(--f->ref > 0)
1028 return;
1029 f->next = freefid;
1030 f->cfid = -1;
1031 freefid = f;
1034 Msg **msgtab;
1035 int nmsgtab;
1036 int nmsg;
1037 Msg *freemsg;
1039 void
1040 msgincref(Msg *m)
1042 if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
1043 getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
1044 m->ref++;
1047 Msg*
1048 msgnew(int x)
1050 Msg *m;
1052 if(freemsg == nil){
1053 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
1054 msgtab[nmsgtab] = emalloc(sizeof(Msg));
1055 freemsg = msgtab[nmsgtab];
1056 freemsg->tag = nmsgtab++;
1058 m = freemsg;
1059 freemsg = m->next;
1060 m->ref = 1;
1061 if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
1062 getcallerpc(&x), m, m->tag, m->ref);
1063 nmsg++;
1064 return m;
1068 * Clear data associated with connections, so that
1069 * if all msgs have been msgcleared, the connection
1070 * can be freed. Note that this does *not* free the tpkt
1071 * and rpkt; they are freed in msgput with the msg itself.
1072 * The io write thread might still be holding a ref to msg
1073 * even once the connection has finished with it.
1075 void
1076 msgclear(Msg *m)
1078 if(m->c){
1079 m->c->nmsg--;
1080 m->c = nil;
1082 if(m->oldm){
1083 msgput(m->oldm);
1084 m->oldm = nil;
1086 if(m->fid){
1087 fidput(m->fid);
1088 m->fid = nil;
1090 if(m->afid){
1091 fidput(m->afid);
1092 m->afid = nil;
1094 if(m->newfid){
1095 fidput(m->newfid);
1096 m->newfid = nil;
1098 if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
1099 close(m->rx.unixfd);
1100 m->rx.unixfd = -1;
1104 void
1105 msgput(Msg *m)
1107 if(m == nil)
1108 return;
1110 if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
1111 getcallerpc(&m), m, m->tag, m->ctag, m->ref);
1112 assert(m->ref > 0);
1113 if(--m->ref > 0)
1114 return;
1115 nmsg--;
1116 msgclear(m);
1117 if(m->tpkt){
1118 free(m->tpkt);
1119 m->tpkt = nil;
1121 if(m->rpkt){
1122 free(m->rpkt);
1123 m->rpkt = nil;
1125 m->isopenfd = 0;
1126 m->internal = 0;
1127 m->next = freemsg;
1128 freemsg = m;
1131 Msg*
1132 msgget(int n)
1134 Msg *m;
1136 if(n < 0 || n >= nmsgtab)
1137 return nil;
1138 m = msgtab[n];
1139 if(m->ref == 0)
1140 return nil;
1141 if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
1142 msgincref(m);
1143 return m;
1147 void*
1148 emalloc(int n)
1150 void *v;
1152 v = mallocz(n, 1);
1153 if(v == nil){
1154 abort();
1155 sysfatal("out of memory allocating %d", n);
1157 return v;
1160 void*
1161 erealloc(void *v, int n)
1163 v = realloc(v, n);
1164 if(v == nil){
1165 abort();
1166 sysfatal("out of memory reallocating %d", n);
1168 return v;
1171 typedef struct Qel Qel;
1172 struct Qel
1174 Qel *next;
1175 void *p;
1178 struct Queue
1180 QLock lk;
1181 Rendez r;
1182 Qel *head;
1183 Qel *tail;
1186 Queue*
1187 qalloc(void)
1189 Queue *q;
1191 q = mallocz(sizeof(Queue), 1);
1192 if(q == nil)
1193 return nil;
1194 q->r.l = &q->lk;
1195 return q;
1198 int
1199 sendq(Queue *q, void *p)
1201 Qel *e;
1203 e = emalloc(sizeof(Qel));
1204 qlock(&q->lk);
1205 e->p = p;
1206 e->next = nil;
1207 if(q->head == nil)
1208 q->head = e;
1209 else
1210 q->tail->next = e;
1211 q->tail = e;
1212 rwakeup(&q->r);
1213 qunlock(&q->lk);
1214 return 0;
1217 void*
1218 recvq(Queue *q)
1220 void *p;
1221 Qel *e;
1223 qlock(&q->lk);
1224 while(q->head == nil)
1225 rsleep(&q->r);
1226 e = q->head;
1227 q->head = e->next;
1228 qunlock(&q->lk);
1229 p = e->p;
1230 free(e);
1231 return p;
1234 uchar*
1235 read9ppkt(Ioproc *io, int fd)
1237 uchar buf[4], *pkt;
1238 int n, nn;
1240 n = ioreadn(io, fd, buf, 4);
1241 if(n != 4)
1242 return nil;
1243 n = GBIT32(buf);
1244 if(n > MAXMSGSIZE)
1245 return nil;
1246 pkt = emalloc(n);
1247 PBIT32(pkt, n);
1248 nn = ioreadn(io, fd, pkt+4, n-4);
1249 if(nn != n-4){
1250 free(pkt);
1251 return nil;
1253 /* would do this if we ever got one of these, but we only generate them
1254 if(pkt[4] == Ropenfd){
1255 newfd = iorecvfd(io, fd);
1256 PBIT32(pkt+n-4, newfd);
1259 return pkt;
1262 Msg*
1263 mread9p(Ioproc *io, int fd, int dotu)
1265 int n, nn;
1266 uchar *pkt;
1267 Msg *m;
1269 if((pkt = read9ppkt(io, fd)) == nil)
1270 return nil;
1272 m = msgnew(0);
1273 m->tpkt = pkt;
1274 n = GBIT32(pkt);
1275 nn = convM2Su(pkt, n, &m->tx, dotu);
1276 if(nn != n){
1277 fprint(2, "%T read bad packet from %d\n", fd);
1278 return nil;
1280 return m;
1283 int
1284 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1286 int n, nfd;
1288 n = GBIT32(pkt);
1289 if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
1290 if(verbose > 1) fprint(2, "%T before iowrite\n");
1291 if(iowrite(io, fd, pkt, n) != n){
1292 fprint(2, "%T write error: %r\n");
1293 return -1;
1295 if(verbose > 1) fprint(2, "%T after iowrite\n");
1296 if(pkt[4] == Ropenfd){
1297 nfd = GBIT32(pkt+n-4);
1298 if(iosendfd(io, fd, nfd) < 0){
1299 fprint(2, "%T send fd error: %r\n");
1300 return -1;
1303 return 0;
1306 void
1307 restring(uchar *pkt, int pn, char *s)
1309 int n;
1311 if(s < (char*)pkt || s >= (char*)pkt+pn)
1312 return;
1314 n = strlen(s);
1315 memmove(s+1, s, n);
1316 PBIT16((uchar*)s-1, n);
1319 void
1320 repack(Fcall *f, uchar **ppkt, int dotu)
1322 uint n, nn;
1323 uchar *pkt;
1325 pkt = *ppkt;
1326 n = GBIT32(pkt);
1327 nn = sizeS2Mu(f, dotu);
1328 if(nn > n){
1329 free(pkt);
1330 pkt = emalloc(nn);
1331 *ppkt = pkt;
1333 n = convS2Mu(f, pkt, nn, dotu);
1334 if(n <= BIT16SZ)
1335 sysfatal("convS2M conversion error");
1336 if(n != nn)
1337 sysfatal("convS2Mu and sizeS2Mu disagree");
1340 void
1341 rewritehdr(Fcall *f, uchar *pkt)
1343 int i, n;
1345 n = GBIT32(pkt);
1346 PBIT16(pkt+5, f->tag);
1347 switch(f->type){
1348 case Tversion:
1349 case Rversion:
1350 restring(pkt, n, f->version);
1351 break;
1352 case Tauth:
1353 PBIT32(pkt+7, f->afid);
1354 restring(pkt, n, f->uname);
1355 restring(pkt, n, f->aname);
1356 break;
1357 case Tflush:
1358 PBIT16(pkt+7, f->oldtag);
1359 break;
1360 case Tattach:
1361 restring(pkt, n, f->uname);
1362 restring(pkt, n, f->aname);
1363 PBIT32(pkt+7, f->fid);
1364 PBIT32(pkt+11, f->afid);
1365 break;
1366 case Twalk:
1367 PBIT32(pkt+7, f->fid);
1368 PBIT32(pkt+11, f->newfid);
1369 for(i=0; i<f->nwname; i++)
1370 restring(pkt, n, f->wname[i]);
1371 break;
1372 case Tcreate:
1373 restring(pkt, n, f->name);
1374 /* fall through */
1375 case Topen:
1376 case Tclunk:
1377 case Tremove:
1378 case Tstat:
1379 case Twstat:
1380 case Twrite:
1381 PBIT32(pkt+7, f->fid);
1382 break;
1383 case Tread:
1384 PBIT32(pkt+7, f->fid);
1385 PBIT64(pkt+11, f->offset);
1386 break;
1387 case Rerror:
1388 restring(pkt, n, f->ename);
1389 break;
1393 static long
1394 _iolisten(va_list *arg)
1396 char *a, *b;
1398 a = va_arg(*arg, char*);
1399 b = va_arg(*arg, char*);
1400 return listen(a, b);
1403 int
1404 iolisten(Ioproc *io, char *a, char *b)
1406 return iocall(io, _iolisten, a, b);
1409 static long
1410 _ioaccept(va_list *arg)
1412 int fd;
1413 char *dir;
1415 fd = va_arg(*arg, int);
1416 dir = va_arg(*arg, char*);
1417 return accept(fd, dir);
1420 int
1421 ioaccept(Ioproc *io, int fd, char *dir)
1423 return iocall(io, _ioaccept, fd, dir);
1426 int
1427 timefmt(Fmt *fmt)
1429 static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1430 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1431 vlong ns;
1432 Tm tm;
1433 ns = nsec();
1434 tm = *localtime(time(0));
1435 return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
1436 mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
1437 (int)(ns%1000000000)/1000000);
1440 int
1441 cvtustat(Fcall *f, uchar **fpkt, int tounix)
1443 int n;
1444 uchar *buf;
1445 char *str;
1446 Dir dir;
1448 str = emalloc(f->nstat);
1449 n = convM2Du(f->stat, f->nstat, &dir, str, !tounix);
1450 if(n <= BIT16SZ){
1451 free(str);
1452 return -1;
1455 n = sizeD2Mu(&dir, tounix);
1456 buf = emalloc(n);
1457 if(convD2Mu(&dir, buf, n, tounix) != n)
1458 sysfatal("convD2Mu conversion error");
1459 f->nstat = n;
1460 f->stat = buf;
1462 repack(f, fpkt, dotu);
1463 free(buf);
1464 f->stat = nil; /* is this okay ??? */
1465 free(str);
1467 return 0;
1470 int
1471 stripudirread(Msg* msg)
1473 char *str;
1474 int i, m, n, nn;
1475 uchar *buf;
1476 Dir d;
1477 Fcall* rx;
1479 buf = nil;
1480 str = nil;
1481 rx = &msg->rx;
1482 n = 0;
1483 nn = 0;
1484 for(i = 0; i < rx->count; i += m){
1485 m = BIT16SZ + GBIT16(&rx->data[i]);
1486 if(statchecku((uchar*)&rx->data[i], m, 1) < 0)
1487 return -1;
1488 if(nn < m)
1489 nn = m;
1490 n++;
1493 str = emalloc(nn);
1494 buf = emalloc(rx->count);
1496 nn = 0;
1497 for(i = 0; i < rx->count; i += m){
1498 m = BIT16SZ + GBIT16(&rx->data[i]);
1499 if(convM2Du((uchar*)&rx->data[i], m, &d, str, 1) != m){
1500 free(buf);
1501 free(str);
1502 return -1;
1505 n = convD2M(&d, &buf[nn], rx->count - nn);
1506 if(n <= BIT16SZ){
1507 free(buf);
1508 free(str);
1509 return -1;
1512 nn += n;
1515 rx->count = nn;
1516 rx->data = (char*)buf;
1518 repack(&msg->rx, &msg->rpkt, 0);
1519 free(str);
1520 free(buf);
1521 rx->data = nil; /* is this okay ??? */
1523 return 0;