Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <errno.h>
7 enum
8 {
9 STACK = 32768,
10 NHASH = 31,
11 MAXMSG = 64, /* per connection */
12 };
14 typedef struct Hash Hash;
15 typedef struct Fid Fid;
16 typedef struct Msg Msg;
17 typedef struct Conn Conn;
18 typedef struct Queue Queue;
20 struct Hash
21 {
22 Hash *next;
23 uint n;
24 void *v;
25 };
27 struct Fid
28 {
29 int fid;
30 int ref;
31 int cfid;
32 Fid *next;
33 };
35 struct Msg
36 {
37 Conn *c;
38 int internal;
39 int ref;
40 int ctag;
41 int tag;
42 int isopenfd;
43 Fcall tx;
44 Fcall rx;
45 Fid *fid;
46 Fid *newfid;
47 Fid *afid;
48 Msg *oldm;
49 Msg *next;
50 uchar *tpkt;
51 uchar *rpkt;
52 };
54 struct Conn
55 {
56 int fd;
57 int fdmode;
58 Fid *fdfid;
59 int nmsg;
60 int nfid;
61 Channel *inc;
62 Channel *internal;
63 int inputstalled;
64 char dir[40];
65 Hash *tag[NHASH];
66 Hash *fid[NHASH];
67 Queue *outq;
68 Queue *inq;
69 };
71 char *addr;
72 int afd;
73 char adir[40];
74 int isunix;
75 Queue *outq;
76 Queue *inq;
77 int verbose = 0;
78 int msize = 8192;
80 void *gethash(Hash**, uint);
81 int puthash(Hash**, uint, void*);
82 int delhash(Hash**, uint, void*);
83 Msg *mread9p(Ioproc*, int);
84 int mwrite9p(Ioproc*, int, uchar*);
85 uchar *read9ppkt(Ioproc*, int);
86 int write9ppkt(int, uchar*);
87 Msg *msgnew(void);
88 void msgput(Msg*);
89 Msg *msgget(int);
90 Fid *fidnew(int);
91 void fidput(Fid*);
92 void *emalloc(int);
93 void *erealloc(void*, int);
94 Queue *qalloc(void);
95 int sendq(Queue*, void*);
96 void *recvq(Queue*);
97 void connthread(void*);
98 void connoutthread(void*);
99 void listenthread(void*);
100 void outputthread(void*);
101 void inputthread(void*);
102 void rewritehdr(Fcall*, uchar*);
103 int tlisten(char*, char*);
104 int taccept(int, char*);
105 int iolisten(Ioproc*, char*, char*);
106 int ioaccept(Ioproc*, int, char*);
107 int iorecvfd(Ioproc*, int);
108 int iosendfd(Ioproc*, int, int);
109 void mainproc(void*);
110 int ignorepipe(void*, char*);
112 void
113 usage(void)
115 fprint(2, "usage: 9pserve [-s service] [-u] address\n");
116 fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
117 exits("usage");
120 uchar vbuf[128];
121 extern int _threaddebuglevel;
122 void
123 threadmain(int argc, char **argv)
125 char *file;
127 ARGBEGIN{
128 default:
129 usage();
130 case 'v':
131 verbose++;
132 break;
133 case 's':
134 close(0);
135 if(open(file=EARGF(usage()), ORDWR) != 0)
136 sysfatal("open %s: %r", file);
137 dup(0, 1);
138 break;
139 case 'u':
140 isunix = 1;
141 break;
142 }ARGEND
144 if(verbose) fprint(2, "9pserve running\n");
145 if(argc != 1)
146 usage();
147 addr = argv[0];
149 if((afd = announce(addr, adir)) < 0)
150 sysfatal("announce %s: %r", addr);
152 if(verbose) fprint(2, "9pserve forking\n");
153 switch(fork()){
154 case -1:
155 sysfatal("fork: %r");
156 case 0:
157 if(verbose) fprint(2, "running mainproc\n");
158 mainproc(nil);
159 if(verbose) fprint(2, "mainproc finished\n");
160 _exits(0);
161 default:
162 if(verbose) fprint(2, "9pserve exiting\n");
163 _exits(0);
167 void
168 mainproc(void *v)
170 int n, nn;
171 Fcall f;
172 USED(v);
174 atnotify(ignorepipe, 1);
175 fmtinstall('D', dirfmt);
176 fmtinstall('M', dirmodefmt);
177 fmtinstall('F', fcallfmt);
178 fmtinstall('H', encodefmt);
180 outq = qalloc();
181 inq = qalloc();
183 f.type = Tversion;
184 f.version = "9P2000";
185 f.msize = msize;
186 f.tag = NOTAG;
187 n = convS2M(&f, vbuf, sizeof vbuf);
188 if(verbose > 1) fprint(2, "* <- %F\n", &f);
189 nn = write(1, vbuf, n);
190 if(n != nn)
191 sysfatal("error writing Tversion: %r\n");
192 n = threadread9pmsg(0, vbuf, sizeof vbuf);
193 if(convM2S(vbuf, n, &f) != n)
194 sysfatal("convM2S failure");
195 if(f.msize < msize)
196 msize = f.msize;
197 if(verbose > 1) fprint(2, "* -> %F\n", &f);
199 threadcreate(inputthread, nil, STACK);
200 threadcreate(outputthread, nil, STACK);
201 threadcreate(listenthread, nil, STACK);
202 threadexits(0);
205 int
206 ignorepipe(void *v, char *s)
208 USED(v);
209 if(strcmp(s, "sys: write on closed pipe") == 0)
210 return 1;
211 fprint(2, "msg: %s\n", s);
212 return 0;
215 void
216 listenthread(void *arg)
218 Conn *c;
219 Ioproc *io;
221 io = ioproc();
222 USED(arg);
223 for(;;){
224 c = emalloc(sizeof(Conn));
225 c->fd = iolisten(io, adir, c->dir);
226 if(c->fd < 0){
227 if(verbose) fprint(2, "listen: %r\n");
228 close(afd);
229 free(c);
230 return;
232 c->inc = chancreate(sizeof(void*), 0);
233 c->internal = chancreate(sizeof(void*), 0);
234 c->inq = qalloc();
235 c->outq = qalloc();
236 if(verbose) fprint(2, "incoming call on %s\n", c->dir);
237 threadcreate(connthread, c, STACK);
241 void
242 send9pmsg(Msg *m)
244 int n, nn;
246 n = sizeS2M(&m->rx);
247 m->rpkt = emalloc(n);
248 nn = convS2M(&m->rx, m->rpkt, n);
249 if(nn != n)
250 sysfatal("sizeS2M + convS2M disagree");
251 sendq(m->c->outq, m);
254 void
255 sendomsg(Msg *m)
257 int n, nn;
259 n = sizeS2M(&m->tx);
260 m->tpkt = emalloc(n);
261 nn = convS2M(&m->tx, m->tpkt, n);
262 if(nn != n)
263 sysfatal("sizeS2M + convS2M disagree");
264 sendq(outq, m);
267 void
268 err(Msg *m, char *ename)
270 m->rx.type = Rerror;
271 m->rx.ename = ename;
272 m->rx.tag = m->tx.tag;
273 send9pmsg(m);
276 void
277 connthread(void *arg)
279 int i, fd;
280 Conn *c;
281 Hash *h, *hnext;
282 Msg *m, *om, *mm;
283 Fid *f;
284 Ioproc *io;
286 c = arg;
287 io = ioproc();
288 fd = ioaccept(io, c->fd, c->dir);
289 if(fd < 0){
290 if(verbose) fprint(2, "accept %s: %r\n", c->dir);
291 goto out;
293 close(c->fd);
294 c->fd = fd;
295 threadcreate(connoutthread, c, STACK);
296 while((m = mread9p(io, c->fd)) != nil){
297 if(verbose > 1) fprint(2, "fd#%d -> %F\n", c->fd, &m->tx);
298 m->c = c;
299 m->ctag = m->tx.tag;
300 c->nmsg++;
301 if(puthash(c->tag, m->tx.tag, m) < 0){
302 err(m, "duplicate tag");
303 continue;
305 m->ref++;
306 switch(m->tx.type){
307 case Tversion:
308 m->rx.tag = m->tx.tag;
309 m->rx.msize = m->tx.msize;
310 if(m->rx.msize > msize)
311 m->rx.msize = msize;
312 m->rx.version = "9P2000";
313 m->rx.type = Rversion;
314 send9pmsg(m);
315 continue;
316 case Tflush:
317 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
318 m->rx.tag = m->tx.tag;
319 m->rx.type = Rflush;
320 send9pmsg(m);
321 continue;
323 m->oldm->ref++;
324 break;
325 case Tattach:
326 m->afid = nil;
327 if(m->tx.afid != NOFID
328 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
329 err(m, "unknown fid");
330 continue;
332 m->fid = fidnew(m->tx.fid);
333 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
334 err(m, "duplicate fid");
335 continue;
337 m->fid->ref++;
338 break;
339 case Twalk:
340 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
341 err(m, "unknown fid");
342 continue;
344 m->fid->ref++;
345 if(m->tx.newfid == m->tx.fid){
346 m->fid->ref++;
347 m->newfid = m->fid;
348 }else{
349 m->newfid = fidnew(m->tx.newfid);
350 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
351 err(m, "duplicate fid");
352 continue;
354 m->newfid->ref++;
356 break;
357 case Tauth:
358 m->afid = fidnew(m->tx.afid);
359 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
360 err(m, "duplicate fid");
361 continue;
363 m->afid->ref++;
364 break;
365 case Topenfd:
366 if(m->tx.mode&~(OTRUNC|3)){
367 err(m, "bad openfd mode");
368 continue;
370 m->isopenfd = 1;
371 m->tx.type = Topen;
372 m->tpkt[4] = Topen;
373 /* fall through */
374 case Tcreate:
375 case Topen:
376 case Tclunk:
377 case Tread:
378 case Twrite:
379 case Tremove:
380 case Tstat:
381 case Twstat:
382 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
383 err(m, "unknown fid");
384 continue;
386 m->fid->ref++;
387 break;
390 /* have everything - translate and send */
391 m->c = c;
392 m->ctag = m->tx.tag;
393 m->tx.tag = m->tag;
394 if(m->fid)
395 m->tx.fid = m->fid->fid;
396 if(m->newfid)
397 m->tx.newfid = m->newfid->fid;
398 if(m->afid)
399 m->tx.afid = m->afid->fid;
400 if(m->oldm)
401 m->tx.oldtag = m->oldm->tag;
402 /* reference passes to outq */
403 sendq(outq, m);
404 while(c->nmsg >= MAXMSG){
405 c->inputstalled = 1;
406 recvp(c->inc);
410 if(verbose) fprint(2, "fd#%d eof; flushing conn\n", c->fd);
412 /* flush the output queue */
413 sendq(c->outq, nil);
414 while(c->outq != nil)
415 yield();
417 /* flush all outstanding messages */
418 for(i=0; i<NHASH; i++){
419 for(h=c->tag[i]; h; h=hnext){
420 om = h->v;
421 m = msgnew();
422 m->internal = 1;
423 m->c = c;
424 c->nmsg++;
425 m->tx.type = Tflush;
426 m->tx.tag = m->tag;
427 m->tx.oldtag = om->tag;
428 m->oldm = om;
429 om->ref++; /* for m->oldm */
430 m->ref++; /* for outq */
431 sendomsg(m);
432 mm = recvp(c->internal);
433 assert(mm == m);
434 msgput(m); /* got from recvp */
435 msgput(m); /* got from msgnew */
436 msgput(om); /* got from hash table */
437 hnext = h->next;
438 free(h);
442 /* clunk all outstanding fids */
443 for(i=0; i<NHASH; i++){
444 for(h=c->fid[i]; h; h=hnext){
445 f = h->v;
446 m = msgnew();
447 m->internal = 1;
448 m->c = c;
449 c->nmsg++;
450 m->tx.type = Tclunk;
451 m->tx.tag = m->tag;
452 m->tx.fid = f->fid;
453 m->fid = f;
454 f->ref++;
455 m->ref++;
456 sendomsg(m);
457 mm = recvp(c->internal);
458 assert(mm == m);
459 msgput(m); /* got from recvp */
460 msgput(m); /* got from msgnew */
461 fidput(f); /* got from hash table */
462 hnext = h->next;
463 free(h);
467 out:
468 assert(c->nmsg == 0);
469 assert(c->nfid == 0);
470 close(c->fd);
471 chanfree(c->internal);
472 c->internal = 0;
473 chanfree(c->inc);
474 c->inc = 0;
475 free(c->inq);
476 c->inq = 0;
477 free(c);
480 static void
481 openfdthread(void *v)
483 Conn *c;
484 Fid *fid;
485 Msg *m;
486 int n;
487 vlong tot;
488 Ioproc *io;
489 char buf[1024];
491 c = v;
492 fid = c->fdfid;
493 io = ioproc();
495 tot = 0;
496 m = nil;
497 if(c->fdmode == OREAD){
498 for(;;){
499 if(verbose) fprint(2, "tread...");
500 m = msgnew();
501 m->internal = 1;
502 m->c = c;
503 m->tx.type = Tread;
504 m->tx.count = msize - IOHDRSZ;
505 m->tx.fid = fid->fid;
506 m->tx.tag = m->tag;
507 m->tx.offset = tot;
508 m->fid = fid;
509 fid->ref++;
510 m->ref++;
511 sendomsg(m);
512 recvp(c->internal);
513 if(m->rx.type == Rerror){
514 // fprint(2, "read error: %s\n", m->rx.ename);
515 break;
517 if(m->rx.count == 0)
518 break;
519 tot += m->rx.count;
520 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
521 // fprint(2, "pipe write error: %r\n");
522 break;
524 msgput(m);
525 msgput(m);
526 m = nil;
528 }else{
529 for(;;){
530 if(verbose) fprint(2, "twrite...");
531 n = sizeof buf;
532 if(n > msize)
533 n = msize;
534 if((n=ioread(io, c->fd, buf, n)) <= 0){
535 if(n < 0)
536 fprint(2, "pipe read error: %r\n");
537 break;
539 m = msgnew();
540 m->internal = 1;
541 m->c = c;
542 m->tx.type = Twrite;
543 m->tx.fid = fid->fid;
544 m->tx.data = buf;
545 m->tx.count = n;
546 m->tx.tag = m->tag;
547 m->tx.offset = tot;
548 m->fid = fid;
549 fid->ref++;
550 m->ref++;
551 sendomsg(m);
552 recvp(c->internal);
553 if(m->rx.type == Rerror){
554 // fprint(2, "write error: %s\n", m->rx.ename);
556 tot += n;
557 msgput(m);
558 msgput(m);
559 m = nil;
562 if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid);
563 close(c->fd);
564 closeioproc(io);
565 if(m){
566 msgput(m);
567 msgput(m);
569 if(fid->ref == 1){
570 m = msgnew();
571 m->internal = 1;
572 m->c = c;
573 m->tx.type = Tclunk;
574 m->tx.tag = m->tag;
575 m->tx.fid = fid->fid;
576 m->fid = fid;
577 fid->ref++;
578 m->ref++;
579 sendomsg(m);
580 recvp(c->internal);
581 msgput(m);
582 msgput(m);
584 fidput(fid);
585 c->fdfid = nil;
586 chanfree(c->internal);
587 c->internal = 0;
588 free(c);
591 int
592 xopenfd(Msg *m)
594 char errs[ERRMAX];
595 int n, p[2];
596 Conn *nc;
598 if(pipe(p) < 0){
599 rerrstr(errs, sizeof errs);
600 err(m, errs);
602 if(verbose) fprint(2, "xopen pipe %d %d...", p[0], p[1]);
604 /* now we're committed. */
606 /* a new connection for this fid */
607 nc = emalloc(sizeof(Conn));
608 nc->internal = chancreate(sizeof(void*), 0);
610 /* a ref for us */
611 nc->fdfid = m->fid;
612 m->fid->ref++;
613 nc->fdmode = m->tx.mode;
614 nc->fd = p[0];
616 /* a thread to tend the pipe */
617 threadcreate(openfdthread, nc, STACK);
619 /* if mode is ORDWR, that openfdthread will write; start a reader */
620 if((m->tx.mode&3) == ORDWR){
621 nc = emalloc(sizeof(Conn));
622 nc->internal = chancreate(sizeof(void*), 0);
623 nc->fdfid = m->fid;
624 m->fid->ref++;
625 nc->fdmode = OREAD;
626 nc->fd = dup(p[0], -1);
627 threadcreate(openfdthread, nc, STACK);
630 /* steal fid from other connection */
631 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
632 fidput(m->fid);
634 /* rewrite as Ropenfd */
635 m->rx.type = Ropenfd;
636 n = GBIT32(m->rpkt);
637 m->rpkt = erealloc(m->rpkt, n+4);
638 PBIT32(m->rpkt+n, p[1]);
639 n += 4;
640 PBIT32(m->rpkt, n);
641 m->rpkt[4] = Ropenfd;
642 m->rx.unixfd = p[1];
643 return 0;
646 void
647 connoutthread(void *arg)
649 int err;
650 Conn *c;
651 Queue *outq;
652 Msg *m, *om;
653 Ioproc *io;
655 c = arg;
656 outq = c->outq;
657 io = ioproc();
658 while((m = recvq(outq)) != nil){
659 err = m->tx.type+1 != m->rx.type;
660 if(!err && m->isopenfd)
661 if(xopenfd(m) < 0)
662 continue;
663 switch(m->tx.type){
664 case Tflush:
665 om = m->oldm;
666 if(om)
667 if(delhash(om->c->tag, om->ctag, om) == 0)
668 msgput(om);
669 break;
670 case Tclunk:
671 case Tremove:
672 if(m->fid)
673 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
674 fidput(m->fid);
675 break;
676 case Tauth:
677 if(err && m->afid){
678 fprint(2, "auth error\n");
679 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
680 fidput(m->fid);
682 break;
683 case Tattach:
684 if(err && m->fid)
685 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
686 fidput(m->fid);
687 break;
688 case Twalk:
689 if(err && m->tx.fid != m->tx.newfid && m->newfid)
690 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
691 fidput(m->newfid);
692 break;
694 if(delhash(m->c->tag, m->ctag, m) == 0)
695 msgput(m);
696 if(verbose > 1) fprint(2, "fd#%d <- %F\n", c->fd, &m->rx);
697 rewritehdr(&m->rx, m->rpkt);
698 if(mwrite9p(io, c->fd, m->rpkt) < 0)
699 if(verbose) fprint(2, "write error: %r\n");
700 msgput(m);
701 if(c->inputstalled && c->nmsg < MAXMSG)
702 nbsendp(c->inc, 0);
704 closeioproc(io);
705 free(outq);
706 c->outq = nil;
709 void
710 outputthread(void *arg)
712 Msg *m;
713 Ioproc *io;
715 USED(arg);
716 io = ioproc();
717 while((m = recvq(outq)) != nil){
718 if(verbose > 1) fprint(2, "* <- %F\n", &m->tx);
719 rewritehdr(&m->tx, m->tpkt);
720 if(mwrite9p(io, 1, m->tpkt) < 0)
721 sysfatal("output error: %r");
722 msgput(m);
724 closeioproc(io);
725 fprint(2, "output eof\n");
726 threadexitsall(0);
729 void
730 inputthread(void *arg)
732 uchar *pkt;
733 int n, nn, tag;
734 Msg *m;
735 Ioproc *io;
737 if(verbose) fprint(2, "input thread\n");
738 io = ioproc();
739 USED(arg);
740 while((pkt = read9ppkt(io, 0)) != nil){
741 n = GBIT32(pkt);
742 if(n < 7){
743 fprint(2, "short 9P packet from server\n");
744 free(pkt);
745 continue;
747 if(verbose > 2) fprint(2, "read %.*H\n", n, pkt);
748 tag = GBIT16(pkt+5);
749 if((m = msgget(tag)) == nil){
750 fprint(2, "unexpected 9P response tag %d\n", tag);
751 free(pkt);
752 continue;
754 if((nn = convM2S(pkt, n, &m->rx)) != n){
755 fprint(2, "bad packet - convM2S %d but %d\n", nn, n);
756 free(pkt);
757 msgput(m);
758 continue;
760 if(verbose > 1) fprint(2, "* -> %F%s\n", &m->rx,
761 m->internal ? " (internal)" : "");
762 m->rpkt = pkt;
763 m->rx.tag = m->ctag;
764 if(m->internal)
765 sendp(m->c->internal, m);
766 else if(m->c->outq)
767 sendq(m->c->outq, m);
768 else
769 msgput(m);
771 closeioproc(io);
772 //fprint(2, "input eof\n");
773 threadexitsall(0);
776 void*
777 gethash(Hash **ht, uint n)
779 Hash *h;
781 for(h=ht[n%NHASH]; h; h=h->next)
782 if(h->n == n)
783 return h->v;
784 return nil;
787 int
788 delhash(Hash **ht, uint n, void *v)
790 Hash *h, **l;
792 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
793 if(h->n == n){
794 if(h->v != v){
795 if(verbose) fprint(2, "delhash %d got %p want %p\n", n, h->v, v);
796 return -1;
798 *l = h->next;
799 free(h);
800 return 0;
802 return -1;
805 int
806 puthash(Hash **ht, uint n, void *v)
808 Hash *h;
810 if(gethash(ht, n))
811 return -1;
812 h = emalloc(sizeof(Hash));
813 h->next = ht[n%NHASH];
814 h->n = n;
815 h->v = v;
816 ht[n%NHASH] = h;
817 return 0;
820 Fid **fidtab;
821 int nfidtab;
822 Fid *freefid;
824 Fid*
825 fidnew(int cfid)
827 Fid *f;
829 if(freefid == nil){
830 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
831 fidtab[nfidtab] = emalloc(sizeof(Fid));
832 freefid = fidtab[nfidtab];
833 freefid->fid = nfidtab++;
835 f = freefid;
836 freefid = f->next;
837 f->cfid = cfid;
838 f->ref = 1;
839 return f;
842 void
843 fidput(Fid *f)
845 if(f == nil)
846 return;
847 assert(f->ref > 0);
848 if(--f->ref > 0)
849 return;
850 f->next = freefid;
851 f->cfid = -1;
852 freefid = f;
855 Msg **msgtab;
856 int nmsgtab;
857 Msg *freemsg;
859 Msg*
860 msgnew(void)
862 Msg *m;
864 if(freemsg == nil){
865 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
866 msgtab[nmsgtab] = emalloc(sizeof(Msg));
867 freemsg = msgtab[nmsgtab];
868 freemsg->tag = nmsgtab++;
870 m = freemsg;
871 freemsg = m->next;
872 m->ref = 1;
873 return m;
876 void
877 msgput(Msg *m)
879 if(m == nil)
880 return;
882 if(verbose > 2) fprint(2, "msgput tag %d/%d ref %d\n", m->tag, m->ctag, m->ref);
883 assert(m->ref > 0);
884 if(--m->ref > 0)
885 return;
886 m->c->nmsg--;
887 m->c = nil;
888 msgput(m->oldm);
889 m->oldm = nil;
890 fidput(m->fid);
891 m->fid = nil;
892 fidput(m->afid);
893 m->afid = nil;
894 fidput(m->newfid);
895 m->newfid = nil;
896 free(m->tpkt);
897 m->tpkt = nil;
898 free(m->rpkt);
899 m->rpkt = nil;
900 if(m->rx.type == Ropenfd)
901 close(m->rx.unixfd);
902 m->rx.unixfd = -1;
903 m->isopenfd = 0;
904 m->internal = 0;
905 m->next = freemsg;
906 freemsg = m;
909 Msg*
910 msgget(int n)
912 Msg *m;
914 if(n < 0 || n >= nmsgtab)
915 return nil;
916 m = msgtab[n];
917 if(m->ref == 0)
918 return nil;
919 if(verbose) fprint(2, "msgget %d = %p\n", n, m);
920 m->ref++;
921 return m;
925 void*
926 emalloc(int n)
928 void *v;
930 v = mallocz(n, 1);
931 if(v == nil){
932 abort();
933 sysfatal("out of memory allocating %d", n);
935 return v;
938 void*
939 erealloc(void *v, int n)
941 v = realloc(v, n);
942 if(v == nil){
943 abort();
944 sysfatal("out of memory reallocating %d", n);
946 return v;
949 typedef struct Qel Qel;
950 struct Qel
952 Qel *next;
953 void *p;
954 };
956 struct Queue
958 int hungup;
959 QLock lk;
960 Rendez r;
961 Qel *head;
962 Qel *tail;
963 };
965 Queue*
966 qalloc(void)
968 Queue *q;
970 q = mallocz(sizeof(Queue), 1);
971 if(q == nil)
972 return nil;
973 q->r.l = &q->lk;
974 return q;
977 int
978 sendq(Queue *q, void *p)
980 Qel *e;
982 e = emalloc(sizeof(Qel));
983 qlock(&q->lk);
984 if(q->hungup){
985 werrstr("hungup queue");
986 qunlock(&q->lk);
987 return -1;
989 e->p = p;
990 e->next = nil;
991 if(q->head == nil)
992 q->head = e;
993 else
994 q->tail->next = e;
995 q->tail = e;
996 rwakeup(&q->r);
997 qunlock(&q->lk);
998 return 0;
1001 void*
1002 recvq(Queue *q)
1004 void *p;
1005 Qel *e;
1007 qlock(&q->lk);
1008 while(q->head == nil && !q->hungup)
1009 rsleep(&q->r);
1010 if(q->hungup){
1011 qunlock(&q->lk);
1012 return nil;
1014 e = q->head;
1015 q->head = e->next;
1016 qunlock(&q->lk);
1017 p = e->p;
1018 free(e);
1019 return p;
1022 uchar*
1023 read9ppkt(Ioproc *io, int fd)
1025 uchar buf[4], *pkt;
1026 int n, nn;
1028 n = ioreadn(io, fd, buf, 4);
1029 if(n != 4)
1030 return nil;
1031 n = GBIT32(buf);
1032 pkt = emalloc(n);
1033 PBIT32(pkt, n);
1034 nn = ioreadn(io, fd, pkt+4, n-4);
1035 if(nn != n-4){
1036 free(pkt);
1037 return nil;
1039 /* would do this if we ever got one of these, but we only generate them
1040 if(pkt[4] == Ropenfd){
1041 newfd = iorecvfd(io, fd);
1042 PBIT32(pkt+n-4, newfd);
1045 return pkt;
1048 Msg*
1049 mread9p(Ioproc *io, int fd)
1051 int n, nn;
1052 uchar *pkt;
1053 Msg *m;
1055 if((pkt = read9ppkt(io, fd)) == nil)
1056 return nil;
1058 m = msgnew();
1059 m->tpkt = pkt;
1060 n = GBIT32(pkt);
1061 nn = convM2S(pkt, n, &m->tx);
1062 if(nn != n){
1063 fprint(2, "read bad packet from %d\n", fd);
1064 return nil;
1066 return m;
1069 int
1070 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1072 int n, nfd;
1074 n = GBIT32(pkt);
1075 if(verbose > 2) fprint(2, "write %d %d %.*H\n", fd, n, n, pkt);
1076 if(iowrite(io, fd, pkt, n) != n){
1077 fprint(2, "write error: %r\n");
1078 return -1;
1080 if(pkt[4] == Ropenfd){
1081 nfd = GBIT32(pkt+n-4);
1082 if(iosendfd(io, fd, nfd) < 0){
1083 fprint(2, "send fd error: %r\n");
1084 return -1;
1087 return 0;
1090 void
1091 restring(uchar *pkt, int pn, char *s)
1093 int n;
1095 if(s < (char*)pkt || s >= (char*)pkt+pn)
1096 return;
1098 n = strlen(s);
1099 memmove(s+1, s, n);
1100 PBIT16((uchar*)s-1, n);
1103 void
1104 rewritehdr(Fcall *f, uchar *pkt)
1106 int i, n;
1108 n = GBIT32(pkt);
1109 PBIT16(pkt+5, f->tag);
1110 switch(f->type){
1111 case Tversion:
1112 case Rversion:
1113 restring(pkt, n, f->version);
1114 break;
1115 case Tauth:
1116 PBIT32(pkt+7, f->afid);
1117 restring(pkt, n, f->uname);
1118 restring(pkt, n, f->aname);
1119 break;
1120 case Tflush:
1121 PBIT16(pkt+7, f->oldtag);
1122 break;
1123 case Tattach:
1124 restring(pkt, n, f->uname);
1125 restring(pkt, n, f->aname);
1126 PBIT32(pkt+7, f->fid);
1127 PBIT32(pkt+11, f->afid);
1128 break;
1129 case Twalk:
1130 PBIT32(pkt+7, f->fid);
1131 PBIT32(pkt+11, f->newfid);
1132 for(i=0; i<f->nwname; i++)
1133 restring(pkt, n, f->wname[i]);
1134 break;
1135 case Tcreate:
1136 restring(pkt, n, f->name);
1137 /* fall through */
1138 case Topen:
1139 case Tread:
1140 case Twrite:
1141 case Tclunk:
1142 case Tremove:
1143 case Tstat:
1144 case Twstat:
1145 PBIT32(pkt+7, f->fid);
1146 break;
1147 case Rerror:
1148 restring(pkt, n, f->ename);
1149 break;
1153 #ifdef _LIBC_H_
1154 /* unix select-based polling */
1155 struct Ioproc
1157 Channel *c;
1158 Ioproc *next;
1159 int index;
1162 Ioproc*
1163 ioproc(void)
1165 return (Ioproc*)-1;
1168 void
1169 closeioproc(Ioproc *io)
1173 long
1174 ioread(Ioproc *io, int fd, void *v, long n)
1176 USED(io);
1178 return threadread(fd, v, n);
1181 long
1182 ioreadn(Ioproc *io, int fd, void *v, long n)
1184 long tot, m;
1185 uchar *u;
1187 u = v;
1188 for(tot=0; tot<n; tot+=m){
1189 m = ioread(io, fd, u+tot, n-tot);
1190 if(m <= 0){
1191 if(tot)
1192 break;
1193 return m;
1196 return tot;
1199 int
1200 iorecvfd(Ioproc *io, int fd)
1202 int r;
1204 threadfdnoblock(fd);
1205 while((r=recvfd(fd)) < 0){
1206 if(errno == EINTR)
1207 continue;
1208 if(errno == EWOULDBLOCK || errno == EAGAIN){
1209 threadfdwait(fd, 'r');
1210 continue;
1212 break;
1214 return r;
1217 int
1218 iosendfd(Ioproc *io, int s, int fd)
1220 int r;
1222 threadfdnoblock(s);
1223 while((r=sendfd(s, fd)) < 0){
1224 if(errno == EINTR)
1225 continue;
1226 if(errno == EWOULDBLOCK || errno == EAGAIN){
1227 threadfdwait(fd, 'w');
1228 continue;
1230 break;
1232 return r;
1235 static long
1236 _iowrite(Ioproc *io, int fd, void *v, long n)
1238 USED(io);
1239 return threadwrite(fd, v, n);
1242 long
1243 iowrite(Ioproc *io, int fd, void *v, long n)
1245 long tot, m;
1246 uchar *u;
1248 u = v;
1249 for(tot=0; tot<n; tot+=m){
1250 m = _iowrite(io, fd, u+tot, n-tot);
1251 if(m < 0){
1252 if(tot)
1253 break;
1254 return m;
1257 return tot;
1260 int
1261 iolisten(Ioproc *io, char *dir, char *ndir)
1263 int fd;
1264 int r;
1265 extern int _p9netfd(char*);
1266 USED(io);
1268 if((fd = _p9netfd(dir)) < 0)
1269 return -1;
1270 threadfdnoblock(fd);
1271 while((r=listen(dir, ndir)) < 0){
1272 if(errno == EINTR)
1273 continue;
1274 if(errno == EWOULDBLOCK || errno == EAGAIN){
1275 threadfdwait(fd, 'r');
1276 continue;
1278 break;
1280 return r;
1283 int
1284 ioaccept(Ioproc *io, int fd, char *dir)
1286 int r;
1287 USED(io);
1289 threadfdnoblock(fd);
1290 while((r=accept(fd, dir)) < 0){
1291 if(errno == EINTR)
1292 continue;
1293 if(errno == EWOULDBLOCK || errno == EAGAIN){
1294 threadfdwait(fd, 'r');
1295 continue;
1297 break;
1299 return r;
1302 #else
1303 /* real plan 9 io procs */
1304 static long
1305 _iolisten(va_list *arg)
1307 char *a, *b;
1309 a = va_arg(*arg, char*);
1310 b = va_arg(*arg, char*);
1311 return listen(a, b);
1314 int
1315 iolisten(Ioproc *io, char *a, char *b)
1317 return iocall(io, _iolisten, a, b);
1320 static long
1321 _ioaccept(va_list *arg)
1323 int fd;
1324 char *dir;
1326 fd = va_arg(*arg, int);
1327 dir = va_arg(*arg, char*);
1328 return accept(fd, dir);
1331 int
1332 ioaccept(Ioproc *io, int fd, char *dir)
1334 return iocall(io, _ioaccept, fd, dir);
1336 #endif