Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <errno.h>
7 enum
8 {
9 STACK = 32768,
10 NHASH = 31,
11 MAXMSG = 64, /* per connection */
12 };
14 typedef struct Hash Hash;
15 typedef struct Fid Fid;
16 typedef struct Msg Msg;
17 typedef struct Conn Conn;
18 typedef struct Queue Queue;
20 struct Hash
21 {
22 Hash *next;
23 uint n;
24 void *v;
25 };
27 struct Fid
28 {
29 int fid;
30 int ref;
31 int cfid;
32 int openfd;
33 Fid *next;
34 };
36 struct Msg
37 {
38 Conn *c;
39 int internal;
40 int ref;
41 int ctag;
42 int tag;
43 int isopenfd;
44 Fcall tx;
45 Fcall rx;
46 Fid *fid;
47 Fid *newfid;
48 Fid *afid;
49 Msg *oldm;
50 Msg *next;
51 uchar *tpkt;
52 uchar *rpkt;
53 };
55 struct Conn
56 {
57 int fd;
58 int fdmode;
59 Fid *fdfid;
60 int nmsg;
61 int nfid;
62 Channel *inc;
63 Channel *internal;
64 int inputstalled;
65 char dir[40];
66 Hash *tag[NHASH];
67 Hash *fid[NHASH];
68 Queue *outq;
69 Queue *inq;
70 };
72 char *addr;
73 int afd;
74 char adir[40];
75 int isunix;
76 Queue *outq;
77 Queue *inq;
78 int verbose = 0;
79 int logging = 0;
80 int msize = 8192;
82 void *gethash(Hash**, uint);
83 int puthash(Hash**, uint, void*);
84 int delhash(Hash**, uint, void*);
85 Msg *mread9p(Ioproc*, int);
86 int mwrite9p(Ioproc*, int, uchar*);
87 uchar *read9ppkt(Ioproc*, int);
88 int write9ppkt(int, uchar*);
89 Msg *msgnew(int);
90 void msgput(Msg*);
91 void msgclear(Msg*);
92 Msg *msgget(int);
93 void msgincref(Msg*);
94 Fid *fidnew(int);
95 void fidput(Fid*);
96 void *emalloc(int);
97 void *erealloc(void*, int);
98 Queue *qalloc(void);
99 int sendq(Queue*, void*);
100 void *recvq(Queue*);
101 void connthread(void*);
102 void connoutthread(void*);
103 void listenthread(void*);
104 void outputthread(void*);
105 void inputthread(void*);
106 void rewritehdr(Fcall*, uchar*);
107 int tlisten(char*, char*);
108 int taccept(int, char*);
109 int iolisten(Ioproc*, char*, char*);
110 int ioaccept(Ioproc*, int, char*);
111 int iorecvfd(Ioproc*, int);
112 int iosendfd(Ioproc*, int, int);
113 void mainproc(void*);
114 int ignorepipe(void*, char*);
115 int timefmt(Fmt*);
117 void
118 usage(void)
120 fprint(2, "usage: 9pserve [-lv] address\n");
121 fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
122 exits("usage");
125 uchar vbuf[128];
126 extern int _threaddebuglevel;
127 void
128 threadmain(int argc, char **argv)
130 char *file;
131 int fd;
133 ARGBEGIN{
134 default:
135 usage();
136 case 'v':
137 verbose++;
138 break;
139 case 'u':
140 isunix = 1;
141 break;
142 case 'l':
143 logging++;
144 break;
145 }ARGEND
147 if(argc != 1)
148 usage();
149 addr = argv[0];
151 fmtinstall('T', timefmt);
153 if((afd = announce(addr, adir)) < 0)
154 sysfatal("announce %s: %r", addr);
155 if(logging){
156 if(strncmp(addr, "unix!", 5) == 0)
157 addr += 5;
158 file = smprint("%s.log", addr);
159 if(file == nil)
160 sysfatal("smprint log: %r");
161 if((fd = create(file, OWRITE, 0666)) < 0)
162 sysfatal("create %s: %r", file);
163 dup(fd, 2);
164 if(fd > 2)
165 close(fd);
167 if(verbose) fprint(2, "%T 9pserve running\n");
168 proccreate(mainproc, nil, STACK);
171 void
172 mainproc(void *v)
174 int n, nn;
175 Fcall f;
176 USED(v);
178 atnotify(ignorepipe, 1);
179 fmtinstall('D', dirfmt);
180 fmtinstall('M', dirmodefmt);
181 fmtinstall('F', fcallfmt);
182 fmtinstall('H', encodefmt);
184 outq = qalloc();
185 inq = qalloc();
187 f.type = Tversion;
188 f.version = "9P2000";
189 f.msize = msize;
190 f.tag = NOTAG;
191 n = convS2M(&f, vbuf, sizeof vbuf);
192 if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
193 nn = write(1, vbuf, n);
194 if(n != nn)
195 sysfatal("error writing Tversion: %r\n");
196 n = read9pmsg(0, vbuf, sizeof vbuf);
197 if(convM2S(vbuf, n, &f) != n)
198 sysfatal("convM2S failure");
199 if(f.msize < msize)
200 msize = f.msize;
201 if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
203 threadcreate(inputthread, nil, STACK);
204 threadcreate(outputthread, nil, STACK);
205 threadcreate(listenthread, nil, STACK);
206 threadexits(0);
209 int
210 ignorepipe(void *v, char *s)
212 USED(v);
213 if(strcmp(s, "sys: write on closed pipe") == 0)
214 return 1;
215 fprint(2, "%T msg: %s\n", s);
216 return 0;
219 void
220 listenthread(void *arg)
222 Conn *c;
223 Ioproc *io;
225 io = ioproc();
226 USED(arg);
227 threadsetname("listen %s", adir);
228 for(;;){
229 c = emalloc(sizeof(Conn));
230 c->fd = iolisten(io, adir, c->dir);
231 if(c->fd < 0){
232 if(verbose) fprint(2, "%T listen: %r\n");
233 close(afd);
234 free(c);
235 return;
237 c->inc = chancreate(sizeof(void*), 0);
238 c->internal = chancreate(sizeof(void*), 0);
239 c->inq = qalloc();
240 c->outq = qalloc();
241 if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
242 threadcreate(connthread, c, STACK);
246 void
247 send9pmsg(Msg *m)
249 int n, nn;
251 n = sizeS2M(&m->rx);
252 m->rpkt = emalloc(n);
253 nn = convS2M(&m->rx, m->rpkt, n);
254 if(nn != n)
255 sysfatal("sizeS2M + convS2M disagree");
256 sendq(m->c->outq, m);
259 void
260 sendomsg(Msg *m)
262 int n, nn;
264 n = sizeS2M(&m->tx);
265 m->tpkt = emalloc(n);
266 nn = convS2M(&m->tx, m->tpkt, n);
267 if(nn != n)
268 sysfatal("sizeS2M + convS2M disagree");
269 sendq(outq, m);
272 void
273 err(Msg *m, char *ename)
275 m->rx.type = Rerror;
276 m->rx.ename = ename;
277 m->rx.tag = m->tx.tag;
278 send9pmsg(m);
281 void
282 connthread(void *arg)
284 int i, fd;
285 Conn *c;
286 Hash *h, *hnext;
287 Msg *m, *om, *mm;
288 Fid *f;
289 Ioproc *io;
291 c = arg;
292 threadsetname("conn %s", c->dir);
293 io = ioproc();
294 fd = ioaccept(io, c->fd, c->dir);
295 if(fd < 0){
296 if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
297 goto out;
299 close(c->fd);
300 c->fd = fd;
301 threadcreate(connoutthread, c, STACK);
302 while((m = mread9p(io, c->fd)) != nil){
303 if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
304 m->c = c;
305 m->ctag = m->tx.tag;
306 c->nmsg++;
307 if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
308 if(puthash(c->tag, m->tx.tag, m) < 0){
309 err(m, "duplicate tag");
310 continue;
312 msgincref(m);
313 switch(m->tx.type){
314 case Tversion:
315 m->rx.tag = m->tx.tag;
316 m->rx.msize = m->tx.msize;
317 if(m->rx.msize > msize)
318 m->rx.msize = msize;
319 m->rx.version = "9P2000";
320 m->rx.type = Rversion;
321 send9pmsg(m);
322 continue;
323 case Tflush:
324 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
325 m->rx.tag = m->tx.tag;
326 m->rx.type = Rflush;
327 send9pmsg(m);
328 continue;
330 msgincref(m->oldm);
331 break;
332 case Tattach:
333 m->afid = nil;
334 if(m->tx.afid != NOFID
335 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
336 err(m, "unknown fid");
337 continue;
339 m->fid = fidnew(m->tx.fid);
340 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
341 err(m, "duplicate fid");
342 continue;
344 m->fid->ref++;
345 break;
346 case Twalk:
347 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
348 err(m, "unknown fid");
349 continue;
351 m->fid->ref++;
352 if(m->tx.newfid == m->tx.fid){
353 m->fid->ref++;
354 m->newfid = m->fid;
355 }else{
356 m->newfid = fidnew(m->tx.newfid);
357 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
358 err(m, "duplicate fid");
359 continue;
361 m->newfid->ref++;
363 break;
364 case Tauth:
365 m->afid = fidnew(m->tx.afid);
366 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
367 err(m, "duplicate fid");
368 continue;
370 m->afid->ref++;
371 break;
372 case Topenfd:
373 if(m->tx.mode&~(OTRUNC|3)){
374 err(m, "bad openfd mode");
375 continue;
377 m->isopenfd = 1;
378 m->tx.type = Topen;
379 m->tpkt[4] = Topen;
380 /* fall through */
381 case Tcreate:
382 case Topen:
383 case Tclunk:
384 case Tread:
385 case Twrite:
386 case Tremove:
387 case Tstat:
388 case Twstat:
389 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
390 err(m, "unknown fid");
391 continue;
393 m->fid->ref++;
394 break;
397 /* have everything - translate and send */
398 m->c = c;
399 m->ctag = m->tx.tag;
400 m->tx.tag = m->tag;
401 if(m->fid)
402 m->tx.fid = m->fid->fid;
403 if(m->newfid)
404 m->tx.newfid = m->newfid->fid;
405 if(m->afid)
406 m->tx.afid = m->afid->fid;
407 if(m->oldm)
408 m->tx.oldtag = m->oldm->tag;
409 /* reference passes to outq */
410 sendq(outq, m);
411 while(c->nmsg >= MAXMSG){
412 c->inputstalled = 1;
413 recvp(c->inc);
417 if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
419 /* flush the output queue */
420 sendq(c->outq, nil);
421 while(c->outq != nil)
422 yield();
424 /* flush all outstanding messages */
425 for(i=0; i<NHASH; i++){
426 for(h=c->tag[i]; h; h=hnext){
427 om = h->v;
428 m = msgnew(0);
429 m->internal = 1;
430 m->c = c;
431 c->nmsg++;
432 m->tx.type = Tflush;
433 m->tx.tag = m->tag;
434 m->tx.oldtag = om->tag;
435 m->oldm = om;
436 msgincref(om);
437 msgincref(m); /* for outq */
438 sendomsg(m);
439 mm = recvp(c->internal);
440 assert(mm == m);
441 msgput(m); /* got from recvp */
442 msgput(m); /* got from msgnew */
443 msgput(om); /* got from hash table */
444 hnext = h->next;
445 free(h);
449 /* clunk all outstanding fids */
450 for(i=0; i<NHASH; i++){
451 for(h=c->fid[i]; h; h=hnext){
452 f = h->v;
453 m = msgnew(0);
454 m->internal = 1;
455 m->c = c;
456 c->nmsg++;
457 m->tx.type = Tclunk;
458 m->tx.tag = m->tag;
459 m->tx.fid = f->fid;
460 m->fid = f;
461 f->ref++;
462 msgincref(m);
463 sendomsg(m);
464 mm = recvp(c->internal);
465 assert(mm == m);
466 msgclear(m);
467 msgput(m); /* got from recvp */
468 msgput(m); /* got from msgnew */
469 fidput(f); /* got from hash table */
470 hnext = h->next;
471 free(h);
475 out:
476 closeioproc(io);
477 assert(c->nmsg == 0);
478 assert(c->nfid == 0);
479 close(c->fd);
480 chanfree(c->internal);
481 c->internal = 0;
482 chanfree(c->inc);
483 c->inc = 0;
484 free(c->inq);
485 c->inq = 0;
486 free(c);
489 static void
490 openfdthread(void *v)
492 Conn *c;
493 Fid *fid;
494 Msg *m;
495 int n;
496 vlong tot;
497 Ioproc *io;
498 char buf[1024];
500 c = v;
501 fid = c->fdfid;
502 io = ioproc();
503 threadsetname("openfd %s", c->fdfid);
504 tot = 0;
505 m = nil;
506 if(c->fdmode == OREAD){
507 for(;;){
508 if(verbose) fprint(2, "%T tread...");
509 m = msgnew(0);
510 m->internal = 1;
511 m->c = c;
512 m->tx.type = Tread;
513 m->tx.count = msize - IOHDRSZ;
514 m->tx.fid = fid->fid;
515 m->tx.tag = m->tag;
516 m->tx.offset = tot;
517 m->fid = fid;
518 fid->ref++;
519 msgincref(m);
520 sendomsg(m);
521 recvp(c->internal);
522 if(m->rx.type == Rerror){
523 // fprint(2, "%T read error: %s\n", m->rx.ename);
524 break;
526 if(m->rx.count == 0)
527 break;
528 tot += m->rx.count;
529 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
530 // fprint(2, "%T pipe write error: %r\n");
531 break;
533 msgput(m);
534 msgput(m);
535 m = nil;
537 }else{
538 for(;;){
539 if(verbose) fprint(2, "%T twrite...");
540 n = sizeof buf;
541 if(n > msize)
542 n = msize;
543 if((n=ioread(io, c->fd, buf, n)) <= 0){
544 if(n < 0)
545 fprint(2, "%T pipe read error: %r\n");
546 break;
548 m = msgnew(0);
549 m->internal = 1;
550 m->c = c;
551 m->tx.type = Twrite;
552 m->tx.fid = fid->fid;
553 m->tx.data = buf;
554 m->tx.count = n;
555 m->tx.tag = m->tag;
556 m->tx.offset = tot;
557 m->fid = fid;
558 fid->ref++;
559 msgincref(m);
560 sendomsg(m);
561 recvp(c->internal);
562 if(m->rx.type == Rerror){
563 // fprint(2, "%T write error: %s\n", m->rx.ename);
565 tot += n;
566 msgput(m);
567 msgput(m);
568 m = nil;
571 if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
572 close(c->fd);
573 closeioproc(io);
574 if(m){
575 msgput(m);
576 msgput(m);
578 if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
579 if(--fid->openfd == 0){
580 m = msgnew(0);
581 m->internal = 1;
582 m->c = c;
583 m->tx.type = Tclunk;
584 m->tx.tag = m->tag;
585 m->tx.fid = fid->fid;
586 m->fid = fid;
587 fid->ref++;
588 msgincref(m);
589 sendomsg(m);
590 recvp(c->internal);
591 msgput(m);
592 msgput(m);
594 fidput(fid);
595 c->fdfid = nil;
596 chanfree(c->internal);
597 c->internal = 0;
598 free(c);
601 int
602 xopenfd(Msg *m)
604 char errs[ERRMAX];
605 int n, p[2];
606 Conn *nc;
608 if(pipe(p) < 0){
609 rerrstr(errs, sizeof errs);
610 err(m, errs);
611 /* XXX return here? */
613 if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
615 /* now we're committed. */
617 /* a new connection for this fid */
618 nc = emalloc(sizeof(Conn));
619 nc->internal = chancreate(sizeof(void*), 0);
621 /* a ref for us */
622 nc->fdfid = m->fid;
623 m->fid->ref++;
624 nc->fdfid->openfd++;
625 nc->fdmode = m->tx.mode;
626 nc->fd = p[0];
628 /* a thread to tend the pipe */
629 threadcreate(openfdthread, nc, STACK);
631 /* if mode is ORDWR, that openfdthread will write; start a reader */
632 if((m->tx.mode&3) == ORDWR){
633 nc = emalloc(sizeof(Conn));
634 nc->internal = chancreate(sizeof(void*), 0);
635 nc->fdfid = m->fid;
636 m->fid->ref++;
637 nc->fdfid->openfd++;
638 nc->fdmode = OREAD;
639 nc->fd = dup(p[0], -1);
640 threadcreate(openfdthread, nc, STACK);
643 /* steal fid from other connection */
644 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
645 fidput(m->fid);
647 /* rewrite as Ropenfd */
648 m->rx.type = Ropenfd;
649 n = GBIT32(m->rpkt);
650 m->rpkt = erealloc(m->rpkt, n+4);
651 PBIT32(m->rpkt+n, p[1]);
652 n += 4;
653 PBIT32(m->rpkt, n);
654 m->rpkt[4] = Ropenfd;
655 m->rx.unixfd = p[1];
656 return 0;
659 void
660 connoutthread(void *arg)
662 int err;
663 Conn *c;
664 Queue *outq;
665 Msg *m, *om;
666 Ioproc *io;
668 c = arg;
669 outq = c->outq;
670 io = ioproc();
671 threadsetname("connout %s", c->dir);
672 while((m = recvq(outq)) != nil){
673 err = m->tx.type+1 != m->rx.type;
674 if(!err && m->isopenfd)
675 if(xopenfd(m) < 0)
676 continue;
677 switch(m->tx.type){
678 case Tflush:
679 om = m->oldm;
680 if(om)
681 if(delhash(om->c->tag, om->ctag, om) == 0)
682 msgput(om);
683 break;
684 case Tclunk:
685 case Tremove:
686 if(m->fid)
687 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
688 fidput(m->fid);
689 break;
690 case Tauth:
691 if(err && m->afid){
692 fprint(2, "%T auth error\n");
693 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
694 fidput(m->fid);
696 break;
697 case Tattach:
698 if(err && m->fid)
699 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
700 fidput(m->fid);
701 break;
702 case Twalk:
703 if(err && m->tx.fid != m->tx.newfid && m->newfid)
704 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
705 fidput(m->newfid);
706 break;
708 if(delhash(m->c->tag, m->ctag, m) == 0)
709 msgput(m);
710 if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
711 rewritehdr(&m->rx, m->rpkt);
712 if(mwrite9p(io, c->fd, m->rpkt) < 0)
713 if(verbose) fprint(2, "%T write error: %r\n");
714 msgput(m);
715 if(c->inputstalled && c->nmsg < MAXMSG)
716 nbsendp(c->inc, 0);
718 closeioproc(io);
719 free(outq);
720 c->outq = nil;
723 void
724 outputthread(void *arg)
726 Msg *m;
727 Ioproc *io;
729 USED(arg);
730 io = ioproc();
731 threadsetname("output");
732 while((m = recvq(outq)) != nil){
733 if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
734 rewritehdr(&m->tx, m->tpkt);
735 if(mwrite9p(io, 1, m->tpkt) < 0)
736 sysfatal("output error: %r");
737 msgput(m);
739 closeioproc(io);
740 fprint(2, "%T output eof\n");
741 threadexitsall(0);
744 void
745 inputthread(void *arg)
747 uchar *pkt;
748 int n, nn, tag;
749 Msg *m;
750 Ioproc *io;
752 threadsetname("input");
753 if(verbose) fprint(2, "%T input thread\n");
754 io = ioproc();
755 USED(arg);
756 while((pkt = read9ppkt(io, 0)) != nil){
757 n = GBIT32(pkt);
758 if(n < 7){
759 fprint(2, "%T short 9P packet from server\n");
760 free(pkt);
761 continue;
763 if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
764 tag = GBIT16(pkt+5);
765 if((m = msgget(tag)) == nil){
766 fprint(2, "%T unexpected 9P response tag %d\n", tag);
767 free(pkt);
768 continue;
770 if((nn = convM2S(pkt, n, &m->rx)) != n){
771 fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
772 free(pkt);
773 msgput(m);
774 continue;
776 if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
777 m->internal ? " (internal)" : "");
778 m->rpkt = pkt;
779 m->rx.tag = m->ctag;
780 if(m->internal)
781 sendp(m->c->internal, m);
782 else if(m->c->outq)
783 sendq(m->c->outq, m);
784 else
785 msgput(m);
787 closeioproc(io);
788 //fprint(2, "%T input eof\n");
789 threadexitsall(0);
792 void*
793 gethash(Hash **ht, uint n)
795 Hash *h;
797 for(h=ht[n%NHASH]; h; h=h->next)
798 if(h->n == n)
799 return h->v;
800 return nil;
803 int
804 delhash(Hash **ht, uint n, void *v)
806 Hash *h, **l;
808 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
809 if(h->n == n){
810 if(h->v != v){
811 if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
812 return -1;
814 *l = h->next;
815 free(h);
816 return 0;
818 return -1;
821 int
822 puthash(Hash **ht, uint n, void *v)
824 Hash *h;
826 if(gethash(ht, n))
827 return -1;
828 h = emalloc(sizeof(Hash));
829 h->next = ht[n%NHASH];
830 h->n = n;
831 h->v = v;
832 ht[n%NHASH] = h;
833 return 0;
836 Fid **fidtab;
837 int nfidtab;
838 Fid *freefid;
840 Fid*
841 fidnew(int cfid)
843 Fid *f;
845 if(freefid == nil){
846 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
847 fidtab[nfidtab] = emalloc(sizeof(Fid));
848 freefid = fidtab[nfidtab];
849 freefid->fid = nfidtab++;
851 f = freefid;
852 freefid = f->next;
853 f->cfid = cfid;
854 f->ref = 1;
855 return f;
858 void
859 fidput(Fid *f)
861 if(f == nil)
862 return;
863 assert(f->ref > 0);
864 if(--f->ref > 0)
865 return;
866 f->next = freefid;
867 f->cfid = -1;
868 freefid = f;
871 Msg **msgtab;
872 int nmsgtab;
873 Msg *freemsg;
875 void
876 msgincref(Msg *m)
878 if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
879 getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
880 m->ref++;
883 Msg*
884 msgnew(int x)
886 Msg *m;
888 if(freemsg == nil){
889 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
890 msgtab[nmsgtab] = emalloc(sizeof(Msg));
891 freemsg = msgtab[nmsgtab];
892 freemsg->tag = nmsgtab++;
894 m = freemsg;
895 freemsg = m->next;
896 m->ref = 1;
897 if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
898 getcallerpc(&x), m, m->tag, m->ref);
899 return m;
902 /*
903 * Clear data associated with connections, so that
904 * if all msgs have been msgcleared, the connection
905 * can be freed. Note that this does *not* free the tpkt
906 * and rpkt; they are freed in msgput with the msg itself.
907 * The io write thread might still be holding a ref to msg
908 * even once the connection has finished with it.
909 */
910 void
911 msgclear(Msg *m)
913 if(m->c){
914 m->c->nmsg--;
915 m->c = nil;
917 if(m->oldm){
918 msgput(m->oldm);
919 m->oldm = nil;
921 if(m->fid){
922 fidput(m->fid);
923 m->fid = nil;
925 if(m->afid){
926 fidput(m->afid);
927 m->afid = nil;
929 if(m->newfid){
930 fidput(m->newfid);
931 m->newfid = nil;
933 if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
934 close(m->rx.unixfd);
935 m->rx.unixfd = -1;
939 void
940 msgput(Msg *m)
942 if(m == nil)
943 return;
945 if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
946 getcallerpc(&m), m, m->tag, m->ctag, m->ref);
947 assert(m->ref > 0);
948 if(--m->ref > 0)
949 return;
950 msgclear(m);
951 if(m->tpkt){
952 free(m->tpkt);
953 m->tpkt = nil;
955 if(m->rpkt){
956 free(m->rpkt);
957 m->rpkt = nil;
959 m->isopenfd = 0;
960 m->internal = 0;
961 m->next = freemsg;
962 freemsg = m;
965 Msg*
966 msgget(int n)
968 Msg *m;
970 if(n < 0 || n >= nmsgtab)
971 return nil;
972 m = msgtab[n];
973 if(m->ref == 0)
974 return nil;
975 if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
976 msgincref(m);
977 return m;
981 void*
982 emalloc(int n)
984 void *v;
986 v = mallocz(n, 1);
987 if(v == nil){
988 abort();
989 sysfatal("out of memory allocating %d", n);
991 return v;
994 void*
995 erealloc(void *v, int n)
997 v = realloc(v, n);
998 if(v == nil){
999 abort();
1000 sysfatal("out of memory reallocating %d", n);
1002 return v;
1005 typedef struct Qel Qel;
1006 struct Qel
1008 Qel *next;
1009 void *p;
1012 struct Queue
1014 int hungup;
1015 QLock lk;
1016 Rendez r;
1017 Qel *head;
1018 Qel *tail;
1021 Queue*
1022 qalloc(void)
1024 Queue *q;
1026 q = mallocz(sizeof(Queue), 1);
1027 if(q == nil)
1028 return nil;
1029 q->r.l = &q->lk;
1030 return q;
1033 int
1034 sendq(Queue *q, void *p)
1036 Qel *e;
1038 e = emalloc(sizeof(Qel));
1039 qlock(&q->lk);
1040 if(q->hungup){
1041 werrstr("hungup queue");
1042 qunlock(&q->lk);
1043 return -1;
1045 e->p = p;
1046 e->next = nil;
1047 if(q->head == nil)
1048 q->head = e;
1049 else
1050 q->tail->next = e;
1051 q->tail = e;
1052 rwakeup(&q->r);
1053 qunlock(&q->lk);
1054 return 0;
1057 void*
1058 recvq(Queue *q)
1060 void *p;
1061 Qel *e;
1063 qlock(&q->lk);
1064 while(q->head == nil && !q->hungup)
1065 rsleep(&q->r);
1066 if(q->hungup){
1067 qunlock(&q->lk);
1068 return nil;
1070 e = q->head;
1071 q->head = e->next;
1072 qunlock(&q->lk);
1073 p = e->p;
1074 free(e);
1075 return p;
1078 uchar*
1079 read9ppkt(Ioproc *io, int fd)
1081 uchar buf[4], *pkt;
1082 int n, nn;
1084 n = ioreadn(io, fd, buf, 4);
1085 if(n != 4)
1086 return nil;
1087 n = GBIT32(buf);
1088 pkt = emalloc(n);
1089 PBIT32(pkt, n);
1090 nn = ioreadn(io, fd, pkt+4, n-4);
1091 if(nn != n-4){
1092 free(pkt);
1093 return nil;
1095 /* would do this if we ever got one of these, but we only generate them
1096 if(pkt[4] == Ropenfd){
1097 newfd = iorecvfd(io, fd);
1098 PBIT32(pkt+n-4, newfd);
1101 return pkt;
1104 Msg*
1105 mread9p(Ioproc *io, int fd)
1107 int n, nn;
1108 uchar *pkt;
1109 Msg *m;
1111 if((pkt = read9ppkt(io, fd)) == nil)
1112 return nil;
1114 m = msgnew(0);
1115 m->tpkt = pkt;
1116 n = GBIT32(pkt);
1117 nn = convM2S(pkt, n, &m->tx);
1118 if(nn != n){
1119 fprint(2, "%T read bad packet from %d\n", fd);
1120 return nil;
1122 return m;
1125 int
1126 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1128 int n, nfd;
1130 n = GBIT32(pkt);
1131 if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
1132 if(verbose > 1) fprint(2, "%T before iowrite\n");
1133 if(iowrite(io, fd, pkt, n) != n){
1134 fprint(2, "%T write error: %r\n");
1135 return -1;
1137 if(verbose > 1) fprint(2, "%T after iowrite\n");
1138 if(pkt[4] == Ropenfd){
1139 nfd = GBIT32(pkt+n-4);
1140 if(iosendfd(io, fd, nfd) < 0){
1141 fprint(2, "%T send fd error: %r\n");
1142 return -1;
1145 return 0;
1148 void
1149 restring(uchar *pkt, int pn, char *s)
1151 int n;
1153 if(s < (char*)pkt || s >= (char*)pkt+pn)
1154 return;
1156 n = strlen(s);
1157 memmove(s+1, s, n);
1158 PBIT16((uchar*)s-1, n);
1161 void
1162 rewritehdr(Fcall *f, uchar *pkt)
1164 int i, n;
1166 n = GBIT32(pkt);
1167 PBIT16(pkt+5, f->tag);
1168 switch(f->type){
1169 case Tversion:
1170 case Rversion:
1171 restring(pkt, n, f->version);
1172 break;
1173 case Tauth:
1174 PBIT32(pkt+7, f->afid);
1175 restring(pkt, n, f->uname);
1176 restring(pkt, n, f->aname);
1177 break;
1178 case Tflush:
1179 PBIT16(pkt+7, f->oldtag);
1180 break;
1181 case Tattach:
1182 restring(pkt, n, f->uname);
1183 restring(pkt, n, f->aname);
1184 PBIT32(pkt+7, f->fid);
1185 PBIT32(pkt+11, f->afid);
1186 break;
1187 case Twalk:
1188 PBIT32(pkt+7, f->fid);
1189 PBIT32(pkt+11, f->newfid);
1190 for(i=0; i<f->nwname; i++)
1191 restring(pkt, n, f->wname[i]);
1192 break;
1193 case Tcreate:
1194 restring(pkt, n, f->name);
1195 /* fall through */
1196 case Topen:
1197 case Tread:
1198 case Twrite:
1199 case Tclunk:
1200 case Tremove:
1201 case Tstat:
1202 case Twstat:
1203 PBIT32(pkt+7, f->fid);
1204 break;
1205 case Rerror:
1206 restring(pkt, n, f->ename);
1207 break;
1211 static long
1212 _iolisten(va_list *arg)
1214 char *a, *b;
1216 a = va_arg(*arg, char*);
1217 b = va_arg(*arg, char*);
1218 return listen(a, b);
1221 int
1222 iolisten(Ioproc *io, char *a, char *b)
1224 return iocall(io, _iolisten, a, b);
1227 static long
1228 _ioaccept(va_list *arg)
1230 int fd;
1231 char *dir;
1233 fd = va_arg(*arg, int);
1234 dir = va_arg(*arg, char*);
1235 return accept(fd, dir);
1238 int
1239 ioaccept(Ioproc *io, int fd, char *dir)
1241 return iocall(io, _ioaccept, fd, dir);
1244 int
1245 timefmt(Fmt *fmt)
1247 static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1248 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1249 vlong ns;
1250 Tm tm;
1251 ns = nsec();
1252 tm = *localtime(time(0));
1253 return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
1254 mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
1255 (int)(ns%1000000000)/1000000);