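/*
 * 9pserve: announce an address and relay 9P messages between the clients
 * that connect there and a single 9P conversation on stdin/stdout.
 */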


#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <poll.h>
#include <errno.h>
/* Tunable constants. */
enum
{
	STACK = 32768,	/* stack size passed to every threadcreate call */
	NHASH = 31,	/* number of buckets in each tag/fid hash table */
	MAXMSG = 64,	/* per connection */
};
/* Forward type names for the structures defined below. */
typedef struct Hash Hash;
typedef struct Fid Fid;
typedef struct Msg Msg;
typedef struct Conn Conn;
typedef struct Queue Queue;
21 struct Hash
22 {
23 Hash *next;
24 uint n;
25 void *v;
26 };
28 struct Fid
29 {
30 int fid;
31 int ref;
32 int cfid;
33 Fid *next;
34 };
36 struct Msg
37 {
38 Conn *c;
39 int internal;
40 int ref;
41 int ctag;
42 int tag;
43 int isopenfd;
44 Fcall tx;
45 Fcall rx;
46 Fid *fid;
47 Fid *newfid;
48 Fid *afid;
49 Msg *oldm;
50 Msg *next;
51 uchar *tpkt;
52 uchar *rpkt;
53 };
55 struct Conn
56 {
57 int fd;
58 int fdmode;
59 Fid *fdfid;
60 int nmsg;
61 int nfid;
62 Channel *inc;
63 Channel *internal;
64 int inputstalled;
65 char dir[40];
66 Hash *tag[NHASH];
67 Hash *fid[NHASH];
68 Queue *outq;
69 Queue *inq;
70 };
72 char *addr;
73 int afd;
74 char adir[40];
75 int isunix;
76 Queue *outq;
77 Queue *inq;
78 int verbose;
79 int msize = 8192;
81 void *gethash(Hash**, uint);
82 int puthash(Hash**, uint, void*);
83 int delhash(Hash**, uint, void*);
84 Msg *mread9p(Ioproc*, int);
85 int mwrite9p(Ioproc*, int, uchar*);
86 uchar *read9ppkt(Ioproc*, int);
87 int write9ppkt(int, uchar*);
88 Msg *msgnew(void);
89 void msgput(Msg*);
90 Msg *msgget(int);
91 Fid *fidnew(int);
92 void fidput(Fid*);
93 void *emalloc(int);
94 void *erealloc(void*, int);
95 Queue *qalloc(void);
96 int sendq(Queue*, void*);
97 void *recvq(Queue*);
98 void connthread(void*);
99 void connoutthread(void*);
100 void listenthread(void*);
101 void outputthread(void*);
102 void inputthread(void*);
103 void rewritehdr(Fcall*, uchar*);
104 int tlisten(char*, char*);
105 int taccept(int, char*);
106 int iolisten(Ioproc*, char*, char*);
107 int ioaccept(Ioproc*, int, char*);
108 int iorecvfd(Ioproc*, int);
109 int iosendfd(Ioproc*, int, int);
110 void mainproc(void*);
111 int ignorepipe(void*, char*);
/* Print the command-line synopsis on standard error and exit with status "usage". */
void
usage(void)
{
	/* -v is accepted by threadmain, so it is listed here as well */
	fprint(2, "usage: 9pserve [-s service] [-u] [-v] address\n");
	fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
	exits("usage");
}
121 uchar vbuf[128];
122 extern int _threaddebuglevel;
123 void
124 threadmain(int argc, char **argv)
126 char *file;
128 ARGBEGIN{
129 default:
130 usage();
131 case 'v':
132 verbose++;
133 break;
134 case 's':
135 close(0);
136 if(open(file=EARGF(usage()), ORDWR) != 0)
137 sysfatal("open %s: %r", file);
138 dup(0, 1);
139 break;
140 case 'u':
141 isunix = 1;
142 break;
143 }ARGEND
145 if(verbose) fprint(2, "9pserve running\n");
146 if(argc != 1)
147 usage();
148 addr = argv[0];
150 if((afd = announce(addr, adir)) < 0)
151 sysfatal("announce %s: %r", addr);
153 if(verbose) fprint(2, "9pserve forking\n");
154 switch(fork()){
155 case -1:
156 sysfatal("fork: %r");
157 case 0:
158 if(verbose) fprint(2, "running mainproc\n");
159 mainproc(nil);
160 if(verbose) fprint(2, "mainproc finished\n");
161 _exits(0);
162 default:
163 if(verbose) fprint(2, "9pserve exiting\n");
164 _exits(0);
168 void
169 mainproc(void *v)
171 int n;
172 Fcall f;
173 USED(v);
175 atnotify(ignorepipe, 1);
176 fmtinstall('D', dirfmt);
177 fmtinstall('M', dirmodefmt);
178 fmtinstall('F', fcallfmt);
179 fmtinstall('H', encodefmt);
181 outq = qalloc();
182 inq = qalloc();
184 f.type = Tversion;
185 f.version = "9P2000";
186 f.msize = msize;
187 f.tag = NOTAG;
188 n = convS2M(&f, vbuf, sizeof vbuf);
189 if(verbose > 1) fprint(2, "* <- %F\n", &f);
190 write(1, vbuf, n);
191 n = read9pmsg(0, vbuf, sizeof vbuf);
192 if(convM2S(vbuf, n, &f) != n)
193 sysfatal("convM2S failure");
194 if(f.msize < msize)
195 msize = f.msize;
196 if(verbose > 1) fprint(2, "* -> %F\n", &f);
198 threadcreate(inputthread, nil, STACK);
199 threadcreate(outputthread, nil, STACK);
200 threadcreate(listenthread, nil, STACK);
201 threadexits(0);
/*
 * Note handler installed with atnotify.  Returns 1 for the
 * "write on closed pipe" note; any other note is printed and 0 is returned.
 */
int
ignorepipe(void *v, char *s)
{
	USED(v);
	if(strcmp(s, "sys: write on closed pipe") == 0)
		return 1;
	fprint(2, "msg: %s\n", s);
	return 0;
}
214 void
215 listenthread(void *arg)
217 Conn *c;
218 Ioproc *io;
220 io = ioproc();
221 USED(arg);
222 for(;;){
223 c = emalloc(sizeof(Conn));
224 c->fd = iolisten(io, adir, c->dir);
225 if(c->fd < 0){
226 if(verbose) fprint(2, "listen: %r\n");
227 close(afd);
228 free(c);
229 return;
231 c->inc = chancreate(sizeof(void*), 0);
232 c->internal = chancreate(sizeof(void*), 0);
233 c->inq = qalloc();
234 c->outq = qalloc();
235 if(verbose) fprint(2, "incoming call on %s\n", c->dir);
236 threadcreate(connthread, c, STACK);
240 void
241 send9pmsg(Msg *m)
243 int n, nn;
245 n = sizeS2M(&m->rx);
246 m->rpkt = emalloc(n);
247 nn = convS2M(&m->rx, m->rpkt, n);
248 if(nn != n)
249 sysfatal("sizeS2M + convS2M disagree");
250 sendq(m->c->outq, m);
253 void
254 sendomsg(Msg *m)
256 int n, nn;
258 n = sizeS2M(&m->tx);
259 m->tpkt = emalloc(n);
260 nn = convS2M(&m->tx, m->tpkt, n);
261 if(nn != n)
262 sysfatal("sizeS2M + convS2M disagree");
263 sendq(outq, m);
266 void
267 err(Msg *m, char *ename)
269 m->rx.type = Rerror;
270 m->rx.ename = ename;
271 m->rx.tag = m->tx.tag;
272 send9pmsg(m);
275 void
276 connthread(void *arg)
278 int i, fd;
279 Conn *c;
280 Hash *h;
281 Msg *m, *om;
282 Fid *f;
283 Ioproc *io;
285 c = arg;
286 io = ioproc();
287 fd = ioaccept(io, c->fd, c->dir);
288 if(fd < 0){
289 if(verbose) fprint(2, "accept %s: %r\n", c->dir);
290 goto out;
292 close(c->fd);
293 c->fd = fd;
294 threadcreate(connoutthread, c, STACK);
295 while((m = mread9p(io, c->fd)) != nil){
296 if(verbose > 1) fprint(2, "fd#%d -> %F\n", c->fd, &m->tx);
297 m->c = c;
298 m->ctag = m->tx.tag;
299 c->nmsg++;
300 if(puthash(c->tag, m->tx.tag, m) < 0){
301 err(m, "duplicate tag");
302 continue;
304 m->ref++;
305 switch(m->tx.type){
306 case Tversion:
307 m->rx.tag = m->tx.tag;
308 m->rx.msize = m->tx.msize;
309 if(m->rx.msize > msize)
310 m->rx.msize = msize;
311 m->rx.version = "9P2000";
312 m->rx.type = Rversion;
313 send9pmsg(m);
314 continue;
315 case Tflush:
316 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
317 m->rx.tag = m->tx.tag;
318 m->rx.type = Rflush;
319 send9pmsg(m);
320 continue;
322 m->oldm->ref++;
323 break;
324 case Tattach:
325 m->afid = nil;
326 if(m->tx.afid != NOFID
327 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
328 err(m, "unknown fid");
329 continue;
331 m->fid = fidnew(m->tx.fid);
332 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
333 err(m, "duplicate fid");
334 continue;
336 m->fid->ref++;
337 break;
338 case Twalk:
339 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
340 err(m, "unknown fid");
341 continue;
343 m->fid->ref++;
344 if(m->tx.newfid == m->tx.fid){
345 m->fid->ref++;
346 m->newfid = m->fid;
347 }else{
348 m->newfid = fidnew(m->tx.newfid);
349 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
350 err(m, "duplicate fid");
351 continue;
353 m->newfid->ref++;
355 break;
356 case Tauth:
357 m->afid = fidnew(m->tx.afid);
358 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
359 err(m, "duplicate fid");
360 continue;
362 m->afid->ref++;
363 break;
364 case Topenfd:
365 if(m->tx.mode&~(OTRUNC|3)){
366 err(m, "bad openfd mode");
367 continue;
369 m->isopenfd = 1;
370 m->tx.type = Topen;
371 m->tpkt[4] = Topen;
372 /* fall through */
373 case Tcreate:
374 case Topen:
375 case Tclunk:
376 case Tread:
377 case Twrite:
378 case Tremove:
379 case Tstat:
380 case Twstat:
381 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
382 err(m, "unknown fid");
383 continue;
385 m->fid->ref++;
386 break;
389 /* have everything - translate and send */
390 m->c = c;
391 m->ctag = m->tx.tag;
392 m->tx.tag = m->tag;
393 if(m->fid)
394 m->tx.fid = m->fid->fid;
395 if(m->newfid)
396 m->tx.newfid = m->newfid->fid;
397 if(m->afid)
398 m->tx.afid = m->afid->fid;
399 if(m->oldm)
400 m->tx.oldtag = m->oldm->tag;
401 /* reference passes to outq */
402 sendq(outq, m);
403 while(c->nmsg >= MAXMSG){
404 c->inputstalled = 1;
405 recvp(c->inc);
409 if(verbose) fprint(2, "%s eof\n", c->dir);
411 /* flush all outstanding messages */
412 for(i=0; i<NHASH; i++){
413 for(h=c->tag[i]; h; h=h->next){
414 om = h->v;
415 m = msgnew();
416 m->internal = 1;
417 m->c = c;
418 c->nmsg++;
419 m->tx.type = Tflush;
420 m->tx.tag = m->tag;
421 m->tx.oldtag = om->tag;
422 m->oldm = om;
423 om->ref++;
424 m->ref++; /* for outq */
425 sendomsg(m);
426 recvp(c->internal);
427 msgput(m); /* got from recvp */
428 msgput(m); /* got from msgnew */
429 msgput(om); /* got from hash table */
433 /* clunk all outstanding fids */
434 for(i=0; i<NHASH; i++){
435 for(h=c->fid[i]; h; h=h->next){
436 f = h->v;
437 m = msgnew();
438 m->internal = 1;
439 m->c = c;
440 c->nmsg++;
441 m->tx.type = Tclunk;
442 m->tx.tag = m->tag;
443 m->tx.fid = f->fid;
444 m->fid = f;
445 f->ref++;
446 m->ref++;
447 sendomsg(m);
448 recvp(c->internal);
449 msgput(m); /* got from recvp */
450 msgput(m); /* got from msgnew */
451 fidput(f); /* got from hash table */
455 out:
456 assert(c->nmsg == 0);
457 assert(c->nfid == 0);
458 close(c->fd);
459 chanfree(c->internal);
460 c->internal = 0;
461 chanfree(c->inc);
462 c->inc = 0;
463 free(c->inq);
464 c->inq = 0;
465 free(c->outq);
466 c->outq = 0;
467 free(c);
470 static void
471 openfdthread(void *v)
473 Conn *c;
474 Fid *fid;
475 Msg *m;
476 int n;
477 vlong tot;
478 Ioproc *io;
479 char buf[1024];
481 c = v;
482 fid = c->fdfid;
483 io = ioproc();
485 tot = 0;
486 if(c->fdmode == OREAD){
487 for(;;){
488 if(verbose) fprint(2, "tread...");
489 m = msgnew();
490 m->internal = 1;
491 m->c = c;
492 m->tx.type = Tread;
493 m->tx.count = msize - IOHDRSZ;
494 m->tx.fid = fid->fid;
495 m->tx.tag = m->tag;
496 m->tx.offset = tot;
497 m->fid = fid;
498 fid->ref++;
499 m->ref++;
500 sendomsg(m);
501 recvp(c->internal);
502 if(m->rx.type == Rerror){
503 // fprint(2, "read error: %s\n", m->rx.ename);
504 break;
506 if(m->rx.count == 0)
507 break;
508 tot += m->rx.count;
509 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
510 fprint(2, "pipe write error: %r\n");
511 break;
513 msgput(m);
514 msgput(m);
516 }else{
517 for(;;){
518 if(verbose) fprint(2, "twrite...");
519 n = sizeof buf;
520 if(n > msize)
521 n = msize;
522 if((n=ioread(io, c->fd, buf, n)) <= 0){
523 if(n < 0)
524 fprint(2, "pipe read error: %r\n");
525 m = nil;
526 break;
528 m = msgnew();
529 m->internal = 1;
530 m->c = c;
531 m->tx.type = Twrite;
532 m->tx.fid = fid->fid;
533 m->tx.data = buf;
534 m->tx.count = n;
535 m->tx.tag = m->tag;
536 m->tx.offset = tot;
537 m->fid = fid;
538 fid->ref++;
539 m->ref++;
540 sendomsg(m);
541 recvp(c->internal);
542 if(m->rx.type == Rerror){
543 // fprint(2, "write error: %s\n", m->rx.ename);
544 continue;
546 tot = n;
547 msgput(m);
548 msgput(m);
551 if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid);
552 close(c->fd);
553 closeioproc(io);
554 if(m){
555 msgput(m);
556 msgput(m);
558 if(fid->ref == 1){
559 m = msgnew();
560 m->internal = 1;
561 m->c = c;
562 m->tx.type = Tclunk;
563 m->tx.fid = fid->fid;
564 m->fid = fid;
565 fid->ref++;
566 m->ref++;
567 sendomsg(m);
568 recvp(c->internal);
569 msgput(m);
570 msgput(m);
572 fidput(fid);
573 c->fdfid = nil;
574 chanfree(c->internal);
575 c->internal = 0;
576 free(c);
579 int
580 xopenfd(Msg *m)
582 char errs[ERRMAX];
583 int n, p[2];
584 Conn *nc;
586 if(pipe(p) < 0){
587 rerrstr(errs, sizeof errs);
588 err(m, errs);
590 if(verbose) fprint(2, "xopen pipe %d %d...", p[0], p[1]);
592 /* now we're committed. */
594 /* a new connection for this fid */
595 nc = emalloc(sizeof(Conn));
596 nc->internal = chancreate(sizeof(void*), 0);
598 /* a ref for us */
599 nc->fdfid = m->fid;
600 m->fid->ref++;
601 nc->fdmode = m->tx.mode;
602 nc->fd = p[0];
604 /* a thread to tend the pipe */
605 threadcreate(openfdthread, nc, STACK);
607 /* if mode is ORDWR, that openfdthread will write; start a reader */
608 if((m->tx.mode&3) == ORDWR){
609 nc = emalloc(sizeof(Conn));
610 nc->internal = chancreate(sizeof(void*), 0);
611 nc->fdfid = m->fid;
612 m->fid->ref++;
613 nc->fdmode = OREAD;
614 nc->fd = dup(p[0], -1);
615 threadcreate(openfdthread, nc, STACK);
618 /* steal fid from other connection */
619 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
620 fidput(m->fid);
622 /* rewrite as Ropenfd */
623 m->rx.type = Ropenfd;
624 n = GBIT32(m->rpkt);
625 m->rpkt = erealloc(m->rpkt, n+4);
626 PBIT32(m->rpkt+n, p[1]);
627 n += 4;
628 PBIT32(m->rpkt, n);
629 m->rpkt[4] = Ropenfd;
630 m->rx.unixfd = p[1];
631 return 0;
634 void
635 connoutthread(void *arg)
637 int err;
638 Conn *c;
639 Msg *m, *om;
640 Ioproc *io;
642 c = arg;
643 io = ioproc();
644 while((m = recvq(c->outq)) != nil){
645 err = m->tx.type+1 != m->rx.type;
646 if(!err && m->isopenfd)
647 if(xopenfd(m) < 0)
648 continue;
649 switch(m->tx.type){
650 case Tflush:
651 om = m->oldm;
652 if(om)
653 if(delhash(om->c->tag, om->ctag, om) == 0)
654 msgput(om);
655 break;
656 case Tclunk:
657 case Tremove:
658 if(m->fid)
659 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
660 fidput(m->fid);
661 break;
662 case Tauth:
663 if(err && m->afid){
664 fprint(2, "auth error\n");
665 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
666 fidput(m->fid);
668 break;
669 case Tattach:
670 if(err && m->fid)
671 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
672 fidput(m->fid);
673 break;
674 case Twalk:
675 if(err && m->tx.fid != m->tx.newfid && m->newfid)
676 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
677 fidput(m->newfid);
678 break;
680 if(delhash(m->c->tag, m->ctag, m) == 0)
681 msgput(m);
682 if(verbose > 1) fprint(2, "fd#%d <- %F\n", c->fd, &m->rx);
683 rewritehdr(&m->rx, m->rpkt);
684 if(mwrite9p(io, c->fd, m->rpkt) < 0)
685 if(verbose) fprint(2, "write error: %r\n");
686 msgput(m);
687 if(c->inputstalled && c->nmsg < MAXMSG)
688 nbsendp(c->inc, 0);
690 closeioproc(io);
693 void
694 outputthread(void *arg)
696 Msg *m;
697 Ioproc *io;
699 USED(arg);
700 io = ioproc();
701 while((m = recvq(outq)) != nil){
702 if(verbose > 1) fprint(2, "* <- %F\n", &m->tx);
703 rewritehdr(&m->tx, m->tpkt);
704 if(mwrite9p(io, 1, m->tpkt) < 0)
705 sysfatal("output error: %r");
706 msgput(m);
708 closeioproc(io);
709 fprint(2, "output eof\n");
710 threadexitsall(0);
713 void
714 inputthread(void *arg)
716 uchar *pkt;
717 int n, nn, tag;
718 Msg *m;
719 Ioproc *io;
721 if(verbose) fprint(2, "input thread\n");
722 io = ioproc();
723 USED(arg);
724 while((pkt = read9ppkt(io, 0)) != nil){
725 n = GBIT32(pkt);
726 if(n < 7){
727 fprint(2, "short 9P packet from server\n");
728 free(pkt);
729 continue;
731 if(verbose > 2) fprint(2, "read %.*H\n", n, pkt);
732 tag = GBIT16(pkt+5);
733 if((m = msgget(tag)) == nil){
734 fprint(2, "unexpected 9P response tag %d\n", tag);
735 free(pkt);
736 continue;
738 if((nn = convM2S(pkt, n, &m->rx)) != n){
739 fprint(2, "bad packet - convM2S %d but %d\n", nn, n);
740 free(pkt);
741 msgput(m);
742 continue;
744 if(verbose > 1) fprint(2, "* -> %F\n", &m->rx);
745 m->rpkt = pkt;
746 m->rx.tag = m->ctag;
747 if(m->internal)
748 sendp(m->c->internal, 0);
749 else
750 sendq(m->c->outq, m);
752 closeioproc(io);
753 //fprint(2, "input eof\n");
754 threadexitsall(0);
757 void*
758 gethash(Hash **ht, uint n)
760 Hash *h;
762 for(h=ht[n%NHASH]; h; h=h->next)
763 if(h->n == n)
764 return h->v;
765 return nil;
768 int
769 delhash(Hash **ht, uint n, void *v)
771 Hash *h, **l;
773 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
774 if(h->n == n){
775 if(h->v != v){
776 if(verbose) fprint(2, "delhash %d got %p want %p\n", n, h->v, v);
777 return -1;
779 *l = h->next;
780 free(h);
781 return 0;
783 return -1;
786 int
787 puthash(Hash **ht, uint n, void *v)
789 Hash *h;
791 if(gethash(ht, n))
792 return -1;
793 h = emalloc(sizeof(Hash));
794 h->next = ht[n%NHASH];
795 h->n = n;
796 h->v = v;
797 ht[n%NHASH] = h;
798 return 0;
801 Fid **fidtab;
802 int nfidtab;
803 Fid *freefid;
805 Fid*
806 fidnew(int cfid)
808 Fid *f;
810 if(freefid == nil){
811 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
812 fidtab[nfidtab] = emalloc(sizeof(Fid));
813 freefid = fidtab[nfidtab];
814 freefid->fid = nfidtab++;
816 f = freefid;
817 freefid = f->next;
818 f->cfid = cfid;
819 f->ref = 1;
820 return f;
823 void
824 fidput(Fid *f)
826 if(f == nil)
827 return;
828 assert(f->ref > 0);
829 if(--f->ref > 0)
830 return;
831 f->next = freefid;
832 f->cfid = -1;
833 freefid = f;
836 Msg **msgtab;
837 int nmsgtab;
838 Msg *freemsg;
840 Msg*
841 msgnew(void)
843 Msg *m;
845 if(freemsg == nil){
846 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
847 msgtab[nmsgtab] = emalloc(sizeof(Msg));
848 freemsg = msgtab[nmsgtab];
849 freemsg->tag = nmsgtab++;
851 m = freemsg;
852 freemsg = m->next;
853 m->ref = 1;
854 return m;
857 void
858 msgput(Msg *m)
860 if(verbose > 2) fprint(2, "msgput tag %d/%d ref %d\n", m->tag, m->ctag, m->ref);
861 assert(m->ref > 0);
862 if(--m->ref > 0)
863 return;
864 m->c->nmsg--;
865 m->c = nil;
866 fidput(m->fid);
867 m->fid = nil;
868 fidput(m->afid);
869 m->afid = nil;
870 fidput(m->newfid);
871 m->newfid = nil;
872 free(m->tpkt);
873 m->tpkt = nil;
874 free(m->rpkt);
875 m->rpkt = nil;
876 if(m->rx.type == Ropenfd)
877 close(m->rx.unixfd);
878 m->rx.unixfd = -1;
879 m->isopenfd = 0;
880 m->internal = 0;
881 m->next = freemsg;
882 freemsg = m;
885 Msg*
886 msgget(int n)
888 Msg *m;
890 if(n < 0 || n >= nmsgtab)
891 return nil;
892 m = msgtab[n];
893 if(m->ref == 0)
894 return nil;
895 if(verbose) fprint(2, "msgget %d = %p\n", n, m);
896 m->ref++;
897 return m;
901 void*
902 emalloc(int n)
904 void *v;
906 v = mallocz(n, 1);
907 if(v == nil){
908 abort();
909 sysfatal("out of memory allocating %d", n);
911 return v;
914 void*
915 erealloc(void *v, int n)
917 v = realloc(v, n);
918 if(v == nil){
919 abort();
920 sysfatal("out of memory reallocating %d", n);
922 return v;
/* One element of a Queue. */
typedef struct Qel Qel;
struct Qel
{
	Qel *next;	/* following element, nil at the tail */
	void *p;	/* queued pointer */
};
932 struct Queue
934 int hungup;
935 QLock lk;
936 Rendez r;
937 Qel *head;
938 Qel *tail;
939 };
941 Queue*
942 qalloc(void)
944 Queue *q;
946 q = mallocz(sizeof(Queue), 1);
947 if(q == nil)
948 return nil;
949 q->r.l = &q->lk;
950 return q;
953 int
954 sendq(Queue *q, void *p)
956 Qel *e;
958 e = emalloc(sizeof(Qel));
959 qlock(&q->lk);
960 if(q->hungup){
961 werrstr("hungup queue");
962 qunlock(&q->lk);
963 return -1;
965 e->p = p;
966 e->next = nil;
967 if(q->head == nil)
968 q->head = e;
969 else
970 q->tail->next = e;
971 q->tail = e;
972 rwakeup(&q->r);
973 qunlock(&q->lk);
974 return 0;
977 void*
978 recvq(Queue *q)
980 void *p;
981 Qel *e;
983 qlock(&q->lk);
984 while(q->head == nil && !q->hungup)
985 rsleep(&q->r);
986 if(q->hungup){
987 qunlock(&q->lk);
988 return nil;
990 e = q->head;
991 q->head = e->next;
992 qunlock(&q->lk);
993 p = e->p;
994 free(e);
995 return p;
998 uchar*
999 read9ppkt(Ioproc *io, int fd)
1001 uchar buf[4], *pkt;
1002 int n, nn;
1004 n = ioreadn(io, fd, buf, 4);
1005 if(n != 4)
1006 return nil;
1007 n = GBIT32(buf);
1008 pkt = emalloc(n);
1009 PBIT32(pkt, n);
1010 nn = ioreadn(io, fd, pkt+4, n-4);
1011 if(nn != n-4){
1012 free(pkt);
1013 return nil;
1015 /* would do this if we ever got one of these, but we only generate them
1016 if(pkt[4] == Ropenfd){
1017 newfd = iorecvfd(io, fd);
1018 PBIT32(pkt+n-4, newfd);
1021 return pkt;
1024 Msg*
1025 mread9p(Ioproc *io, int fd)
1027 int n, nn;
1028 uchar *pkt;
1029 Msg *m;
1031 if((pkt = read9ppkt(io, fd)) == nil)
1032 return nil;
1034 m = msgnew();
1035 m->tpkt = pkt;
1036 n = GBIT32(pkt);
1037 nn = convM2S(pkt, n, &m->tx);
1038 if(nn != n){
1039 fprint(2, "read bad packet from %d\n", fd);
1040 return nil;
1042 return m;
1045 int
1046 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1048 int n, nfd;
1050 n = GBIT32(pkt);
1051 if(verbose > 2) fprint(2, "write %d %d %.*H\n", fd, n, n, pkt);
1052 if(iowrite(io, fd, pkt, n) != n){
1053 fprint(2, "write error: %r\n");
1054 return -1;
1056 if(pkt[4] == Ropenfd){
1057 nfd = GBIT32(pkt+n-4);
1058 if(iosendfd(io, fd, nfd) < 0){
1059 fprint(2, "send fd error: %r\n");
1060 return -1;
1063 return 0;
1066 void
1067 restring(uchar *pkt, int pn, char *s)
1069 int n;
1071 if(s < (char*)pkt || s >= (char*)pkt+pn)
1072 return;
1074 n = strlen(s);
1075 memmove(s+1, s, n);
1076 PBIT16((uchar*)s-1, n);
1079 void
1080 rewritehdr(Fcall *f, uchar *pkt)
1082 int i, n;
1084 n = GBIT32(pkt);
1085 PBIT16(pkt+5, f->tag);
1086 switch(f->type){
1087 case Tversion:
1088 case Rversion:
1089 restring(pkt, n, f->version);
1090 break;
1091 case Tauth:
1092 PBIT32(pkt+7, f->afid);
1093 restring(pkt, n, f->uname);
1094 restring(pkt, n, f->aname);
1095 break;
1096 case Tflush:
1097 PBIT16(pkt+7, f->oldtag);
1098 break;
1099 case Tattach:
1100 restring(pkt, n, f->uname);
1101 restring(pkt, n, f->aname);
1102 PBIT32(pkt+7, f->fid);
1103 PBIT32(pkt+11, f->afid);
1104 break;
1105 case Twalk:
1106 PBIT32(pkt+7, f->fid);
1107 PBIT32(pkt+11, f->newfid);
1108 for(i=0; i<f->nwname; i++)
1109 restring(pkt, n, f->wname[i]);
1110 break;
1111 case Tcreate:
1112 restring(pkt, n, f->name);
1113 /* fall through */
1114 case Topen:
1115 case Tread:
1116 case Twrite:
1117 case Tclunk:
1118 case Tremove:
1119 case Tstat:
1120 case Twstat:
1121 PBIT32(pkt+7, f->fid);
1122 break;
1123 case Rerror:
1124 restring(pkt, n, f->ename);
1125 break;
1129 #ifdef _LIB9_H_
1130 /* unix select-based polling */
1131 struct Ioproc
1133 Channel *c;
1134 Ioproc *next;
1135 int index;
1138 Ioproc*
1139 ioproc(void)
1141 return (Ioproc*)-1;
1144 void
1145 closeioproc(Ioproc *io)
1149 long
1150 ioread(Ioproc *io, int fd, void *v, long n)
1152 USED(io);
1154 return threadread(fd, v, n);
1157 long
1158 ioreadn(Ioproc *io, int fd, void *v, long n)
1160 long tot, m;
1161 uchar *u;
1163 u = v;
1164 for(tot=0; tot<n; tot+=m){
1165 m = ioread(io, fd, u+tot, n-tot);
1166 if(m <= 0){
1167 if(tot)
1168 break;
1169 return m;
1172 return tot;
1175 int
1176 iorecvfd(Ioproc *io, int fd)
1178 int r;
1180 threadfdnoblock(fd);
1181 while((r=recvfd(fd)) < 0){
1182 if(errno == EINTR)
1183 continue;
1184 if(errno == EWOULDBLOCK || errno == EAGAIN){
1185 threadfdwait(fd, 'r');
1186 continue;
1188 break;
1190 return r;
1193 int
1194 iosendfd(Ioproc *io, int s, int fd)
1196 int r;
1198 threadfdnoblock(s);
1199 while((r=sendfd(s, fd)) < 0){
1200 if(errno == EINTR)
1201 continue;
1202 if(errno == EWOULDBLOCK || errno == EAGAIN){
1203 threadfdwait(fd, 'w');
1204 continue;
1206 break;
1208 return r;
1211 static long
1212 _iowrite(Ioproc *io, int fd, void *v, long n)
1214 USED(io);
1215 return threadwrite(fd, v, n);
1218 long
1219 iowrite(Ioproc *io, int fd, void *v, long n)
1221 long tot, m;
1222 uchar *u;
1224 u = v;
1225 for(tot=0; tot<n; tot+=m){
1226 m = _iowrite(io, fd, u+tot, n-tot);
1227 if(m < 0){
1228 if(tot)
1229 break;
1230 return m;
1233 return tot;
1236 int
1237 iolisten(Ioproc *io, char *dir, char *ndir)
1239 int fd;
1240 int r;
1241 extern int _p9netfd(char*);
1242 USED(io);
1244 if((fd = _p9netfd(dir)) < 0)
1245 return -1;
1246 threadfdnoblock(fd);
1247 while((r=listen(dir, ndir)) < 0){
1248 if(errno == EINTR)
1249 continue;
1250 if(errno == EWOULDBLOCK || errno == EAGAIN){
1251 threadfdwait(fd, 'r');
1252 continue;
1254 break;
1256 return r;
1259 int
1260 ioaccept(Ioproc *io, int fd, char *dir)
1262 int r;
1263 USED(io);
1265 threadfdnoblock(fd);
1266 while((r=accept(fd, dir)) < 0){
1267 if(errno == EINTR)
1268 continue;
1269 if(errno == EWOULDBLOCK || errno == EAGAIN){
1270 threadfdwait(fd, 'r');
1271 continue;
1273 break;
1275 return r;
1278 #else
1279 /* real plan 9 io procs */
1280 static long
1281 _iolisten(va_list *arg)
1283 char *a, *b;
1285 a = va_arg(*arg, char*);
1286 b = va_arg(*arg, char*);
1287 return listen(a, b);
1290 int
1291 iolisten(Ioproc *io, char *a, char *b)
1293 return iocall(io, _iolisten, a, b);
/* iocall helper: unpack (int fd, char *dir) and call accept. */
static long
_ioaccept(va_list *arg)
{
	int fd;
	char *dir;

	fd = va_arg(*arg, int);
	dir = va_arg(*arg, char*);
	return accept(fd, dir);
}
1307 int
1308 ioaccept(Ioproc *io, int fd, char *dir)
1310 return iocall(io, _ioaccept, fd, dir);
1312 #endif