Blob
1 #include <u.h>2 #include <libc.h>3 #include <fcall.h>4 #include <thread.h>5 #include <errno.h>7 enum8 {9 STACK = 32768,10 NHASH = 31,11 MAXMSG = 64, /* per connection */12 };14 typedef struct Hash Hash;15 typedef struct Fid Fid;16 typedef struct Msg Msg;17 typedef struct Conn Conn;18 typedef struct Queue Queue;20 struct Hash21 {22 Hash *next;23 uint n;24 void *v;25 };27 struct Fid28 {29 int fid;30 int ref;31 int cfid;32 Fid *next;33 };35 struct Msg36 {37 Conn *c;38 int internal;39 int ref;40 int ctag;41 int tag;42 int isopenfd;43 Fcall tx;44 Fcall rx;45 Fid *fid;46 Fid *newfid;47 Fid *afid;48 Msg *oldm;49 Msg *next;50 uchar *tpkt;51 uchar *rpkt;52 };54 struct Conn55 {56 int fd;57 int fdmode;58 Fid *fdfid;59 int nmsg;60 int nfid;61 Channel *inc;62 Channel *internal;63 int inputstalled;64 char dir[40];65 Hash *tag[NHASH];66 Hash *fid[NHASH];67 Queue *outq;68 Queue *inq;69 };71 char *addr;72 int afd;73 char adir[40];74 int isunix;75 Queue *outq;76 Queue *inq;77 int verbose = 0;78 int msize = 8192;80 void *gethash(Hash**, uint);81 int puthash(Hash**, uint, void*);82 int delhash(Hash**, uint, void*);83 Msg *mread9p(Ioproc*, int);84 int mwrite9p(Ioproc*, int, uchar*);85 uchar *read9ppkt(Ioproc*, int);86 int write9ppkt(int, uchar*);87 Msg *msgnew(void);88 void msgput(Msg*);89 Msg *msgget(int);90 Fid *fidnew(int);91 void fidput(Fid*);92 void *emalloc(int);93 void *erealloc(void*, int);94 Queue *qalloc(void);95 int sendq(Queue*, void*);96 void *recvq(Queue*);97 void connthread(void*);98 void connoutthread(void*);99 void listenthread(void*);100 void outputthread(void*);101 void inputthread(void*);102 void rewritehdr(Fcall*, uchar*);103 int tlisten(char*, char*);104 int taccept(int, char*);105 int iolisten(Ioproc*, char*, char*);106 int ioaccept(Ioproc*, int, char*);107 int iorecvfd(Ioproc*, int);108 int iosendfd(Ioproc*, int, int);109 void mainproc(void*);110 int ignorepipe(void*, char*);112 void113 usage(void)114 {115 fprint(2, "usage: 9pserve [-s service] [-u] address\n");116 fprint(2, "\treads/writes 9P messages on stdin/stdout\n");117 exits("usage");118 }120 uchar vbuf[128];121 extern int _threaddebuglevel;122 void123 threadmain(int argc, char **argv)124 {125 char *file;127 ARGBEGIN{128 default:129 usage();130 case 'v':131 verbose++;132 break;133 case 's':134 close(0);135 if(open(file=EARGF(usage()), ORDWR) != 0)136 sysfatal("open %s: %r", file);137 dup(0, 1);138 break;139 case 'u':140 isunix = 1;141 break;142 }ARGEND144 if(verbose) fprint(2, "9pserve running\n");145 if(argc != 1)146 usage();147 addr = argv[0];149 if((afd = announce(addr, adir)) < 0)150 sysfatal("announce %s: %r", addr);152 if(verbose) fprint(2, "9pserve forking\n");153 switch(fork()){154 case -1:155 sysfatal("fork: %r");156 case 0:157 if(verbose) fprint(2, "running mainproc\n");158 mainproc(nil);159 if(verbose) fprint(2, "mainproc finished\n");160 _exits(0);161 default:162 if(verbose) fprint(2, "9pserve exiting\n");163 _exits(0);164 }165 }167 void168 mainproc(void *v)169 {170 int n, nn;171 Fcall f;172 USED(v);174 atnotify(ignorepipe, 1);175 fmtinstall('D', dirfmt);176 fmtinstall('M', dirmodefmt);177 fmtinstall('F', fcallfmt);178 fmtinstall('H', encodefmt);180 outq = qalloc();181 inq = qalloc();183 f.type = Tversion;184 f.version = "9P2000";185 f.msize = msize;186 f.tag = NOTAG;187 n = convS2M(&f, vbuf, sizeof vbuf);188 if(verbose > 1) fprint(2, "* <- %F\n", &f);189 nn = write(1, vbuf, n);190 if(n != nn)191 sysfatal("error writing Tversion: %r\n");192 n = threadread9pmsg(0, vbuf, sizeof vbuf);193 if(convM2S(vbuf, n, &f) != n)194 sysfatal("convM2S failure");195 if(f.msize < msize)196 msize = f.msize;197 if(verbose > 1) fprint(2, "* -> %F\n", &f);199 threadcreate(inputthread, nil, STACK);200 threadcreate(outputthread, nil, STACK);201 threadcreate(listenthread, nil, STACK);202 threadexits(0);203 }205 int206 ignorepipe(void *v, char *s)207 {208 USED(v);209 if(strcmp(s, "sys: write on closed pipe") == 0)210 return 1;211 fprint(2, "msg: %s\n", s);212 return 0;213 }215 void216 listenthread(void *arg)217 {218 Conn *c;219 Ioproc *io;221 io = ioproc();222 USED(arg);223 for(;;){224 c = emalloc(sizeof(Conn));225 c->fd = iolisten(io, adir, c->dir);226 if(c->fd < 0){227 if(verbose) fprint(2, "listen: %r\n");228 close(afd);229 free(c);230 return;231 }232 c->inc = chancreate(sizeof(void*), 0);233 c->internal = chancreate(sizeof(void*), 0);234 c->inq = qalloc();235 c->outq = qalloc();236 if(verbose) fprint(2, "incoming call on %s\n", c->dir);237 threadcreate(connthread, c, STACK);238 }239 }241 void242 send9pmsg(Msg *m)243 {244 int n, nn;246 n = sizeS2M(&m->rx);247 m->rpkt = emalloc(n);248 nn = convS2M(&m->rx, m->rpkt, n);249 if(nn != n)250 sysfatal("sizeS2M + convS2M disagree");251 sendq(m->c->outq, m);252 }254 void255 sendomsg(Msg *m)256 {257 int n, nn;259 n = sizeS2M(&m->tx);260 m->tpkt = emalloc(n);261 nn = convS2M(&m->tx, m->tpkt, n);262 if(nn != n)263 sysfatal("sizeS2M + convS2M disagree");264 sendq(outq, m);265 }267 void268 err(Msg *m, char *ename)269 {270 m->rx.type = Rerror;271 m->rx.ename = ename;272 m->rx.tag = m->tx.tag;273 send9pmsg(m);274 }276 void277 connthread(void *arg)278 {279 int i, fd;280 Conn *c;281 Hash *h, *hnext;282 Msg *m, *om, *mm;283 Fid *f;284 Ioproc *io;286 c = arg;287 io = ioproc();288 fd = ioaccept(io, c->fd, c->dir);289 if(fd < 0){290 if(verbose) fprint(2, "accept %s: %r\n", c->dir);291 goto out;292 }293 close(c->fd);294 c->fd = fd;295 threadcreate(connoutthread, c, STACK);296 while((m = mread9p(io, c->fd)) != nil){297 if(verbose > 1) fprint(2, "fd#%d -> %F\n", c->fd, &m->tx);298 m->c = c;299 m->ctag = m->tx.tag;300 c->nmsg++;301 if(puthash(c->tag, m->tx.tag, m) < 0){302 err(m, "duplicate tag");303 continue;304 }305 m->ref++;306 switch(m->tx.type){307 case Tversion:308 m->rx.tag = m->tx.tag;309 m->rx.msize = m->tx.msize;310 if(m->rx.msize > msize)311 m->rx.msize = msize;312 m->rx.version = "9P2000";313 m->rx.type = Rversion;314 send9pmsg(m);315 continue;316 case Tflush:317 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){318 m->rx.tag = m->tx.tag;319 m->rx.type = Rflush;320 send9pmsg(m);321 continue;322 }323 m->oldm->ref++;324 break;325 case Tattach:326 m->afid = nil;327 if(m->tx.afid != NOFID328 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){329 err(m, "unknown fid");330 continue;331 }332 m->fid = fidnew(m->tx.fid);333 if(puthash(c->fid, m->tx.fid, m->fid) < 0){334 err(m, "duplicate fid");335 continue;336 }337 m->fid->ref++;338 break;339 case Twalk:340 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){341 err(m, "unknown fid");342 continue;343 }344 m->fid->ref++;345 if(m->tx.newfid == m->tx.fid){346 m->fid->ref++;347 m->newfid = m->fid;348 }else{349 m->newfid = fidnew(m->tx.newfid);350 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){351 err(m, "duplicate fid");352 continue;353 }354 m->newfid->ref++;355 }356 break;357 case Tauth:358 m->afid = fidnew(m->tx.afid);359 if(puthash(c->fid, m->tx.afid, m->afid) < 0){360 err(m, "duplicate fid");361 continue;362 }363 m->afid->ref++;364 break;365 case Topenfd:366 if(m->tx.mode&~(OTRUNC|3)){367 err(m, "bad openfd mode");368 continue;369 }370 m->isopenfd = 1;371 m->tx.type = Topen;372 m->tpkt[4] = Topen;373 /* fall through */374 case Tcreate:375 case Topen:376 case Tclunk:377 case Tread:378 case Twrite:379 case Tremove:380 case Tstat:381 case Twstat:382 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){383 err(m, "unknown fid");384 continue;385 }386 m->fid->ref++;387 break;388 }390 /* have everything - translate and send */391 m->c = c;392 m->ctag = m->tx.tag;393 m->tx.tag = m->tag;394 if(m->fid)395 m->tx.fid = m->fid->fid;396 if(m->newfid)397 m->tx.newfid = m->newfid->fid;398 if(m->afid)399 m->tx.afid = m->afid->fid;400 if(m->oldm)401 m->tx.oldtag = m->oldm->tag;402 /* reference passes to outq */403 sendq(outq, m);404 while(c->nmsg >= MAXMSG){405 c->inputstalled = 1;406 recvp(c->inc);407 }408 }410 if(verbose) fprint(2, "fd#%d eof; flushing conn\n", c->fd);412 /* flush the output queue */413 sendq(c->outq, nil);414 while(c->outq != nil)415 yield();417 /* flush all outstanding messages */418 for(i=0; i<NHASH; i++){419 for(h=c->tag[i]; h; h=hnext){420 om = h->v;421 m = msgnew();422 m->internal = 1;423 m->c = c;424 c->nmsg++;425 m->tx.type = Tflush;426 m->tx.tag = m->tag;427 m->tx.oldtag = om->tag;428 m->oldm = om;429 om->ref++; /* for m->oldm */430 m->ref++; /* for outq */431 sendomsg(m);432 mm = recvp(c->internal);433 assert(mm == m);434 msgput(m); /* got from recvp */435 msgput(m); /* got from msgnew */436 msgput(om); /* got from hash table */437 hnext = h->next;438 free(h);439 }440 }442 /* clunk all outstanding fids */443 for(i=0; i<NHASH; i++){444 for(h=c->fid[i]; h; h=hnext){445 f = h->v;446 m = msgnew();447 m->internal = 1;448 m->c = c;449 c->nmsg++;450 m->tx.type = Tclunk;451 m->tx.tag = m->tag;452 m->tx.fid = f->fid;453 m->fid = f;454 f->ref++;455 m->ref++;456 sendomsg(m);457 mm = recvp(c->internal);458 assert(mm == m);459 msgput(m); /* got from recvp */460 msgput(m); /* got from msgnew */461 fidput(f); /* got from hash table */462 hnext = h->next;463 free(h);464 }465 }467 out:468 assert(c->nmsg == 0);469 assert(c->nfid == 0);470 close(c->fd);471 chanfree(c->internal);472 c->internal = 0;473 chanfree(c->inc);474 c->inc = 0;475 free(c->inq);476 c->inq = 0;477 free(c);478 }480 static void481 openfdthread(void *v)482 {483 Conn *c;484 Fid *fid;485 Msg *m;486 int n;487 vlong tot;488 Ioproc *io;489 char buf[1024];491 c = v;492 fid = c->fdfid;493 io = ioproc();495 tot = 0;496 m = nil;497 if(c->fdmode == OREAD){498 for(;;){499 if(verbose) fprint(2, "tread...");500 m = msgnew();501 m->internal = 1;502 m->c = c;503 m->tx.type = Tread;504 m->tx.count = msize - IOHDRSZ;505 m->tx.fid = fid->fid;506 m->tx.tag = m->tag;507 m->tx.offset = tot;508 m->fid = fid;509 fid->ref++;510 m->ref++;511 sendomsg(m);512 recvp(c->internal);513 if(m->rx.type == Rerror){514 // fprint(2, "read error: %s\n", m->rx.ename);515 break;516 }517 if(m->rx.count == 0)518 break;519 tot += m->rx.count;520 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){521 // fprint(2, "pipe write error: %r\n");522 break;523 }524 msgput(m);525 msgput(m);526 m = nil;527 }528 }else{529 for(;;){530 if(verbose) fprint(2, "twrite...");531 n = sizeof buf;532 if(n > msize)533 n = msize;534 if((n=ioread(io, c->fd, buf, n)) <= 0){535 if(n < 0)536 fprint(2, "pipe read error: %r\n");537 break;538 }539 m = msgnew();540 m->internal = 1;541 m->c = c;542 m->tx.type = Twrite;543 m->tx.fid = fid->fid;544 m->tx.data = buf;545 m->tx.count = n;546 m->tx.tag = m->tag;547 m->tx.offset = tot;548 m->fid = fid;549 fid->ref++;550 m->ref++;551 sendomsg(m);552 recvp(c->internal);553 if(m->rx.type == Rerror){554 // fprint(2, "write error: %s\n", m->rx.ename);555 }556 tot += n;557 msgput(m);558 msgput(m);559 m = nil;560 }561 }562 if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid);563 close(c->fd);564 closeioproc(io);565 if(m){566 msgput(m);567 msgput(m);568 }569 if(fid->ref == 1){570 m = msgnew();571 m->internal = 1;572 m->c = c;573 m->tx.type = Tclunk;574 m->tx.tag = m->tag;575 m->tx.fid = fid->fid;576 m->fid = fid;577 fid->ref++;578 m->ref++;579 sendomsg(m);580 recvp(c->internal);581 msgput(m);582 msgput(m);583 }584 fidput(fid);585 c->fdfid = nil;586 chanfree(c->internal);587 c->internal = 0;588 free(c);589 }591 int592 xopenfd(Msg *m)593 {594 char errs[ERRMAX];595 int n, p[2];596 Conn *nc;598 if(pipe(p) < 0){599 rerrstr(errs, sizeof errs);600 err(m, errs);601 }602 if(verbose) fprint(2, "xopen pipe %d %d...", p[0], p[1]);604 /* now we're committed. */606 /* a new connection for this fid */607 nc = emalloc(sizeof(Conn));608 nc->internal = chancreate(sizeof(void*), 0);610 /* a ref for us */611 nc->fdfid = m->fid;612 m->fid->ref++;613 nc->fdmode = m->tx.mode;614 nc->fd = p[0];616 /* a thread to tend the pipe */617 threadcreate(openfdthread, nc, STACK);619 /* if mode is ORDWR, that openfdthread will write; start a reader */620 if((m->tx.mode&3) == ORDWR){621 nc = emalloc(sizeof(Conn));622 nc->internal = chancreate(sizeof(void*), 0);623 nc->fdfid = m->fid;624 m->fid->ref++;625 nc->fdmode = OREAD;626 nc->fd = dup(p[0], -1);627 threadcreate(openfdthread, nc, STACK);628 }630 /* steal fid from other connection */631 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)632 fidput(m->fid);634 /* rewrite as Ropenfd */635 m->rx.type = Ropenfd;636 n = GBIT32(m->rpkt);637 m->rpkt = erealloc(m->rpkt, n+4);638 PBIT32(m->rpkt+n, p[1]);639 n += 4;640 PBIT32(m->rpkt, n);641 m->rpkt[4] = Ropenfd;642 m->rx.unixfd = p[1];643 return 0;644 }646 void647 connoutthread(void *arg)648 {649 int err;650 Conn *c;651 Queue *outq;652 Msg *m, *om;653 Ioproc *io;655 c = arg;656 outq = c->outq;657 io = ioproc();658 while((m = recvq(outq)) != nil){659 err = m->tx.type+1 != m->rx.type;660 if(!err && m->isopenfd)661 if(xopenfd(m) < 0)662 continue;663 switch(m->tx.type){664 case Tflush:665 om = m->oldm;666 if(om)667 if(delhash(om->c->tag, om->ctag, om) == 0)668 msgput(om);669 break;670 case Tclunk:671 case Tremove:672 if(m->fid)673 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)674 fidput(m->fid);675 break;676 case Tauth:677 if(err && m->afid){678 fprint(2, "auth error\n");679 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)680 fidput(m->fid);681 }682 break;683 case Tattach:684 if(err && m->fid)685 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)686 fidput(m->fid);687 break;688 case Twalk:689 if(err && m->tx.fid != m->tx.newfid && m->newfid)690 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)691 fidput(m->newfid);692 break;693 }694 if(delhash(m->c->tag, m->ctag, m) == 0)695 msgput(m);696 if(verbose > 1) fprint(2, "fd#%d <- %F\n", c->fd, &m->rx);697 rewritehdr(&m->rx, m->rpkt);698 if(mwrite9p(io, c->fd, m->rpkt) < 0)699 if(verbose) fprint(2, "write error: %r\n");700 msgput(m);701 if(c->inputstalled && c->nmsg < MAXMSG)702 nbsendp(c->inc, 0);703 }704 closeioproc(io);705 free(outq);706 c->outq = nil;707 }709 void710 outputthread(void *arg)711 {712 Msg *m;713 Ioproc *io;715 USED(arg);716 io = ioproc();717 while((m = recvq(outq)) != nil){718 if(verbose > 1) fprint(2, "* <- %F\n", &m->tx);719 rewritehdr(&m->tx, m->tpkt);720 if(mwrite9p(io, 1, m->tpkt) < 0)721 sysfatal("output error: %r");722 msgput(m);723 }724 closeioproc(io);725 fprint(2, "output eof\n");726 threadexitsall(0);727 }729 void730 inputthread(void *arg)731 {732 uchar *pkt;733 int n, nn, tag;734 Msg *m;735 Ioproc *io;737 if(verbose) fprint(2, "input thread\n");738 io = ioproc();739 USED(arg);740 while((pkt = read9ppkt(io, 0)) != nil){741 n = GBIT32(pkt);742 if(n < 7){743 fprint(2, "short 9P packet from server\n");744 free(pkt);745 continue;746 }747 if(verbose > 2) fprint(2, "read %.*H\n", n, pkt);748 tag = GBIT16(pkt+5);749 if((m = msgget(tag)) == nil){750 fprint(2, "unexpected 9P response tag %d\n", tag);751 free(pkt);752 continue;753 }754 if((nn = convM2S(pkt, n, &m->rx)) != n){755 fprint(2, "bad packet - convM2S %d but %d\n", nn, n);756 free(pkt);757 msgput(m);758 continue;759 }760 if(verbose > 1) fprint(2, "* -> %F%s\n", &m->rx,761 m->internal ? " (internal)" : "");762 m->rpkt = pkt;763 m->rx.tag = m->ctag;764 if(m->internal)765 sendp(m->c->internal, m);766 else if(m->c->outq)767 sendq(m->c->outq, m);768 else769 msgput(m);770 }771 closeioproc(io);772 //fprint(2, "input eof\n");773 threadexitsall(0);774 }776 void*777 gethash(Hash **ht, uint n)778 {779 Hash *h;781 for(h=ht[n%NHASH]; h; h=h->next)782 if(h->n == n)783 return h->v;784 return nil;785 }787 int788 delhash(Hash **ht, uint n, void *v)789 {790 Hash *h, **l;792 for(l=&ht[n%NHASH]; h=*l; l=&h->next)793 if(h->n == n){794 if(h->v != v){795 if(verbose) fprint(2, "delhash %d got %p want %p\n", n, h->v, v);796 return -1;797 }798 *l = h->next;799 free(h);800 return 0;801 }802 return -1;803 }805 int806 puthash(Hash **ht, uint n, void *v)807 {808 Hash *h;810 if(gethash(ht, n))811 return -1;812 h = emalloc(sizeof(Hash));813 h->next = ht[n%NHASH];814 h->n = n;815 h->v = v;816 ht[n%NHASH] = h;817 return 0;818 }820 Fid **fidtab;821 int nfidtab;822 Fid *freefid;824 Fid*825 fidnew(int cfid)826 {827 Fid *f;829 if(freefid == nil){830 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));831 fidtab[nfidtab] = emalloc(sizeof(Fid));832 freefid = fidtab[nfidtab];833 freefid->fid = nfidtab++;834 }835 f = freefid;836 freefid = f->next;837 f->cfid = cfid;838 f->ref = 1;839 return f;840 }842 void843 fidput(Fid *f)844 {845 if(f == nil)846 return;847 assert(f->ref > 0);848 if(--f->ref > 0)849 return;850 f->next = freefid;851 f->cfid = -1;852 freefid = f;853 }855 Msg **msgtab;856 int nmsgtab;857 Msg *freemsg;859 Msg*860 msgnew(void)861 {862 Msg *m;864 if(freemsg == nil){865 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));866 msgtab[nmsgtab] = emalloc(sizeof(Msg));867 freemsg = msgtab[nmsgtab];868 freemsg->tag = nmsgtab++;869 }870 m = freemsg;871 freemsg = m->next;872 m->ref = 1;873 return m;874 }876 void877 msgput(Msg *m)878 {879 if(m == nil)880 return;882 if(verbose > 2) fprint(2, "msgput tag %d/%d ref %d\n", m->tag, m->ctag, m->ref);883 assert(m->ref > 0);884 if(--m->ref > 0)885 return;886 m->c->nmsg--;887 m->c = nil;888 msgput(m->oldm);889 m->oldm = nil;890 fidput(m->fid);891 m->fid = nil;892 fidput(m->afid);893 m->afid = nil;894 fidput(m->newfid);895 m->newfid = nil;896 free(m->tpkt);897 m->tpkt = nil;898 free(m->rpkt);899 m->rpkt = nil;900 if(m->rx.type == Ropenfd)901 close(m->rx.unixfd);902 m->rx.unixfd = -1;903 m->isopenfd = 0;904 m->internal = 0;905 m->next = freemsg;906 freemsg = m;907 }909 Msg*910 msgget(int n)911 {912 Msg *m;914 if(n < 0 || n >= nmsgtab)915 return nil;916 m = msgtab[n];917 if(m->ref == 0)918 return nil;919 if(verbose) fprint(2, "msgget %d = %p\n", n, m);920 m->ref++;921 return m;922 }925 void*926 emalloc(int n)927 {928 void *v;930 v = mallocz(n, 1);931 if(v == nil){932 abort();933 sysfatal("out of memory allocating %d", n);934 }935 return v;936 }938 void*939 erealloc(void *v, int n)940 {941 v = realloc(v, n);942 if(v == nil){943 abort();944 sysfatal("out of memory reallocating %d", n);945 }946 return v;947 }949 typedef struct Qel Qel;950 struct Qel951 {952 Qel *next;953 void *p;954 };956 struct Queue957 {958 int hungup;959 QLock lk;960 Rendez r;961 Qel *head;962 Qel *tail;963 };965 Queue*966 qalloc(void)967 {968 Queue *q;970 q = mallocz(sizeof(Queue), 1);971 if(q == nil)972 return nil;973 q->r.l = &q->lk;974 return q;975 }977 int978 sendq(Queue *q, void *p)979 {980 Qel *e;982 e = emalloc(sizeof(Qel));983 qlock(&q->lk);984 if(q->hungup){985 werrstr("hungup queue");986 qunlock(&q->lk);987 return -1;988 }989 e->p = p;990 e->next = nil;991 if(q->head == nil)992 q->head = e;993 else994 q->tail->next = e;995 q->tail = e;996 rwakeup(&q->r);997 qunlock(&q->lk);998 return 0;999 }1001 void*1002 recvq(Queue *q)1003 {1004 void *p;1005 Qel *e;1007 qlock(&q->lk);1008 while(q->head == nil && !q->hungup)1009 rsleep(&q->r);1010 if(q->hungup){1011 qunlock(&q->lk);1012 return nil;1013 }1014 e = q->head;1015 q->head = e->next;1016 qunlock(&q->lk);1017 p = e->p;1018 free(e);1019 return p;1020 }1022 uchar*1023 read9ppkt(Ioproc *io, int fd)1024 {1025 uchar buf[4], *pkt;1026 int n, nn;1028 n = ioreadn(io, fd, buf, 4);1029 if(n != 4)1030 return nil;1031 n = GBIT32(buf);1032 pkt = emalloc(n);1033 PBIT32(pkt, n);1034 nn = ioreadn(io, fd, pkt+4, n-4);1035 if(nn != n-4){1036 free(pkt);1037 return nil;1038 }1039 /* would do this if we ever got one of these, but we only generate them1040 if(pkt[4] == Ropenfd){1041 newfd = iorecvfd(io, fd);1042 PBIT32(pkt+n-4, newfd);1043 }1044 */1045 return pkt;1046 }1048 Msg*1049 mread9p(Ioproc *io, int fd)1050 {1051 int n, nn;1052 uchar *pkt;1053 Msg *m;1055 if((pkt = read9ppkt(io, fd)) == nil)1056 return nil;1058 m = msgnew();1059 m->tpkt = pkt;1060 n = GBIT32(pkt);1061 nn = convM2S(pkt, n, &m->tx);1062 if(nn != n){1063 fprint(2, "read bad packet from %d\n", fd);1064 return nil;1065 }1066 return m;1067 }1069 int1070 mwrite9p(Ioproc *io, int fd, uchar *pkt)1071 {1072 int n, nfd;1074 n = GBIT32(pkt);1075 if(verbose > 2) fprint(2, "write %d %d %.*H\n", fd, n, n, pkt);1076 if(iowrite(io, fd, pkt, n) != n){1077 fprint(2, "write error: %r\n");1078 return -1;1079 }1080 if(pkt[4] == Ropenfd){1081 nfd = GBIT32(pkt+n-4);1082 if(iosendfd(io, fd, nfd) < 0){1083 fprint(2, "send fd error: %r\n");1084 return -1;1085 }1086 }1087 return 0;1088 }1090 void1091 restring(uchar *pkt, int pn, char *s)1092 {1093 int n;1095 if(s < (char*)pkt || s >= (char*)pkt+pn)1096 return;1098 n = strlen(s);1099 memmove(s+1, s, n);1100 PBIT16((uchar*)s-1, n);1101 }1103 void1104 rewritehdr(Fcall *f, uchar *pkt)1105 {1106 int i, n;1108 n = GBIT32(pkt);1109 PBIT16(pkt+5, f->tag);1110 switch(f->type){1111 case Tversion:1112 case Rversion:1113 restring(pkt, n, f->version);1114 break;1115 case Tauth:1116 PBIT32(pkt+7, f->afid);1117 restring(pkt, n, f->uname);1118 restring(pkt, n, f->aname);1119 break;1120 case Tflush:1121 PBIT16(pkt+7, f->oldtag);1122 break;1123 case Tattach:1124 restring(pkt, n, f->uname);1125 restring(pkt, n, f->aname);1126 PBIT32(pkt+7, f->fid);1127 PBIT32(pkt+11, f->afid);1128 break;1129 case Twalk:1130 PBIT32(pkt+7, f->fid);1131 PBIT32(pkt+11, f->newfid);1132 for(i=0; i<f->nwname; i++)1133 restring(pkt, n, f->wname[i]);1134 break;1135 case Tcreate:1136 restring(pkt, n, f->name);1137 /* fall through */1138 case Topen:1139 case Tread:1140 case Twrite:1141 case Tclunk:1142 case Tremove:1143 case Tstat:1144 case Twstat:1145 PBIT32(pkt+7, f->fid);1146 break;1147 case Rerror:1148 restring(pkt, n, f->ename);1149 break;1150 }1151 }1153 #ifdef _LIBC_H_1154 /* unix select-based polling */1155 struct Ioproc1156 {1157 Channel *c;1158 Ioproc *next;1159 int index;1160 };1162 Ioproc*1163 ioproc(void)1164 {1165 return (Ioproc*)-1;1166 }1168 void1169 closeioproc(Ioproc *io)1170 {1171 }1173 long1174 ioread(Ioproc *io, int fd, void *v, long n)1175 {1176 USED(io);1178 return threadread(fd, v, n);1179 }1181 long1182 ioreadn(Ioproc *io, int fd, void *v, long n)1183 {1184 long tot, m;1185 uchar *u;1187 u = v;1188 for(tot=0; tot<n; tot+=m){1189 m = ioread(io, fd, u+tot, n-tot);1190 if(m <= 0){1191 if(tot)1192 break;1193 return m;1194 }1195 }1196 return tot;1197 }1199 int1200 iorecvfd(Ioproc *io, int fd)1201 {1202 int r;1204 threadfdnoblock(fd);1205 while((r=recvfd(fd)) < 0){1206 if(errno == EINTR)1207 continue;1208 if(errno == EWOULDBLOCK || errno == EAGAIN){1209 threadfdwait(fd, 'r');1210 continue;1211 }1212 break;1213 }1214 return r;1215 }1217 int1218 iosendfd(Ioproc *io, int s, int fd)1219 {1220 int r;1222 threadfdnoblock(s);1223 while((r=sendfd(s, fd)) < 0){1224 if(errno == EINTR)1225 continue;1226 if(errno == EWOULDBLOCK || errno == EAGAIN){1227 threadfdwait(fd, 'w');1228 continue;1229 }1230 break;1231 }1232 return r;1233 }1235 static long1236 _iowrite(Ioproc *io, int fd, void *v, long n)1237 {1238 USED(io);1239 return threadwrite(fd, v, n);1240 }1242 long1243 iowrite(Ioproc *io, int fd, void *v, long n)1244 {1245 long tot, m;1246 uchar *u;1248 u = v;1249 for(tot=0; tot<n; tot+=m){1250 m = _iowrite(io, fd, u+tot, n-tot);1251 if(m < 0){1252 if(tot)1253 break;1254 return m;1255 }1256 }1257 return tot;1258 }1260 int1261 iolisten(Ioproc *io, char *dir, char *ndir)1262 {1263 int fd;1264 int r;1265 extern int _p9netfd(char*);1266 USED(io);1268 if((fd = _p9netfd(dir)) < 0)1269 return -1;1270 threadfdnoblock(fd);1271 while((r=listen(dir, ndir)) < 0){1272 if(errno == EINTR)1273 continue;1274 if(errno == EWOULDBLOCK || errno == EAGAIN){1275 threadfdwait(fd, 'r');1276 continue;1277 }1278 break;1279 }1280 return r;1281 }1283 int1284 ioaccept(Ioproc *io, int fd, char *dir)1285 {1286 int r;1287 USED(io);1289 threadfdnoblock(fd);1290 while((r=accept(fd, dir)) < 0){1291 if(errno == EINTR)1292 continue;1293 if(errno == EWOULDBLOCK || errno == EAGAIN){1294 threadfdwait(fd, 'r');1295 continue;1296 }1297 break;1298 }1299 return r;1300 }1302 #else1303 /* real plan 9 io procs */1304 static long1305 _iolisten(va_list *arg)1306 {1307 char *a, *b;1309 a = va_arg(*arg, char*);1310 b = va_arg(*arg, char*);1311 return listen(a, b);1312 }1314 int1315 iolisten(Ioproc *io, char *a, char *b)1316 {1317 return iocall(io, _iolisten, a, b);1318 }1320 static long1321 _ioaccept(va_list *arg)1322 {1323 int fd;1324 char *dir;1326 fd = va_arg(*arg, int);1327 dir = va_arg(*arg, char*);1328 return accept(fd, dir);1329 }1331 int1332 ioaccept(Ioproc *io, int fd, char *dir)1333 {1334 return iocall(io, _ioaccept, fd, dir);1335 }1336 #endif