Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <poll.h>
6 #include <errno.h>
8 enum
9 {
10 STACK = 32768,
11 NHASH = 31,
12 MAXMSG = 64, /* per connection */
13 };
15 typedef struct Hash Hash;
16 typedef struct Fid Fid;
17 typedef struct Msg Msg;
18 typedef struct Conn Conn;
19 typedef struct Queue Queue;
21 struct Hash
22 {
23 Hash *next;
24 uint n;
25 void *v;
26 };
28 struct Fid
29 {
30 int fid;
31 int ref;
32 int cfid;
33 Fid *next;
34 };
36 struct Msg
37 {
38 Conn *c;
39 int internal;
40 int ref;
41 int ctag;
42 int tag;
43 int isopenfd;
44 Fcall tx;
45 Fcall rx;
46 Fid *fid;
47 Fid *newfid;
48 Fid *afid;
49 Msg *oldm;
50 Msg *next;
51 uchar *tpkt;
52 uchar *rpkt;
53 };
55 struct Conn
56 {
57 int fd;
58 int fdmode;
59 Fid *fdfid;
60 int nmsg;
61 int nfid;
62 Channel *inc;
63 Channel *internal;
64 int inputstalled;
65 char dir[40];
66 Hash *tag[NHASH];
67 Hash *fid[NHASH];
68 Queue *outq;
69 Queue *inq;
70 };
72 char *addr;
73 int afd;
74 char adir[40];
75 int isunix;
76 Queue *outq;
77 Queue *inq;
78 int verbose;
80 void *gethash(Hash**, uint);
81 int puthash(Hash**, uint, void*);
82 int delhash(Hash**, uint, void*);
83 Msg *mread9p(Ioproc*, int);
84 int mwrite9p(Ioproc*, int, uchar*);
85 uchar *read9ppkt(Ioproc*, int);
86 int write9ppkt(int, uchar*);
87 Msg *msgnew(void);
88 void msgput(Msg*);
89 Msg *msgget(int);
90 Fid *fidnew(int);
91 void fidput(Fid*);
92 void *emalloc(int);
93 void *erealloc(void*, int);
94 Queue *qalloc(void);
95 int sendq(Queue*, void*);
96 void *recvq(Queue*);
97 void pollthread(void*);
98 void connthread(void*);
99 void connoutthread(void*);
100 void listenthread(void*);
101 void outputthread(void*);
102 void inputthread(void*);
103 void rewritehdr(Fcall*, uchar*);
104 int tlisten(char*, char*);
105 int taccept(int, char*);
106 int iolisten(Ioproc*, char*, char*);
107 int ioaccept(Ioproc*, int, char*);
108 int iorecvfd(Ioproc*, int);
109 int iosendfd(Ioproc*, int, int);
110 void mainproc(void*);
111 int ignorepipe(void*, char*);
113 void
114 usage(void)
116 fprint(2, "usage: 9pserve [-s service] [-u] address\n");
117 fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
118 exits("usage");
121 uchar vbuf[128];
122 extern int _threaddebuglevel;
123 void
124 threadmain(int argc, char **argv)
126 char *file;
128 if(verbose) fprint(2, "9pserve running\n");
129 ARGBEGIN{
130 default:
131 usage();
132 case 'v':
133 verbose++;
134 break;
135 case 's':
136 close(0);
137 if(open(file=EARGF(usage()), ORDWR) != 0)
138 sysfatal("open %s: %r", file);
139 dup(0, 1);
140 break;
141 case 'u':
142 isunix = 1;
143 break;
144 }ARGEND
146 if(argc != 1)
147 usage();
148 addr = argv[0];
150 if((afd = announce(addr, adir)) < 0)
151 sysfatal("announce %s: %r", addr);
153 proccreate(mainproc, nil, STACK);
154 threadexits(0);
157 void
158 mainproc(void *v)
160 int n;
161 Fcall f;
162 USED(v);
164 yield(); /* let threadmain exit */
166 atnotify(ignorepipe, 1);
167 fmtinstall('D', dirfmt);
168 fmtinstall('M', dirmodefmt);
169 fmtinstall('F', fcallfmt);
170 fmtinstall('H', encodefmt);
172 outq = qalloc();
173 inq = qalloc();
175 f.type = Tversion;
176 f.version = "9P2000";
177 f.msize = 8192;
178 f.tag = NOTAG;
179 n = convS2M(&f, vbuf, sizeof vbuf);
180 if(verbose > 1) fprint(2, "* <- %F\n", &f);
181 write(1, vbuf, n);
182 n = read9pmsg(0, vbuf, sizeof vbuf);
183 if(convM2S(vbuf, n, &f) != n)
184 sysfatal("convM2S failure");
185 if(verbose > 1) fprint(2, "* -> %F\n", &f);
187 threadcreate(inputthread, nil, STACK);
188 threadcreate(outputthread, nil, STACK);
189 threadcreate(listenthread, nil, STACK);
190 threadcreateidle(pollthread, nil, STACK);
191 threadexits(0);
194 int
195 ignorepipe(void *v, char *s)
197 USED(v);
198 if(strcmp(s, "sys: write on closed pipe") == 0)
199 return 1;
200 fprint(2, "msg: %s\n", s);
201 return 0;
204 void
205 listenthread(void *arg)
207 Conn *c;
208 Ioproc *io;
210 io = ioproc();
211 USED(arg);
212 for(;;){
213 c = emalloc(sizeof(Conn));
214 c->fd = iolisten(io, adir, c->dir);
215 if(c->fd < 0){
216 if(verbose) fprint(2, "listen: %r\n");
217 close(afd);
218 free(c);
219 return;
221 c->inc = chancreate(sizeof(void*), 0);
222 c->internal = chancreate(sizeof(void*), 0);
223 c->inq = qalloc();
224 c->outq = qalloc();
225 if(verbose) fprint(2, "incoming call on %s\n", c->dir);
226 threadcreate(connthread, c, STACK);
230 void
231 send9pmsg(Msg *m)
233 int n, nn;
235 n = sizeS2M(&m->rx);
236 m->rpkt = emalloc(n);
237 nn = convS2M(&m->rx, m->rpkt, n);
238 if(nn != n)
239 sysfatal("sizeS2M + convS2M disagree");
240 sendq(m->c->outq, m);
243 void
244 sendomsg(Msg *m)
246 int n, nn;
248 n = sizeS2M(&m->tx);
249 m->tpkt = emalloc(n);
250 nn = convS2M(&m->tx, m->tpkt, n);
251 if(nn != n)
252 sysfatal("sizeS2M + convS2M disagree");
253 sendq(outq, m);
256 void
257 err(Msg *m, char *ename)
259 m->rx.type = Rerror;
260 m->rx.ename = ename;
261 m->rx.tag = m->tx.tag;
262 send9pmsg(m);
265 void
266 connthread(void *arg)
268 int i, fd;
269 Conn *c;
270 Hash *h;
271 Msg *m, *om;
272 Fid *f;
273 Ioproc *io;
275 c = arg;
276 io = ioproc();
277 fd = ioaccept(io, c->fd, c->dir);
278 if(fd < 0){
279 if(verbose) fprint(2, "accept %s: %r\n", c->dir);
280 goto out;
282 close(c->fd);
283 c->fd = fd;
284 threadcreate(connoutthread, c, STACK);
285 while((m = mread9p(io, c->fd)) != nil){
286 if(verbose > 1) fprint(2, "fd#%d -> %F\n", c->fd, &m->tx);
287 m->c = c;
288 m->ctag = m->tx.tag;
289 c->nmsg++;
290 if(puthash(c->tag, m->tx.tag, m) < 0){
291 err(m, "duplicate tag");
292 continue;
294 m->ref++;
295 switch(m->tx.type){
296 case Tversion:
297 m->rx.tag = m->tx.tag;
298 m->rx.msize = m->tx.msize;
299 if(m->rx.msize > 8192)
300 m->rx.msize = 8192;
301 m->rx.version = "9P2000";
302 m->rx.type = Rversion;
303 send9pmsg(m);
304 continue;
305 case Tflush:
306 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
307 m->rx.tag = m->tx.tag;
308 m->rx.type = Rflush;
309 send9pmsg(m);
310 continue;
312 m->oldm->ref++;
313 break;
314 case Tattach:
315 m->afid = nil;
316 if(m->tx.afid != NOFID
317 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
318 err(m, "unknown fid");
319 continue;
321 m->fid = fidnew(m->tx.fid);
322 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
323 err(m, "duplicate fid");
324 continue;
326 m->fid->ref++;
327 break;
328 case Twalk:
329 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
330 err(m, "unknown fid");
331 continue;
333 m->fid->ref++;
334 if(m->tx.newfid == m->tx.fid){
335 m->fid->ref++;
336 m->newfid = m->fid;
337 }else{
338 m->newfid = fidnew(m->tx.newfid);
339 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
340 err(m, "duplicate fid");
341 continue;
343 m->newfid->ref++;
345 break;
346 case Tauth:
347 m->afid = fidnew(m->tx.afid);
348 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
349 err(m, "duplicate fid");
350 continue;
352 m->afid->ref++;
353 break;
354 case Topenfd:
355 if(m->tx.mode&~(OTRUNC|3)){
356 err(m, "bad openfd mode");
357 continue;
359 m->isopenfd = 1;
360 m->tx.type = Topen;
361 m->tpkt[4] = Topen;
362 /* fall through */
363 case Tcreate:
364 case Topen:
365 case Tclunk:
366 case Tread:
367 case Twrite:
368 case Tremove:
369 case Tstat:
370 case Twstat:
371 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
372 err(m, "unknown fid");
373 continue;
375 m->fid->ref++;
376 break;
379 /* have everything - translate and send */
380 m->c = c;
381 m->ctag = m->tx.tag;
382 m->tx.tag = m->tag;
383 if(m->fid)
384 m->tx.fid = m->fid->fid;
385 if(m->newfid)
386 m->tx.newfid = m->newfid->fid;
387 if(m->afid)
388 m->tx.afid = m->afid->fid;
389 if(m->oldm)
390 m->tx.oldtag = m->oldm->tag;
391 /* reference passes to outq */
392 sendq(outq, m);
393 while(c->nmsg >= MAXMSG){
394 c->inputstalled = 1;
395 recvp(c->inc);
399 if(verbose) fprint(2, "%s eof\n", c->dir);
401 /* flush all outstanding messages */
402 for(i=0; i<NHASH; i++){
403 for(h=c->tag[i]; h; h=h->next){
404 om = h->v;
405 m = msgnew();
406 m->internal = 1;
407 m->c = c;
408 c->nmsg++;
409 m->tx.type = Tflush;
410 m->tx.tag = m->tag;
411 m->tx.oldtag = om->tag;
412 m->oldm = om;
413 om->ref++;
414 m->ref++; /* for outq */
415 sendomsg(m);
416 recvp(c->internal);
417 msgput(m); /* got from recvp */
418 msgput(m); /* got from msgnew */
419 msgput(om); /* got from hash table */
423 /* clunk all outstanding fids */
424 for(i=0; i<NHASH; i++){
425 for(h=c->fid[i]; h; h=h->next){
426 f = h->v;
427 m = msgnew();
428 m->internal = 1;
429 m->c = c;
430 c->nmsg++;
431 m->tx.type = Tclunk;
432 m->tx.tag = m->tag;
433 m->tx.fid = f->fid;
434 m->fid = f;
435 f->ref++;
436 m->ref++;
437 sendomsg(m);
438 recvp(c->internal);
439 msgput(m); /* got from recvp */
440 msgput(m); /* got from msgnew */
441 fidput(f); /* got from hash table */
445 out:
446 assert(c->nmsg == 0);
447 assert(c->nfid == 0);
448 close(c->fd);
449 chanfree(c->internal);
450 c->internal = 0;
451 chanfree(c->inc);
452 c->inc = 0;
453 free(c->inq);
454 c->inq = 0;
455 free(c->outq);
456 c->outq = 0;
457 free(c);
460 static void
461 openfdthread(void *v)
463 Conn *c;
464 Fid *fid;
465 Msg *m;
466 int n;
467 vlong tot;
468 Ioproc *io;
469 char buf[1024];
471 c = v;
472 fid = c->fdfid;
473 io = ioproc();
475 tot = 0;
476 if(c->fdmode == OREAD){
477 for(;;){
478 if(verbose) fprint(2, "tread...");
479 m = msgnew();
480 m->internal = 1;
481 m->c = c;
482 m->tx.type = Tread;
483 m->tx.count = 8192;
484 m->tx.fid = fid->fid;
485 m->tx.tag = m->tag;
486 m->tx.offset = tot;
487 m->fid = fid;
488 fid->ref++;
489 m->ref++;
490 sendomsg(m);
491 recvp(c->internal);
492 if(m->rx.type == Rerror){
493 // fprint(2, "read error: %s\n", m->rx.ename);
494 break;
496 if(m->rx.count == 0)
497 break;
498 tot += m->rx.count;
499 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
500 fprint(2, "pipe write error: %r\n");
501 break;
503 msgput(m);
504 msgput(m);
506 }else{
507 for(;;){
508 if(verbose) fprint(2, "twrite...");
509 if((n=ioread(io, c->fd, buf, sizeof buf)) <= 0){
510 if(n < 0)
511 fprint(2, "pipe read error: %r\n");
512 m = nil;
513 break;
515 m = msgnew();
516 m->internal = 1;
517 m->c = c;
518 m->tx.type = Twrite;
519 m->tx.fid = fid->fid;
520 m->tx.data = buf;
521 m->tx.count = n;
522 m->tx.tag = m->tag;
523 m->tx.offset = tot;
524 m->fid = fid;
525 fid->ref++;
526 m->ref++;
527 sendomsg(m);
528 recvp(c->internal);
529 if(m->rx.type == Rerror){
530 // fprint(2, "write error: %s\n", m->rx.ename);
531 continue;
533 tot = n;
534 msgput(m);
535 msgput(m);
538 if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid);
539 close(c->fd);
540 closeioproc(io);
541 if(m){
542 msgput(m);
543 msgput(m);
545 if(fid->ref == 1){
546 m = msgnew();
547 m->internal = 1;
548 m->c = c;
549 m->tx.type = Tclunk;
550 m->tx.fid = fid->fid;
551 m->fid = fid;
552 fid->ref++;
553 m->ref++;
554 sendomsg(m);
555 recvp(c->internal);
556 msgput(m);
557 msgput(m);
559 fidput(fid);
560 c->fdfid = nil;
561 chanfree(c->internal);
562 c->internal = 0;
563 free(c);
566 int
567 xopenfd(Msg *m)
569 char errs[ERRMAX];
570 int n, p[2];
571 Conn *nc;
573 if(pipe(p) < 0){
574 rerrstr(errs, sizeof errs);
575 err(m, errs);
577 if(verbose) fprint(2, "xopen pipe %d %d...", p[0], p[1]);
579 /* now we're committed. */
581 /* a new connection for this fid */
582 nc = emalloc(sizeof(Conn));
583 nc->internal = chancreate(sizeof(void*), 0);
585 /* a ref for us */
586 nc->fdfid = m->fid;
587 m->fid->ref++;
588 nc->fdmode = m->tx.mode;
589 nc->fd = p[0];
591 /* a thread to tend the pipe */
592 threadcreate(openfdthread, nc, STACK);
594 /* if mode is ORDWR, that openfdthread will write; start a reader */
595 if((m->tx.mode&3) == ORDWR){
596 nc = emalloc(sizeof(Conn));
597 nc->internal = chancreate(sizeof(void*), 0);
598 nc->fdfid = m->fid;
599 m->fid->ref++;
600 nc->fdmode = OREAD;
601 nc->fd = dup(p[0], -1);
602 threadcreate(openfdthread, nc, STACK);
605 /* steal fid from other connection */
606 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
607 fidput(m->fid);
609 /* rewrite as Ropenfd */
610 m->rx.type = Ropenfd;
611 n = GBIT32(m->rpkt);
612 m->rpkt = erealloc(m->rpkt, n+4);
613 PBIT32(m->rpkt+n, p[1]);
614 n += 4;
615 PBIT32(m->rpkt, n);
616 m->rpkt[4] = Ropenfd;
617 m->rx.unixfd = p[1];
618 return 0;
621 void
622 connoutthread(void *arg)
624 int err;
625 Conn *c;
626 Msg *m, *om;
627 Ioproc *io;
629 c = arg;
630 io = ioproc();
631 while((m = recvq(c->outq)) != nil){
632 err = m->tx.type+1 != m->rx.type;
633 if(!err && m->isopenfd)
634 if(xopenfd(m) < 0)
635 continue;
636 switch(m->tx.type){
637 case Tflush:
638 om = m->oldm;
639 if(om)
640 if(delhash(om->c->tag, om->ctag, om) == 0)
641 msgput(om);
642 break;
643 case Tclunk:
644 case Tremove:
645 if(m->fid)
646 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
647 fidput(m->fid);
648 break;
649 case Tauth:
650 if(err && m->afid){
651 fprint(2, "auth error\n");
652 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
653 fidput(m->fid);
655 break;
656 case Tattach:
657 if(err && m->fid)
658 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
659 fidput(m->fid);
660 break;
661 case Twalk:
662 if(err && m->tx.fid != m->tx.newfid && m->newfid)
663 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
664 fidput(m->newfid);
665 break;
667 if(delhash(m->c->tag, m->ctag, m) == 0)
668 msgput(m);
669 if(verbose > 1) fprint(2, "fd#%d <- %F\n", c->fd, &m->rx);
670 rewritehdr(&m->rx, m->rpkt);
671 if(mwrite9p(io, c->fd, m->rpkt) < 0)
672 if(verbose) fprint(2, "write error: %r\n");
673 msgput(m);
674 if(c->inputstalled && c->nmsg < MAXMSG)
675 nbsendp(c->inc, 0);
677 closeioproc(io);
680 void
681 outputthread(void *arg)
683 Msg *m;
684 Ioproc *io;
686 USED(arg);
687 io = ioproc();
688 while((m = recvq(outq)) != nil){
689 if(verbose > 1) fprint(2, "* <- %F\n", &m->tx);
690 rewritehdr(&m->tx, m->tpkt);
691 if(mwrite9p(io, 1, m->tpkt) < 0)
692 sysfatal("output error: %r");
693 msgput(m);
695 closeioproc(io);
696 fprint(2, "output eof\n");
697 threadexitsall(0);
700 void
701 inputthread(void *arg)
703 uchar *pkt;
704 int n, nn, tag;
705 Msg *m;
706 Ioproc *io;
708 if(verbose) fprint(2, "input thread\n");
709 io = ioproc();
710 USED(arg);
711 while((pkt = read9ppkt(io, 0)) != nil){
712 n = GBIT32(pkt);
713 if(n < 7){
714 fprint(2, "short 9P packet from server\n");
715 free(pkt);
716 continue;
718 if(verbose > 2) fprint(2, "read %.*H\n", n, pkt);
719 tag = GBIT16(pkt+5);
720 if((m = msgget(tag)) == nil){
721 fprint(2, "unexpected 9P response tag %d\n", tag);
722 free(pkt);
723 continue;
725 if((nn = convM2S(pkt, n, &m->rx)) != n){
726 fprint(2, "bad packet - convM2S %d but %d\n", nn, n);
727 free(pkt);
728 msgput(m);
729 continue;
731 if(verbose > 1) fprint(2, "* -> %F\n", &m->rx);
732 m->rpkt = pkt;
733 m->rx.tag = m->ctag;
734 if(m->internal)
735 sendp(m->c->internal, 0);
736 else
737 sendq(m->c->outq, m);
739 closeioproc(io);
740 fprint(2, "input eof\n");
741 threadexitsall(0);
744 void*
745 gethash(Hash **ht, uint n)
747 Hash *h;
749 for(h=ht[n%NHASH]; h; h=h->next)
750 if(h->n == n)
751 return h->v;
752 return nil;
755 int
756 delhash(Hash **ht, uint n, void *v)
758 Hash *h, **l;
760 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
761 if(h->n == n){
762 if(h->v != v){
763 if(verbose) fprint(2, "delhash %d got %p want %p\n", n, h->v, v);
764 return -1;
766 *l = h->next;
767 free(h);
768 return 0;
770 return -1;
773 int
774 puthash(Hash **ht, uint n, void *v)
776 Hash *h;
778 if(gethash(ht, n))
779 return -1;
780 h = emalloc(sizeof(Hash));
781 h->next = ht[n%NHASH];
782 h->n = n;
783 h->v = v;
784 ht[n%NHASH] = h;
785 return 0;
788 Fid **fidtab;
789 int nfidtab;
790 Fid *freefid;
792 Fid*
793 fidnew(int cfid)
795 Fid *f;
797 if(freefid == nil){
798 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
799 fidtab[nfidtab] = emalloc(sizeof(Fid));
800 freefid = fidtab[nfidtab];
801 freefid->fid = nfidtab++;
803 f = freefid;
804 freefid = f->next;
805 f->cfid = cfid;
806 f->ref = 1;
807 return f;
810 void
811 fidput(Fid *f)
813 if(f == nil)
814 return;
815 assert(f->ref > 0);
816 if(--f->ref > 0)
817 return;
818 f->next = freefid;
819 f->cfid = -1;
820 freefid = f;
823 Msg **msgtab;
824 int nmsgtab;
825 Msg *freemsg;
827 Msg*
828 msgnew(void)
830 Msg *m;
832 if(freemsg == nil){
833 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
834 msgtab[nmsgtab] = emalloc(sizeof(Msg));
835 freemsg = msgtab[nmsgtab];
836 freemsg->tag = nmsgtab++;
838 m = freemsg;
839 freemsg = m->next;
840 m->ref = 1;
841 return m;
844 void
845 msgput(Msg *m)
847 if(verbose > 2) fprint(2, "msgput tag %d/%d ref %d\n", m->tag, m->ctag, m->ref);
848 assert(m->ref > 0);
849 if(--m->ref > 0)
850 return;
851 m->c->nmsg--;
852 m->c = nil;
853 fidput(m->fid);
854 m->fid = nil;
855 fidput(m->afid);
856 m->afid = nil;
857 fidput(m->newfid);
858 m->newfid = nil;
859 free(m->tpkt);
860 m->tpkt = nil;
861 free(m->rpkt);
862 m->rpkt = nil;
863 if(m->rx.type == Ropenfd)
864 close(m->rx.unixfd);
865 m->rx.unixfd = -1;
866 m->isopenfd = 0;
867 m->internal = 0;
868 m->next = freemsg;
869 freemsg = m;
872 Msg*
873 msgget(int n)
875 Msg *m;
877 if(n < 0 || n >= nmsgtab)
878 return nil;
879 m = msgtab[n];
880 if(m->ref == 0)
881 return nil;
882 if(verbose) fprint(2, "msgget %d = %p\n", n, m);
883 m->ref++;
884 return m;
888 void*
889 emalloc(int n)
891 void *v;
893 v = mallocz(n, 1);
894 if(v == nil){
895 abort();
896 sysfatal("out of memory allocating %d", n);
898 return v;
901 void*
902 erealloc(void *v, int n)
904 v = realloc(v, n);
905 if(v == nil){
906 abort();
907 sysfatal("out of memory reallocating %d", n);
909 return v;
912 typedef struct Qel Qel;
913 struct Qel
915 Qel *next;
916 void *p;
917 };
919 struct Queue
921 int hungup;
922 QLock lk;
923 Rendez r;
924 Qel *head;
925 Qel *tail;
926 };
928 Queue*
929 qalloc(void)
931 Queue *q;
933 q = mallocz(sizeof(Queue), 1);
934 if(q == nil)
935 return nil;
936 q->r.l = &q->lk;
937 return q;
940 int
941 sendq(Queue *q, void *p)
943 Qel *e;
945 e = emalloc(sizeof(Qel));
946 qlock(&q->lk);
947 if(q->hungup){
948 werrstr("hungup queue");
949 qunlock(&q->lk);
950 return -1;
952 e->p = p;
953 e->next = nil;
954 if(q->head == nil)
955 q->head = e;
956 else
957 q->tail->next = e;
958 q->tail = e;
959 rwakeup(&q->r);
960 qunlock(&q->lk);
961 return 0;
964 void*
965 recvq(Queue *q)
967 void *p;
968 Qel *e;
970 qlock(&q->lk);
971 while(q->head == nil && !q->hungup)
972 rsleep(&q->r);
973 if(q->hungup){
974 qunlock(&q->lk);
975 return nil;
977 e = q->head;
978 q->head = e->next;
979 qunlock(&q->lk);
980 p = e->p;
981 free(e);
982 return p;
985 uchar*
986 read9ppkt(Ioproc *io, int fd)
988 uchar buf[4], *pkt;
989 int n, nn;
991 n = ioreadn(io, fd, buf, 4);
992 if(n != 4)
993 return nil;
994 n = GBIT32(buf);
995 pkt = emalloc(n);
996 PBIT32(pkt, n);
997 nn = ioreadn(io, fd, pkt+4, n-4);
998 if(nn != n-4){
999 free(pkt);
1000 return nil;
1002 /* would do this if we ever got one of these, but we only generate them
1003 if(pkt[4] == Ropenfd){
1004 newfd = iorecvfd(io, fd);
1005 PBIT32(pkt+n-4, newfd);
1008 return pkt;
1011 Msg*
1012 mread9p(Ioproc *io, int fd)
1014 int n, nn;
1015 uchar *pkt;
1016 Msg *m;
1018 if((pkt = read9ppkt(io, fd)) == nil)
1019 return nil;
1021 m = msgnew();
1022 m->tpkt = pkt;
1023 n = GBIT32(pkt);
1024 nn = convM2S(pkt, n, &m->tx);
1025 if(nn != n){
1026 fprint(2, "read bad packet from %d\n", fd);
1027 return nil;
1029 return m;
1032 int
1033 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1035 int n, nfd;
1037 n = GBIT32(pkt);
1038 if(verbose > 2) fprint(2, "write %d %d %.*H\n", fd, n, n, pkt);
1039 if(iowrite(io, fd, pkt, n) != n){
1040 fprint(2, "write error: %r\n");
1041 return -1;
1043 if(pkt[4] == Ropenfd){
1044 nfd = GBIT32(pkt+n-4);
1045 if(iosendfd(io, fd, nfd) < 0){
1046 fprint(2, "send fd error: %r\n");
1047 return -1;
1050 return 0;
1053 void
1054 restring(uchar *pkt, int pn, char *s)
1056 int n;
1058 if(s < (char*)pkt || s >= (char*)pkt+pn)
1059 return;
1061 n = strlen(s);
1062 memmove(s+1, s, n);
1063 PBIT16((uchar*)s-1, n);
1066 void
1067 rewritehdr(Fcall *f, uchar *pkt)
1069 int i, n;
1071 n = GBIT32(pkt);
1072 PBIT16(pkt+5, f->tag);
1073 switch(f->type){
1074 case Tversion:
1075 case Rversion:
1076 restring(pkt, n, f->version);
1077 break;
1078 case Tauth:
1079 PBIT32(pkt+7, f->afid);
1080 restring(pkt, n, f->uname);
1081 restring(pkt, n, f->aname);
1082 break;
1083 case Tflush:
1084 PBIT16(pkt+7, f->oldtag);
1085 break;
1086 case Tattach:
1087 restring(pkt, n, f->uname);
1088 restring(pkt, n, f->aname);
1089 PBIT32(pkt+7, f->fid);
1090 PBIT32(pkt+11, f->afid);
1091 break;
1092 case Twalk:
1093 PBIT32(pkt+7, f->fid);
1094 PBIT32(pkt+11, f->newfid);
1095 for(i=0; i<f->nwname; i++)
1096 restring(pkt, n, f->wname[i]);
1097 break;
1098 case Tcreate:
1099 restring(pkt, n, f->name);
1100 /* fall through */
1101 case Topen:
1102 case Tread:
1103 case Twrite:
1104 case Tclunk:
1105 case Tremove:
1106 case Tstat:
1107 case Twstat:
1108 PBIT32(pkt+7, f->fid);
1109 break;
1110 case Rerror:
1111 restring(pkt, n, f->ename);
1112 break;
1116 #ifdef _LIB9_H_
1117 /* unix select-based polling */
1118 struct Ioproc
1120 Channel *c;
1121 Ioproc *next;
1122 int index;
1125 static struct Ioproc **pio;
1126 static struct pollfd *pfd;
1127 static int npfd;
1128 static struct Ioproc *iofree;
1130 Ioproc*
1131 ioproc(void)
1133 Ioproc *io;
1135 if(iofree == nil){
1136 pfd = erealloc(pfd, (npfd+1)*sizeof(pfd[0]));
1137 pfd[npfd].events = 0;
1138 pfd[npfd].fd = -1;
1139 iofree = emalloc(sizeof(Ioproc));
1140 iofree->index = npfd;
1141 iofree->c = chancreate(sizeof(ulong), 1);
1142 pio = erealloc(pio, (npfd+1)*sizeof(pio[0]));
1143 pio[npfd] = iofree;
1144 npfd++;
1146 io = iofree;
1147 iofree = io->next;
1148 return io;
1151 void
1152 closeioproc(Ioproc *io)
1154 io->next = iofree;
1155 iofree = io;
1158 void
1159 pollthread(void *v)
1161 int i, n;
1163 for(;;){
1164 yield();
1165 for(i=0; i<npfd; i++)
1166 pfd[i].revents = 0;
1167 if(verbose){
1168 fprint(2, "poll:");
1169 for(i=0; i<npfd; i++)
1170 if(pfd[i].events)
1171 fprint(2, " %d%c", pfd[i].fd, pfd[i].events==POLLIN ? 'r' : pfd[i].events==POLLOUT ? 'w' : '?');
1172 fprint(2, "\n");
1174 n = poll(pfd, npfd, -1);
1175 if(n <= 0)
1176 continue;
1177 for(i=0; i<npfd; i++)
1178 if(pfd[i].fd != -1 && pfd[i].revents){
1179 pfd[i].fd = -1;
1180 pfd[i].events = 0;
1181 pfd[i].revents = 0;
1182 nbsendul(pio[i]->c, 1);
1187 static void
1188 noblock(int fd)
1190 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0)|O_NONBLOCK);
1193 static void
1194 xwait(Ioproc *io, int fd, int e)
1196 if(verbose) fprint(2, "wait for %d%c\n", fd, e==POLLIN ? 'r' : 'w');
1197 pfd[io->index].fd = fd;
1198 pfd[io->index].events = e;
1199 recvul(io->c);
1200 if(verbose) fprint(2, "got %d\n", fd);
1203 static void
1204 rwait(Ioproc *io, int fd)
1206 xwait(io, fd, POLLIN);
1209 static void
1210 wwait(Ioproc *io, int fd)
1212 xwait(io, fd, POLLOUT);
1215 long
1216 ioread(Ioproc *io, int fd, void *v, long n)
1218 long r;
1219 USED(io);
1221 noblock(fd);
1222 while((r=read(fd, v, n)) < 0 && errno == EWOULDBLOCK)
1223 rwait(io, fd);
1224 return r;
1227 long
1228 ioreadn(Ioproc *io, int fd, void *v, long n)
1230 long tot, m;
1231 uchar *u;
1233 u = v;
1234 for(tot=0; tot<n; tot+=m){
1235 m = ioread(io, fd, u+tot, n-tot);
1236 if(m <= 0){
1237 if(tot)
1238 break;
1239 return m;
1242 return tot;
1245 int
1246 iorecvfd(Ioproc *io, int fd)
1248 int r;
1250 noblock(fd);
1251 while((r=recvfd(fd)) < 0 && errno == EWOULDBLOCK)
1252 rwait(io, fd);
1253 return r;
1256 int
1257 iosendfd(Ioproc *io, int s, int fd)
1259 int r;
1261 noblock(s);
1262 while((r=sendfd(s, fd)) < 0 && errno == EWOULDBLOCK)
1263 wwait(io, s);
1264 if(r < 0) fprint(2, "sent %d, %d\n", s, fd);
1265 return r;
1268 static long
1269 _iowrite(Ioproc *io, int fd, void *v, long n)
1271 long r;
1272 USED(io);
1274 noblock(fd);
1275 while((r=write(fd, v, n)) < 0 && errno == EWOULDBLOCK)
1276 wwait(io, fd);
1277 return r;
1280 long
1281 iowrite(Ioproc *io, int fd, void *v, long n)
1283 long tot, m;
1284 uchar *u;
1286 u = v;
1287 for(tot=0; tot<n; tot+=m){
1288 m = _iowrite(io, fd, u+tot, n-tot);
1289 if(m < 0){
1290 if(tot)
1291 break;
1292 return m;
1295 return tot;
1298 int
1299 iolisten(Ioproc *io, char *dir, char *ndir)
1301 int fd;
1302 int r;
1303 extern int _p9netfd(char*);
1304 USED(io);
1306 if((fd = _p9netfd(dir)) < 0)
1307 return -1;
1308 noblock(fd);
1309 while((r=listen(dir, ndir)) < 0 && errno == EWOULDBLOCK)
1310 rwait(io, fd);
1311 return r;
1314 int
1315 ioaccept(Ioproc *io, int fd, char *dir)
1317 int r;
1318 USED(io);
1320 noblock(fd);
1321 while((r=accept(fd, dir)) < 0 && errno == EWOULDBLOCK)
1322 rwait(io, fd);
1323 return r;
1326 #else
1327 /* real plan 9 io procs */
1328 static long
1329 _iolisten(va_list *arg)
1331 char *a, *b;
1333 a = va_arg(*arg, char*);
1334 b = va_arg(*arg, char*);
1335 return listen(a, b);
1338 int
1339 iolisten(Ioproc *io, char *a, char *b)
1341 return iocall(io, _iolisten, a, b);
1344 static long
1345 _ioaccept(va_list *arg)
1347 int fd;
1348 char *dir;
1350 fd = va_arg(*arg, int);
1351 dir = va_arg(*arg, char*);
1352 return accept(fd, dir);
1355 int
1356 ioaccept(Ioproc *io, int fd, char *dir)
1358 return iocall(io, _ioaccept, fd, dir);
1360 #endif