Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <errno.h>
/*
 * Server-wide tunables.
 */
enum
{
	STACK = 32768,	/* stack size handed to threadcreate/proccreate */
	NHASH = 31,	/* buckets in each per-connection tag/fid hash table */
	MAXMSG = 64,	/* per connection */
};
/* Short names for the structures defined in this file. */
typedef struct Hash Hash;
typedef struct Fid Fid;
typedef struct Msg Msg;
typedef struct Conn Conn;
typedef struct Queue Queue;
20 struct Hash
21 {
22 Hash *next;
23 uint n;
24 void *v;
25 };
27 struct Fid
28 {
29 int fid;
30 int ref;
31 int cfid;
32 int openfd;
33 int isdir;
34 Fid *next;
35 };
37 struct Msg
38 {
39 Conn *c;
40 int internal;
41 int ref;
42 int ctag;
43 int tag;
44 int isopenfd;
45 Fcall tx;
46 Fcall rx;
47 Fid *fid;
48 Fid *newfid;
49 Fid *afid;
50 Msg *oldm;
51 Msg *next;
52 uchar *tpkt;
53 uchar *rpkt;
54 };
56 struct Conn
57 {
58 int fd;
59 int fdmode;
60 Fid *fdfid;
61 int nmsg;
62 int nfid;
63 Channel *inc;
64 Channel *internal;
65 int inputstalled;
66 char dir[40];
67 Hash *tag[NHASH];
68 Hash *fid[NHASH];
69 Queue *outq;
70 Queue *inq;
71 int dotu;
72 };
74 char *xaname;
75 char *addr;
76 int afd;
77 char adir[40];
78 int isunix;
79 Queue *outq;
80 Queue *inq;
81 int verbose = 0;
82 int logging = 0;
83 int msize = 8192;
84 u32int xafid = NOFID;
85 int attached;
86 int versioned;
87 int dotu;
89 void *gethash(Hash**, uint);
90 int puthash(Hash**, uint, void*);
91 int delhash(Hash**, uint, void*);
92 Msg *mread9p(Ioproc*, int, int);
93 int mwrite9p(Ioproc*, int, uchar*);
94 uchar *read9ppkt(Ioproc*, int);
95 int write9ppkt(int, uchar*);
96 Msg *msgnew(int);
97 void msgput(Msg*);
98 void msgclear(Msg*);
99 Msg *msgget(int);
100 void msgincref(Msg*);
101 Fid *fidnew(int);
102 void fidput(Fid*);
103 void *emalloc(int);
104 void *erealloc(void*, int);
105 Queue *qalloc(void);
106 int sendq(Queue*, void*);
107 void *recvq(Queue*);
108 void connthread(void*);
109 void connoutthread(void*);
110 void listenthread(void*);
111 void outputthread(void*);
112 void inputthread(void*);
113 void rewritehdr(Fcall*, uchar*);
114 void repack(Fcall*, uchar**, int);
115 int tlisten(char*, char*);
116 int taccept(int, char*);
117 int iolisten(Ioproc*, char*, char*);
118 int ioaccept(Ioproc*, int, char*);
119 int iorecvfd(Ioproc*, int);
120 int iosendfd(Ioproc*, int, int);
121 void mainproc(void*);
122 int ignorepipe(void*, char*);
123 int timefmt(Fmt*);
124 void dorootstat(void);
125 int stripudirread(Msg*);
126 int stripustat(Fcall*, uchar**, int);
/*
 * Print the command synopsis on standard error and
 * terminate all threads with exit string "usage".
 */
void
usage(void)
{
	fprint(2, "usage: 9pserve [-lv] [-A aname afid] [-M msize] address\n");
	fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
	threadexitsall("usage");
}
136 uchar vbuf[128];
137 extern int _threaddebuglevel;
138 void
139 threadmain(int argc, char **argv)
141 char *file, *x;
142 int fd;
144 x = getenv("verbose9pserve");
145 if(x){
146 verbose = atoi(x);
147 fprint(2, "verbose9pserve %s => %d\n", x, verbose);
149 ARGBEGIN{
150 default:
151 usage();
152 case 'A':
153 attached = 1;
154 xaname = EARGF(usage());
155 xafid = atoi(EARGF(usage()));
156 break;
157 case 'M':
158 versioned = 1;
159 msize = atoi(EARGF(usage()));
160 break;
161 case 'v':
162 verbose++;
163 break;
164 case 'u':
165 isunix++;
166 break;
167 case 'l':
168 logging++;
169 break;
170 }ARGEND
172 if(attached && !versioned){
173 fprint(2, "-A must be used with -M\n");
174 usage();
177 if(argc != 1)
178 usage();
179 addr = argv[0];
181 fmtinstall('T', timefmt);
183 if((afd = announce(addr, adir)) < 0)
184 sysfatal("announce %s: %r", addr);
185 if(logging){
186 if(strncmp(addr, "unix!", 5) == 0)
187 addr += 5;
188 file = smprint("%s.log", addr);
189 if(file == nil)
190 sysfatal("smprint log: %r");
191 if((fd = create(file, OWRITE, 0666)) < 0)
192 sysfatal("create %s: %r", file);
193 dup(fd, 2);
194 if(fd > 2)
195 close(fd);
197 if(verbose) fprint(2, "%T 9pserve running\n");
198 proccreate(mainproc, nil, STACK);
201 void
202 mainproc(void *v)
204 int n, nn;
205 Fcall f;
206 USED(v);
208 atnotify(ignorepipe, 1);
209 fmtinstall('D', dirfmt);
210 fmtinstall('M', dirmodefmt);
211 fmtinstall('F', fcallfmt);
212 fmtinstall('H', encodefmt);
214 outq = qalloc();
215 inq = qalloc();
217 if(!versioned){
218 f.type = Tversion;
219 f.version = "9P2000.u";
220 f.msize = msize;
221 f.tag = NOTAG;
222 n = convS2M(&f, vbuf, sizeof vbuf);
223 if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
224 nn = write(1, vbuf, n);
225 if(n != nn)
226 sysfatal("error writing Tversion: %r\n");
227 n = read9pmsg(0, vbuf, sizeof vbuf);
228 if(convM2S(vbuf, n, &f) != n)
229 sysfatal("convM2S failure");
230 if(f.msize < msize)
231 msize = f.msize;
232 if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
233 dotu = strncmp(f.version, "9P2000.u", 8) == 0;
236 threadcreate(inputthread, nil, STACK);
237 threadcreate(outputthread, nil, STACK);
239 // if(rootfid)
240 // dorootstat();
242 threadcreate(listenthread, nil, STACK);
243 threadexits(0);
246 int
247 ignorepipe(void *v, char *s)
249 USED(v);
250 if(strcmp(s, "sys: write on closed pipe") == 0)
251 return 1;
252 if(strcmp(s, "sys: tstp") == 0)
253 return 1;
254 fprint(2, "9pserve %s: %T note: %s\n", addr, s);
255 return 0;
258 void
259 listenthread(void *arg)
261 Conn *c;
262 Ioproc *io;
264 io = ioproc();
265 USED(arg);
266 threadsetname("listen %s", adir);
267 for(;;){
268 c = emalloc(sizeof(Conn));
269 c->fd = iolisten(io, adir, c->dir);
270 if(c->fd < 0){
271 if(verbose) fprint(2, "%T listen: %r\n");
272 close(afd);
273 free(c);
274 return;
276 c->inc = chancreate(sizeof(void*), 0);
277 c->internal = chancreate(sizeof(void*), 0);
278 c->inq = qalloc();
279 c->outq = qalloc();
280 if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
281 threadcreate(connthread, c, STACK);
285 void
286 send9pmsg(Msg *m)
288 int n, nn;
290 n = sizeS2Mu(&m->rx, m->c->dotu);
291 m->rpkt = emalloc(n);
292 nn = convS2Mu(&m->rx, m->rpkt, n, m->c->dotu);
293 if(nn != n)
294 sysfatal("sizeS2M + convS2M disagree");
295 sendq(m->c->outq, m);
298 void
299 sendomsg(Msg *m)
301 int n, nn;
303 n = sizeS2Mu(&m->tx, m->c->dotu);
304 m->tpkt = emalloc(n);
305 nn = convS2Mu(&m->tx, m->tpkt, n, m->c->dotu);
306 if(nn != n)
307 sysfatal("sizeS2M + convS2M disagree");
308 sendq(outq, m);
311 void
312 err(Msg *m, char *ename)
314 m->rx.type = Rerror;
315 m->rx.ename = ename;
316 m->rx.tag = m->tx.tag;
317 send9pmsg(m);
/*
 * Return an emalloc'ed copy of the NUL-terminated string s.
 */
char*
estrdup(char *s)
{
	char *copy;
	int len;

	len = strlen(s)+1;
	copy = emalloc(len);
	memmove(copy, s, len);
	return copy;
}
330 void
331 connthread(void *arg)
333 int i, fd;
334 Conn *c;
335 Hash *h, *hnext;
336 Msg *m, *om, *mm;
337 Fid *f;
338 Ioproc *io;
340 c = arg;
341 threadsetname("conn %s", c->dir);
342 io = ioproc();
343 fd = ioaccept(io, c->fd, c->dir);
344 if(fd < 0){
345 if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
346 goto out;
348 close(c->fd);
349 c->fd = fd;
350 threadcreate(connoutthread, c, STACK);
351 while((m = mread9p(io, c->fd, c->dotu)) != nil){
352 if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
353 m->c = c;
354 m->ctag = m->tx.tag;
355 c->nmsg++;
356 if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
357 if(puthash(c->tag, m->tx.tag, m) < 0){
358 err(m, "duplicate tag");
359 continue;
361 msgincref(m);
362 switch(m->tx.type){
363 case Tversion:
364 m->rx.tag = m->tx.tag;
365 m->rx.msize = m->tx.msize;
366 if(m->rx.msize > msize)
367 m->rx.msize = msize;
368 m->rx.version = "9P2000";
369 c->dotu = 0;
370 if(dotu && strncmp(m->tx.version, "9P2000.u", 8) == 0){
371 m->rx.version = "9P2000.u";
372 c->dotu = 1;
374 m->rx.type = Rversion;
375 send9pmsg(m);
376 continue;
377 case Tflush:
378 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
379 m->rx.tag = m->tx.tag;
380 m->rx.type = Rflush;
381 send9pmsg(m);
382 continue;
384 msgincref(m->oldm);
385 break;
386 case Tattach:
387 m->afid = nil;
388 if(m->tx.afid != NOFID
389 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
390 err(m, "unknown fid");
391 continue;
393 if(m->afid)
394 m->afid->ref++;
395 m->fid = fidnew(m->tx.fid);
396 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
397 err(m, "duplicate fid");
398 continue;
400 m->fid->ref++;
401 if(attached && m->afid==nil){
402 if(m->tx.aname[0] && strcmp(xaname, m->tx.aname) != 0){
403 err(m, "invalid attach name");
404 continue;
406 m->tx.afid = xafid;
407 m->tx.aname = xaname;
408 m->tx.uname = estrdup(m->tx.uname);
409 repack(&m->tx, &m->tpkt, c->dotu);
410 free(m->tx.uname);
411 m->tx.uname = "XXX";
413 break;
414 case Twalk:
415 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
416 err(m, "unknown fid");
417 continue;
419 m->fid->ref++;
420 if(m->tx.newfid == m->tx.fid){
421 m->fid->ref++;
422 m->newfid = m->fid;
423 }else{
424 m->newfid = fidnew(m->tx.newfid);
425 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
426 err(m, "duplicate fid");
427 continue;
429 m->newfid->ref++;
431 break;
432 case Tauth:
433 if(attached){
434 err(m, "authentication not required");
435 continue;
437 m->afid = fidnew(m->tx.afid);
438 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
439 err(m, "duplicate fid");
440 continue;
442 m->afid->ref++;
443 break;
444 case Topenfd:
445 if(m->tx.mode&~(OTRUNC|3)){
446 err(m, "bad openfd mode");
447 continue;
449 m->isopenfd = 1;
450 m->tx.type = Topen;
451 m->tpkt[4] = Topen;
452 /* fall through */
453 case Tcreate:
454 case Topen:
455 case Tclunk:
456 case Tread:
457 case Twrite:
458 case Tremove:
459 case Tstat:
460 case Twstat:
461 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
462 err(m, "unknown fid");
463 continue;
465 m->fid->ref++;
466 if(m->tx.type==Twstat && dotu && !c->dotu){
467 if(stripustat(&m->tx, &m->tpkt, 1) < 0){
468 err(m, "cannot convert stat buffer");
469 continue;
472 break;
475 /* have everything - translate and send */
476 m->c = c;
477 m->ctag = m->tx.tag;
478 m->tx.tag = m->tag;
479 if(m->fid)
480 m->tx.fid = m->fid->fid;
481 if(m->newfid)
482 m->tx.newfid = m->newfid->fid;
483 if(m->afid)
484 m->tx.afid = m->afid->fid;
485 if(m->oldm)
486 m->tx.oldtag = m->oldm->tag;
487 /* reference passes to outq */
488 sendq(outq, m);
489 while(c->nmsg >= MAXMSG){
490 c->inputstalled = 1;
491 recvp(c->inc);
495 if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
497 /* flush the output queue */
498 sendq(c->outq, nil);
499 while(c->outq != nil)
500 yield();
502 /* flush all outstanding messages */
503 for(i=0; i<NHASH; i++){
504 for(h=c->tag[i]; h; h=hnext){
505 om = h->v;
506 m = msgnew(0);
507 m->internal = 1;
508 m->c = c;
509 c->nmsg++;
510 m->tx.type = Tflush;
511 m->tx.tag = m->tag;
512 m->tx.oldtag = om->tag;
513 m->oldm = om;
514 msgincref(om);
515 msgincref(m); /* for outq */
516 sendomsg(m);
517 mm = recvp(c->internal);
518 assert(mm == m);
519 msgput(m); /* got from recvp */
520 msgput(m); /* got from msgnew */
521 msgput(om); /* got from hash table */
522 hnext = h->next;
523 free(h);
527 /* clunk all outstanding fids */
528 for(i=0; i<NHASH; i++){
529 for(h=c->fid[i]; h; h=hnext){
530 f = h->v;
531 m = msgnew(0);
532 m->internal = 1;
533 m->c = c;
534 c->nmsg++;
535 m->tx.type = Tclunk;
536 m->tx.tag = m->tag;
537 m->tx.fid = f->fid;
538 m->fid = f;
539 f->ref++;
540 msgincref(m);
541 sendomsg(m);
542 mm = recvp(c->internal);
543 assert(mm == m);
544 msgclear(m);
545 msgput(m); /* got from recvp */
546 msgput(m); /* got from msgnew */
547 fidput(f); /* got from hash table */
548 hnext = h->next;
549 free(h);
553 out:
554 closeioproc(io);
555 assert(c->nmsg == 0);
556 assert(c->nfid == 0);
557 close(c->fd);
558 chanfree(c->internal);
559 c->internal = 0;
560 chanfree(c->inc);
561 c->inc = 0;
562 free(c->inq);
563 c->inq = 0;
564 free(c);
567 static void
568 openfdthread(void *v)
570 Conn *c;
571 Fid *fid;
572 Msg *m;
573 int n;
574 vlong tot;
575 Ioproc *io;
576 char buf[1024];
578 c = v;
579 fid = c->fdfid;
580 io = ioproc();
581 threadsetname("openfd %s", c->fdfid);
582 tot = 0;
583 m = nil;
584 if(c->fdmode == OREAD){
585 for(;;){
586 if(verbose) fprint(2, "%T tread...");
587 m = msgnew(0);
588 m->internal = 1;
589 m->c = c;
590 m->tx.type = Tread;
591 m->tx.count = msize - IOHDRSZ;
592 m->tx.fid = fid->fid;
593 m->tx.tag = m->tag;
594 m->tx.offset = tot;
595 m->fid = fid;
596 fid->ref++;
597 msgincref(m);
598 sendomsg(m);
599 recvp(c->internal);
600 if(m->rx.type == Rerror){
601 // fprint(2, "%T read error: %s\n", m->rx.ename);
602 break;
604 if(m->rx.count == 0)
605 break;
606 tot += m->rx.count;
607 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
608 // fprint(2, "%T pipe write error: %r\n");
609 break;
611 msgput(m);
612 msgput(m);
613 m = nil;
615 }else{
616 for(;;){
617 if(verbose) fprint(2, "%T twrite...");
618 n = sizeof buf;
619 if(n > msize)
620 n = msize;
621 if((n=ioread(io, c->fd, buf, n)) <= 0){
622 if(n < 0)
623 fprint(2, "%T pipe read error: %r\n");
624 break;
626 m = msgnew(0);
627 m->internal = 1;
628 m->c = c;
629 m->tx.type = Twrite;
630 m->tx.fid = fid->fid;
631 m->tx.data = buf;
632 m->tx.count = n;
633 m->tx.tag = m->tag;
634 m->tx.offset = tot;
635 m->fid = fid;
636 fid->ref++;
637 msgincref(m);
638 sendomsg(m);
639 recvp(c->internal);
640 if(m->rx.type == Rerror){
641 // fprint(2, "%T write error: %s\n", m->rx.ename);
643 tot += n;
644 msgput(m);
645 msgput(m);
646 m = nil;
649 if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
650 close(c->fd);
651 closeioproc(io);
652 if(m){
653 msgput(m);
654 msgput(m);
656 if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
657 if(--fid->openfd == 0){
658 m = msgnew(0);
659 m->internal = 1;
660 m->c = c;
661 m->tx.type = Tclunk;
662 m->tx.tag = m->tag;
663 m->tx.fid = fid->fid;
664 m->fid = fid;
665 fid->ref++;
666 msgincref(m);
667 sendomsg(m);
668 recvp(c->internal);
669 msgput(m);
670 msgput(m);
672 fidput(fid);
673 c->fdfid = nil;
674 chanfree(c->internal);
675 c->internal = 0;
676 free(c);
679 int
680 xopenfd(Msg *m)
682 char errs[ERRMAX];
683 int n, p[2];
684 Conn *nc;
686 if(pipe(p) < 0){
687 rerrstr(errs, sizeof errs);
688 err(m, errs);
689 /* XXX return here? */
691 if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
693 /* now we're committed. */
695 /* a new connection for this fid */
696 nc = emalloc(sizeof(Conn));
697 nc->internal = chancreate(sizeof(void*), 0);
699 /* a ref for us */
700 nc->fdfid = m->fid;
701 m->fid->ref++;
702 nc->fdfid->openfd++;
703 nc->fdmode = m->tx.mode;
704 nc->fd = p[0];
706 /* a thread to tend the pipe */
707 threadcreate(openfdthread, nc, STACK);
709 /* if mode is ORDWR, that openfdthread will write; start a reader */
710 if((m->tx.mode&3) == ORDWR){
711 nc = emalloc(sizeof(Conn));
712 nc->internal = chancreate(sizeof(void*), 0);
713 nc->fdfid = m->fid;
714 m->fid->ref++;
715 nc->fdfid->openfd++;
716 nc->fdmode = OREAD;
717 nc->fd = dup(p[0], -1);
718 threadcreate(openfdthread, nc, STACK);
721 /* steal fid from other connection */
722 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
723 fidput(m->fid);
725 /* rewrite as Ropenfd */
726 m->rx.type = Ropenfd;
727 n = GBIT32(m->rpkt);
728 m->rpkt = erealloc(m->rpkt, n+4);
729 PBIT32(m->rpkt+n, p[1]);
730 n += 4;
731 PBIT32(m->rpkt, n);
732 m->rpkt[4] = Ropenfd;
733 m->rx.unixfd = p[1];
734 return 0;
737 void
738 connoutthread(void *arg)
740 char *ename;
741 int err;
742 Conn *c;
743 Queue *outq;
744 Msg *m, *om;
745 Ioproc *io;
747 c = arg;
748 outq = c->outq;
749 io = ioproc();
750 threadsetname("connout %s", c->dir);
751 while((m = recvq(outq)) != nil){
752 err = m->tx.type+1 != m->rx.type;
753 if(!err && m->isopenfd)
754 if(xopenfd(m) < 0)
755 continue;
756 switch(m->tx.type){
757 case Tflush:
758 om = m->oldm;
759 if(om)
760 if(delhash(om->c->tag, om->ctag, om) == 0)
761 msgput(om);
762 break;
763 case Tclunk:
764 case Tremove:
765 if(m->fid)
766 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
767 fidput(m->fid);
768 break;
769 case Tauth:
770 if(err && m->afid){
771 if(verbose) fprint(2, "%T auth error\n");
772 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
773 fidput(m->afid);
775 break;
776 case Tattach:
777 if(err && m->fid)
778 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
779 fidput(m->fid);
780 break;
781 case Twalk:
782 if(err || m->rx.nwqid < m->tx.nwname)
783 if(m->tx.fid != m->tx.newfid && m->newfid)
784 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
785 fidput(m->newfid);
786 break;
787 case Tread:
788 if(!err && m->fid->isdir && dotu && !m->c->dotu)
789 stripudirread(m);
790 break;
791 case Tstat:
792 if(!err && dotu && !m->c->dotu)
793 stripustat(&m->rx, &m->rpkt, 0);
794 break;
795 case Topen:
796 case Tcreate:
797 m->fid->isdir = (m->rx.qid.type & QTDIR);
798 break;
800 if(m->rx.type==Rerror && dotu && !c->dotu){
801 ename = estrdup(m->rx.ename);
802 m->rx.ename = ename;
803 repack(&m->rx, &m->rpkt, c->dotu);
804 free(ename);
806 if(delhash(m->c->tag, m->ctag, m) == 0)
807 msgput(m);
808 if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
809 rewritehdr(&m->rx, m->rpkt);
810 if(mwrite9p(io, c->fd, m->rpkt) < 0)
811 if(verbose) fprint(2, "%T write error: %r\n");
812 msgput(m);
813 if(c->inputstalled && c->nmsg < MAXMSG)
814 nbsendp(c->inc, 0);
816 closeioproc(io);
817 free(outq);
818 c->outq = nil;
821 void
822 outputthread(void *arg)
824 Msg *m;
825 Ioproc *io;
827 USED(arg);
828 io = ioproc();
829 threadsetname("output");
830 while((m = recvq(outq)) != nil){
831 if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
832 rewritehdr(&m->tx, m->tpkt);
833 if(mwrite9p(io, 1, m->tpkt) < 0)
834 sysfatal("output error: %r");
835 msgput(m);
837 closeioproc(io);
838 fprint(2, "%T output eof\n");
839 threadexitsall(0);
842 void
843 inputthread(void *arg)
845 uchar *pkt;
846 int n, nn, tag;
847 Msg *m;
848 Ioproc *io;
850 threadsetname("input");
851 if(verbose) fprint(2, "%T input thread\n");
852 io = ioproc();
853 USED(arg);
854 while((pkt = read9ppkt(io, 0)) != nil){
855 n = GBIT32(pkt);
856 if(n < 7){
857 fprint(2, "%T short 9P packet from server\n");
858 free(pkt);
859 continue;
861 if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
862 tag = GBIT16(pkt+5);
863 if((m = msgget(tag)) == nil){
864 fprint(2, "%T unexpected 9P response tag %d\n", tag);
865 free(pkt);
866 continue;
868 if((nn = convM2Su(pkt, n, &m->rx, dotu)) != n){
869 fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
870 free(pkt);
871 msgput(m);
872 continue;
874 if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
875 m->internal ? " (internal)" : "");
876 m->rpkt = pkt;
877 m->rx.tag = m->ctag;
878 if(m->internal)
879 sendp(m->c->internal, m);
880 else if(m->c->outq)
881 sendq(m->c->outq, m);
882 else
883 msgput(m);
885 closeioproc(io);
886 //fprint(2, "%T input eof\n");
887 threadexitsall(0);
890 void*
891 gethash(Hash **ht, uint n)
893 Hash *h;
895 for(h=ht[n%NHASH]; h; h=h->next)
896 if(h->n == n)
897 return h->v;
898 return nil;
901 int
902 delhash(Hash **ht, uint n, void *v)
904 Hash *h, **l;
906 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
907 if(h->n == n){
908 if(h->v != v){
909 if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
910 return -1;
912 *l = h->next;
913 free(h);
914 return 0;
916 return -1;
919 int
920 puthash(Hash **ht, uint n, void *v)
922 Hash *h;
924 if(gethash(ht, n))
925 return -1;
926 h = emalloc(sizeof(Hash));
927 h->next = ht[n%NHASH];
928 h->n = n;
929 h->v = v;
930 ht[n%NHASH] = h;
931 return 0;
934 Fid **fidtab;
935 int nfidtab;
936 Fid *freefid;
938 Fid*
939 fidnew(int cfid)
941 Fid *f;
943 if(freefid == nil){
944 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
945 if(nfidtab == xafid){
946 fidtab[nfidtab++] = nil;
947 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
949 fidtab[nfidtab] = emalloc(sizeof(Fid));
950 freefid = fidtab[nfidtab];
951 freefid->fid = nfidtab++;
953 f = freefid;
954 freefid = f->next;
955 f->cfid = cfid;
956 f->ref = 1;
957 f->isdir = -1;
958 return f;
961 void
962 fidput(Fid *f)
964 if(f == nil)
965 return;
966 assert(f->ref > 0);
967 if(--f->ref > 0)
968 return;
969 f->next = freefid;
970 f->cfid = -1;
971 freefid = f;
974 Msg **msgtab;
975 int nmsgtab;
976 Msg *freemsg;
978 void
979 msgincref(Msg *m)
981 if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
982 getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
983 m->ref++;
986 Msg*
987 msgnew(int x)
989 Msg *m;
991 if(freemsg == nil){
992 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
993 msgtab[nmsgtab] = emalloc(sizeof(Msg));
994 freemsg = msgtab[nmsgtab];
995 freemsg->tag = nmsgtab++;
997 m = freemsg;
998 freemsg = m->next;
999 m->ref = 1;
1000 if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
1001 getcallerpc(&x), m, m->tag, m->ref);
1002 return m;
1006 * Clear data associated with connections, so that
1007 * if all msgs have been msgcleared, the connection
1008 * can be freed. Note that this does *not* free the tpkt
1009 * and rpkt; they are freed in msgput with the msg itself.
1010 * The io write thread might still be holding a ref to msg
1011 * even once the connection has finished with it.
1013 void
1014 msgclear(Msg *m)
1016 if(m->c){
1017 m->c->nmsg--;
1018 m->c = nil;
1020 if(m->oldm){
1021 msgput(m->oldm);
1022 m->oldm = nil;
1024 if(m->fid){
1025 fidput(m->fid);
1026 m->fid = nil;
1028 if(m->afid){
1029 fidput(m->afid);
1030 m->afid = nil;
1032 if(m->newfid){
1033 fidput(m->newfid);
1034 m->newfid = nil;
1036 if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
1037 close(m->rx.unixfd);
1038 m->rx.unixfd = -1;
1042 void
1043 msgput(Msg *m)
1045 if(m == nil)
1046 return;
1048 if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
1049 getcallerpc(&m), m, m->tag, m->ctag, m->ref);
1050 assert(m->ref > 0);
1051 if(--m->ref > 0)
1052 return;
1053 msgclear(m);
1054 if(m->tpkt){
1055 free(m->tpkt);
1056 m->tpkt = nil;
1058 if(m->rpkt){
1059 free(m->rpkt);
1060 m->rpkt = nil;
1062 m->isopenfd = 0;
1063 m->internal = 0;
1064 m->next = freemsg;
1065 freemsg = m;
1068 Msg*
1069 msgget(int n)
1071 Msg *m;
1073 if(n < 0 || n >= nmsgtab)
1074 return nil;
1075 m = msgtab[n];
1076 if(m->ref == 0)
1077 return nil;
1078 if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
1079 msgincref(m);
1080 return m;
1084 void*
1085 emalloc(int n)
1087 void *v;
1089 v = mallocz(n, 1);
1090 if(v == nil){
1091 abort();
1092 sysfatal("out of memory allocating %d", n);
1094 return v;
1097 void*
1098 erealloc(void *v, int n)
1100 v = realloc(v, n);
1101 if(v == nil){
1102 abort();
1103 sysfatal("out of memory reallocating %d", n);
1105 return v;
/* One element of a Queue. */
typedef struct Qel Qel;
struct Qel
{
	Qel *next;	/* next element toward the tail */
	void *p;	/* queued pointer */
};
1115 struct Queue
1117 int hungup;
1118 QLock lk;
1119 Rendez r;
1120 Qel *head;
1121 Qel *tail;
1124 Queue*
1125 qalloc(void)
1127 Queue *q;
1129 q = mallocz(sizeof(Queue), 1);
1130 if(q == nil)
1131 return nil;
1132 q->r.l = &q->lk;
1133 return q;
1136 int
1137 sendq(Queue *q, void *p)
1139 Qel *e;
1141 e = emalloc(sizeof(Qel));
1142 qlock(&q->lk);
1143 if(q->hungup){
1144 werrstr("hungup queue");
1145 qunlock(&q->lk);
1146 return -1;
1148 e->p = p;
1149 e->next = nil;
1150 if(q->head == nil)
1151 q->head = e;
1152 else
1153 q->tail->next = e;
1154 q->tail = e;
1155 rwakeup(&q->r);
1156 qunlock(&q->lk);
1157 return 0;
1160 void*
1161 recvq(Queue *q)
1163 void *p;
1164 Qel *e;
1166 qlock(&q->lk);
1167 while(q->head == nil && !q->hungup)
1168 rsleep(&q->r);
1169 if(q->hungup){
1170 qunlock(&q->lk);
1171 return nil;
1173 e = q->head;
1174 q->head = e->next;
1175 qunlock(&q->lk);
1176 p = e->p;
1177 free(e);
1178 return p;
1181 uchar*
1182 read9ppkt(Ioproc *io, int fd)
1184 uchar buf[4], *pkt;
1185 int n, nn;
1187 n = ioreadn(io, fd, buf, 4);
1188 if(n != 4)
1189 return nil;
1190 n = GBIT32(buf);
1191 pkt = emalloc(n);
1192 PBIT32(pkt, n);
1193 nn = ioreadn(io, fd, pkt+4, n-4);
1194 if(nn != n-4){
1195 free(pkt);
1196 return nil;
1198 /* would do this if we ever got one of these, but we only generate them
1199 if(pkt[4] == Ropenfd){
1200 newfd = iorecvfd(io, fd);
1201 PBIT32(pkt+n-4, newfd);
1204 return pkt;
1207 Msg*
1208 mread9p(Ioproc *io, int fd, int dotu)
1210 int n, nn;
1211 uchar *pkt;
1212 Msg *m;
1214 if((pkt = read9ppkt(io, fd)) == nil)
1215 return nil;
1217 m = msgnew(0);
1218 m->tpkt = pkt;
1219 n = GBIT32(pkt);
1220 nn = convM2Su(pkt, n, &m->tx, dotu);
1221 if(nn != n){
1222 fprint(2, "%T read bad packet from %d\n", fd);
1223 return nil;
1225 return m;
1228 int
1229 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1231 int n, nfd;
1233 n = GBIT32(pkt);
1234 if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
1235 if(verbose > 1) fprint(2, "%T before iowrite\n");
1236 if(iowrite(io, fd, pkt, n) != n){
1237 fprint(2, "%T write error: %r\n");
1238 return -1;
1240 if(verbose > 1) fprint(2, "%T after iowrite\n");
1241 if(pkt[4] == Ropenfd){
1242 nfd = GBIT32(pkt+n-4);
1243 if(iosendfd(io, fd, nfd) < 0){
1244 fprint(2, "%T send fd error: %r\n");
1245 return -1;
1248 return 0;
1251 void
1252 restring(uchar *pkt, int pn, char *s)
1254 int n;
1256 if(s < (char*)pkt || s >= (char*)pkt+pn)
1257 return;
1259 n = strlen(s);
1260 memmove(s+1, s, n);
1261 PBIT16((uchar*)s-1, n);
1264 void
1265 repack(Fcall *f, uchar **ppkt, int dotu)
1267 uint n, nn;
1268 uchar *pkt;
1270 pkt = *ppkt;
1271 n = GBIT32(pkt);
1272 nn = sizeS2Mu(f, dotu);
1273 if(nn > n){
1274 free(pkt);
1275 pkt = emalloc(nn);
1276 *ppkt = pkt;
1278 convS2Mu(f, pkt, nn, dotu);
1281 void
1282 rewritehdr(Fcall *f, uchar *pkt)
1284 int i, n;
1286 n = GBIT32(pkt);
1287 PBIT16(pkt+5, f->tag);
1288 switch(f->type){
1289 case Tversion:
1290 case Rversion:
1291 restring(pkt, n, f->version);
1292 break;
1293 case Tauth:
1294 PBIT32(pkt+7, f->afid);
1295 restring(pkt, n, f->uname);
1296 restring(pkt, n, f->aname);
1297 break;
1298 case Tflush:
1299 PBIT16(pkt+7, f->oldtag);
1300 break;
1301 case Tattach:
1302 restring(pkt, n, f->uname);
1303 restring(pkt, n, f->aname);
1304 PBIT32(pkt+7, f->fid);
1305 PBIT32(pkt+11, f->afid);
1306 break;
1307 case Twalk:
1308 PBIT32(pkt+7, f->fid);
1309 PBIT32(pkt+11, f->newfid);
1310 for(i=0; i<f->nwname; i++)
1311 restring(pkt, n, f->wname[i]);
1312 break;
1313 case Tcreate:
1314 restring(pkt, n, f->name);
1315 /* fall through */
1316 case Topen:
1317 case Tread:
1318 case Twrite:
1319 case Tclunk:
1320 case Tremove:
1321 case Tstat:
1322 case Twstat:
1323 PBIT32(pkt+7, f->fid);
1324 break;
1325 case Rerror:
1326 restring(pkt, n, f->ename);
1327 break;
1331 static long
1332 _iolisten(va_list *arg)
1334 char *a, *b;
1336 a = va_arg(*arg, char*);
1337 b = va_arg(*arg, char*);
1338 return listen(a, b);
1341 int
1342 iolisten(Ioproc *io, char *a, char *b)
1344 return iocall(io, _iolisten, a, b);
1347 static long
1348 _ioaccept(va_list *arg)
1350 int fd;
1351 char *dir;
1353 fd = va_arg(*arg, int);
1354 dir = va_arg(*arg, char*);
1355 return accept(fd, dir);
1358 int
1359 ioaccept(Ioproc *io, int fd, char *dir)
1361 return iocall(io, _ioaccept, fd, dir);
1364 int
1365 timefmt(Fmt *fmt)
1367 static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1368 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1369 vlong ns;
1370 Tm tm;
1371 ns = nsec();
1372 tm = *localtime(time(0));
1373 return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
1374 mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
1375 (int)(ns%1000000000)/1000000);
1378 int
1379 stripustat(Fcall *f, uchar **fpkt, int s2u)
1381 int n;
1382 uchar *buf;
1383 char *str;
1384 Dir dir;
1386 str = emalloc(f->nstat);
1387 n = convM2Du(f->stat, f->nstat, &dir, str, s2u);
1388 if(n <= BIT16SZ)
1389 return -1;
1390 n = sizeD2Mu(&dir, !s2u);
1391 buf = emalloc(n);
1393 n = convD2Mu(&dir, buf, n, !s2u);
1394 if(n <= BIT16SZ)
1395 return -1;
1396 f->nstat = n;
1397 f->stat = buf;
1399 repack(f, fpkt, dotu);
1400 free(buf);
1401 free(str);
1403 return 0;
1406 int
1407 stripudirread(Msg* msg)
1409 char *str;
1410 int i, m, n, nn;
1411 uchar *buf;
1412 Dir d;
1413 Fcall* rx;
1415 buf = nil;
1416 str = nil;
1417 rx = &msg->rx;
1418 n = 0;
1419 nn = 0;
1420 for(i = 0; i < rx->count; i += m){
1421 m = BIT16SZ + GBIT16(&rx->data[i]);
1422 if(statchecku((uchar*)&rx->data[i], m, 1) < 0)
1423 return -1;
1424 if(nn < m)
1425 nn = m;
1426 n++;
1429 str = emalloc(nn);
1430 buf = emalloc(rx->count);
1432 nn = 0;
1433 for(i = 0; i < rx->count; i += m){
1434 m = BIT16SZ + GBIT16(&rx->data[i]);
1435 if(convM2Du((uchar*)&rx->data[i], m, &d, str, 1) != m){
1436 free(buf);
1437 free(str);
1438 return -1;
1441 n = convD2M(&d, &buf[nn], rx->count - nn);
1442 if(n <= BIT16SZ){
1443 free(buf);
1444 free(str);
1445 return -1;
1448 nn += n;
1451 rx->count = nn;
1452 rx->data = (char*)buf;
1454 repack(&msg->rx, &msg->rpkt, 0);
1455 free(str);
1456 free(buf);
1458 return 0;