/* 9pserve: relay 9P messages between stdin/stdout and clients of an announced address */
1 #include <u.h>2 #include <libc.h>3 #include <fcall.h>4 #include <thread.h>5 #include <poll.h>6 #include <errno.h>8 enum9 {10 STACK = 32768,11 NHASH = 31,12 MAXMSG = 64, /* per connection */13 };15 typedef struct Hash Hash;16 typedef struct Fid Fid;17 typedef struct Msg Msg;18 typedef struct Conn Conn;19 typedef struct Queue Queue;21 struct Hash22 {23 Hash *next;24 uint n;25 void *v;26 };28 struct Fid29 {30 int fid;31 int ref;32 int cfid;33 Fid *next;34 };36 struct Msg37 {38 Conn *c;39 int internal;40 int ref;41 int ctag;42 int tag;43 int isopenfd;44 Fcall tx;45 Fcall rx;46 Fid *fid;47 Fid *newfid;48 Fid *afid;49 Msg *oldm;50 Msg *next;51 uchar *tpkt;52 uchar *rpkt;53 };55 struct Conn56 {57 int fd;58 int fdmode;59 Fid *fdfid;60 int nmsg;61 int nfid;62 Channel *inc;63 Channel *internal;64 int inputstalled;65 char dir[40];66 Hash *tag[NHASH];67 Hash *fid[NHASH];68 Queue *outq;69 Queue *inq;70 };72 char *addr;73 int afd;74 char adir[40];75 int isunix;76 Queue *outq;77 Queue *inq;78 int verbose;79 int msize = 8192;81 void *gethash(Hash**, uint);82 int puthash(Hash**, uint, void*);83 int delhash(Hash**, uint, void*);84 Msg *mread9p(Ioproc*, int);85 int mwrite9p(Ioproc*, int, uchar*);86 uchar *read9ppkt(Ioproc*, int);87 int write9ppkt(int, uchar*);88 Msg *msgnew(void);89 void msgput(Msg*);90 Msg *msgget(int);91 Fid *fidnew(int);92 void fidput(Fid*);93 void *emalloc(int);94 void *erealloc(void*, int);95 Queue *qalloc(void);96 int sendq(Queue*, void*);97 void *recvq(Queue*);98 void connthread(void*);99 void connoutthread(void*);100 void listenthread(void*);101 void outputthread(void*);102 void inputthread(void*);103 void rewritehdr(Fcall*, uchar*);104 int tlisten(char*, char*);105 int taccept(int, char*);106 int iolisten(Ioproc*, char*, char*);107 int ioaccept(Ioproc*, int, char*);108 int iorecvfd(Ioproc*, int);109 int iosendfd(Ioproc*, int, int);110 void mainproc(void*);111 int ignorepipe(void*, char*);113 void114 usage(void)115 {116 fprint(2, "usage: 9pserve [-s 
service] [-u] address\n");117 fprint(2, "\treads/writes 9P messages on stdin/stdout\n");118 exits("usage");119 }121 uchar vbuf[128];122 extern int _threaddebuglevel;123 void124 threadmain(int argc, char **argv)125 {126 char *file;128 ARGBEGIN{129 default:130 usage();131 case 'v':132 verbose++;133 break;134 case 's':135 close(0);136 if(open(file=EARGF(usage()), ORDWR) != 0)137 sysfatal("open %s: %r", file);138 dup(0, 1);139 break;140 case 'u':141 isunix = 1;142 break;143 }ARGEND145 if(verbose) fprint(2, "9pserve running\n");146 if(argc != 1)147 usage();148 addr = argv[0];150 if((afd = announce(addr, adir)) < 0)151 sysfatal("announce %s: %r", addr);153 if(verbose) fprint(2, "9pserve forking\n");154 switch(fork()){155 case -1:156 sysfatal("fork: %r");157 case 0:158 if(verbose) fprint(2, "running mainproc\n");159 mainproc(nil);160 if(verbose) fprint(2, "mainproc finished\n");161 _exits(0);162 default:163 if(verbose) fprint(2, "9pserve exiting\n");164 _exits(0);165 }166 }168 void169 mainproc(void *v)170 {171 int n;172 Fcall f;173 USED(v);175 atnotify(ignorepipe, 1);176 fmtinstall('D', dirfmt);177 fmtinstall('M', dirmodefmt);178 fmtinstall('F', fcallfmt);179 fmtinstall('H', encodefmt);181 outq = qalloc();182 inq = qalloc();184 f.type = Tversion;185 f.version = "9P2000";186 f.msize = msize;187 f.tag = NOTAG;188 n = convS2M(&f, vbuf, sizeof vbuf);189 if(verbose > 1) fprint(2, "* <- %F\n", &f);190 write(1, vbuf, n);191 n = read9pmsg(0, vbuf, sizeof vbuf);192 if(convM2S(vbuf, n, &f) != n)193 sysfatal("convM2S failure");194 if(f.msize < msize)195 msize = f.msize;196 if(verbose > 1) fprint(2, "* -> %F\n", &f);198 threadcreate(inputthread, nil, STACK);199 threadcreate(outputthread, nil, STACK);200 threadcreate(listenthread, nil, STACK);201 threadexits(0);202 }204 int205 ignorepipe(void *v, char *s)206 {207 USED(v);208 if(strcmp(s, "sys: write on closed pipe") == 0)209 return 1;210 fprint(2, "msg: %s\n", s);211 return 0;212 }214 void215 listenthread(void *arg)216 {217 Conn 
*c;218 Ioproc *io;220 io = ioproc();221 USED(arg);222 for(;;){223 c = emalloc(sizeof(Conn));224 c->fd = iolisten(io, adir, c->dir);225 if(c->fd < 0){226 if(verbose) fprint(2, "listen: %r\n");227 close(afd);228 free(c);229 return;230 }231 c->inc = chancreate(sizeof(void*), 0);232 c->internal = chancreate(sizeof(void*), 0);233 c->inq = qalloc();234 c->outq = qalloc();235 if(verbose) fprint(2, "incoming call on %s\n", c->dir);236 threadcreate(connthread, c, STACK);237 }238 }240 void241 send9pmsg(Msg *m)242 {243 int n, nn;245 n = sizeS2M(&m->rx);246 m->rpkt = emalloc(n);247 nn = convS2M(&m->rx, m->rpkt, n);248 if(nn != n)249 sysfatal("sizeS2M + convS2M disagree");250 sendq(m->c->outq, m);251 }253 void254 sendomsg(Msg *m)255 {256 int n, nn;258 n = sizeS2M(&m->tx);259 m->tpkt = emalloc(n);260 nn = convS2M(&m->tx, m->tpkt, n);261 if(nn != n)262 sysfatal("sizeS2M + convS2M disagree");263 sendq(outq, m);264 }266 void267 err(Msg *m, char *ename)268 {269 m->rx.type = Rerror;270 m->rx.ename = ename;271 m->rx.tag = m->tx.tag;272 send9pmsg(m);273 }275 void276 connthread(void *arg)277 {278 int i, fd;279 Conn *c;280 Hash *h;281 Msg *m, *om;282 Fid *f;283 Ioproc *io;285 c = arg;286 io = ioproc();287 fd = ioaccept(io, c->fd, c->dir);288 if(fd < 0){289 if(verbose) fprint(2, "accept %s: %r\n", c->dir);290 goto out;291 }292 close(c->fd);293 c->fd = fd;294 threadcreate(connoutthread, c, STACK);295 while((m = mread9p(io, c->fd)) != nil){296 if(verbose > 1) fprint(2, "fd#%d -> %F\n", c->fd, &m->tx);297 m->c = c;298 m->ctag = m->tx.tag;299 c->nmsg++;300 if(puthash(c->tag, m->tx.tag, m) < 0){301 err(m, "duplicate tag");302 continue;303 }304 m->ref++;305 switch(m->tx.type){306 case Tversion:307 m->rx.tag = m->tx.tag;308 m->rx.msize = m->tx.msize;309 if(m->rx.msize > msize)310 m->rx.msize = msize;311 m->rx.version = "9P2000";312 m->rx.type = Rversion;313 send9pmsg(m);314 continue;315 case Tflush:316 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){317 m->rx.tag = m->tx.tag;318 m->rx.type 
= Rflush;319 send9pmsg(m);320 continue;321 }322 m->oldm->ref++;323 break;324 case Tattach:325 m->afid = nil;326 if(m->tx.afid != NOFID327 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){328 err(m, "unknown fid");329 continue;330 }331 m->fid = fidnew(m->tx.fid);332 if(puthash(c->fid, m->tx.fid, m->fid) < 0){333 err(m, "duplicate fid");334 continue;335 }336 m->fid->ref++;337 break;338 case Twalk:339 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){340 err(m, "unknown fid");341 continue;342 }343 m->fid->ref++;344 if(m->tx.newfid == m->tx.fid){345 m->fid->ref++;346 m->newfid = m->fid;347 }else{348 m->newfid = fidnew(m->tx.newfid);349 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){350 err(m, "duplicate fid");351 continue;352 }353 m->newfid->ref++;354 }355 break;356 case Tauth:357 m->afid = fidnew(m->tx.afid);358 if(puthash(c->fid, m->tx.afid, m->afid) < 0){359 err(m, "duplicate fid");360 continue;361 }362 m->afid->ref++;363 break;364 case Topenfd:365 if(m->tx.mode&~(OTRUNC|3)){366 err(m, "bad openfd mode");367 continue;368 }369 m->isopenfd = 1;370 m->tx.type = Topen;371 m->tpkt[4] = Topen;372 /* fall through */373 case Tcreate:374 case Topen:375 case Tclunk:376 case Tread:377 case Twrite:378 case Tremove:379 case Tstat:380 case Twstat:381 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){382 err(m, "unknown fid");383 continue;384 }385 m->fid->ref++;386 break;387 }389 /* have everything - translate and send */390 m->c = c;391 m->ctag = m->tx.tag;392 m->tx.tag = m->tag;393 if(m->fid)394 m->tx.fid = m->fid->fid;395 if(m->newfid)396 m->tx.newfid = m->newfid->fid;397 if(m->afid)398 m->tx.afid = m->afid->fid;399 if(m->oldm)400 m->tx.oldtag = m->oldm->tag;401 /* reference passes to outq */402 sendq(outq, m);403 while(c->nmsg >= MAXMSG){404 c->inputstalled = 1;405 recvp(c->inc);406 }407 }409 if(verbose) fprint(2, "%s eof\n", c->dir);411 /* flush all outstanding messages */412 for(i=0; i<NHASH; i++){413 for(h=c->tag[i]; h; h=h->next){414 om = h->v;415 m = msgnew();416 
m->internal = 1;417 m->c = c;418 c->nmsg++;419 m->tx.type = Tflush;420 m->tx.tag = m->tag;421 m->tx.oldtag = om->tag;422 m->oldm = om;423 om->ref++;424 m->ref++; /* for outq */425 sendomsg(m);426 recvp(c->internal);427 msgput(m); /* got from recvp */428 msgput(m); /* got from msgnew */429 msgput(om); /* got from hash table */430 }431 }433 /* clunk all outstanding fids */434 for(i=0; i<NHASH; i++){435 for(h=c->fid[i]; h; h=h->next){436 f = h->v;437 m = msgnew();438 m->internal = 1;439 m->c = c;440 c->nmsg++;441 m->tx.type = Tclunk;442 m->tx.tag = m->tag;443 m->tx.fid = f->fid;444 m->fid = f;445 f->ref++;446 m->ref++;447 sendomsg(m);448 recvp(c->internal);449 msgput(m); /* got from recvp */450 msgput(m); /* got from msgnew */451 fidput(f); /* got from hash table */452 }453 }455 out:456 assert(c->nmsg == 0);457 assert(c->nfid == 0);458 close(c->fd);459 chanfree(c->internal);460 c->internal = 0;461 chanfree(c->inc);462 c->inc = 0;463 free(c->inq);464 c->inq = 0;465 free(c->outq);466 c->outq = 0;467 free(c);468 }470 static void471 openfdthread(void *v)472 {473 Conn *c;474 Fid *fid;475 Msg *m;476 int n;477 vlong tot;478 Ioproc *io;479 char buf[1024];481 c = v;482 fid = c->fdfid;483 io = ioproc();485 tot = 0;486 if(c->fdmode == OREAD){487 for(;;){488 if(verbose) fprint(2, "tread...");489 m = msgnew();490 m->internal = 1;491 m->c = c;492 m->tx.type = Tread;493 m->tx.count = msize - IOHDRSZ;494 m->tx.fid = fid->fid;495 m->tx.tag = m->tag;496 m->tx.offset = tot;497 m->fid = fid;498 fid->ref++;499 m->ref++;500 sendomsg(m);501 recvp(c->internal);502 if(m->rx.type == Rerror){503 // fprint(2, "read error: %s\n", m->rx.ename);504 break;505 }506 if(m->rx.count == 0)507 break;508 tot += m->rx.count;509 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){510 fprint(2, "pipe write error: %r\n");511 break;512 }513 msgput(m);514 msgput(m);515 }516 }else{517 for(;;){518 if(verbose) fprint(2, "twrite...");519 n = sizeof buf;520 if(n > msize)521 n = msize;522 if((n=ioread(io, 
c->fd, buf, n)) <= 0){523 if(n < 0)524 fprint(2, "pipe read error: %r\n");525 m = nil;526 break;527 }528 m = msgnew();529 m->internal = 1;530 m->c = c;531 m->tx.type = Twrite;532 m->tx.fid = fid->fid;533 m->tx.data = buf;534 m->tx.count = n;535 m->tx.tag = m->tag;536 m->tx.offset = tot;537 m->fid = fid;538 fid->ref++;539 m->ref++;540 sendomsg(m);541 recvp(c->internal);542 if(m->rx.type == Rerror){543 // fprint(2, "write error: %s\n", m->rx.ename);544 continue;545 }546 tot = n;547 msgput(m);548 msgput(m);549 }550 }551 if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid);552 close(c->fd);553 closeioproc(io);554 if(m){555 msgput(m);556 msgput(m);557 }558 if(fid->ref == 1){559 m = msgnew();560 m->internal = 1;561 m->c = c;562 m->tx.type = Tclunk;563 m->tx.fid = fid->fid;564 m->fid = fid;565 fid->ref++;566 m->ref++;567 sendomsg(m);568 recvp(c->internal);569 msgput(m);570 msgput(m);571 }572 fidput(fid);573 c->fdfid = nil;574 chanfree(c->internal);575 c->internal = 0;576 free(c);577 }579 int580 xopenfd(Msg *m)581 {582 char errs[ERRMAX];583 int n, p[2];584 Conn *nc;586 if(pipe(p) < 0){587 rerrstr(errs, sizeof errs);588 err(m, errs);589 }590 if(verbose) fprint(2, "xopen pipe %d %d...", p[0], p[1]);592 /* now we're committed. 
*/594 /* a new connection for this fid */595 nc = emalloc(sizeof(Conn));596 nc->internal = chancreate(sizeof(void*), 0);598 /* a ref for us */599 nc->fdfid = m->fid;600 m->fid->ref++;601 nc->fdmode = m->tx.mode;602 nc->fd = p[0];604 /* a thread to tend the pipe */605 threadcreate(openfdthread, nc, STACK);607 /* if mode is ORDWR, that openfdthread will write; start a reader */608 if((m->tx.mode&3) == ORDWR){609 nc = emalloc(sizeof(Conn));610 nc->internal = chancreate(sizeof(void*), 0);611 nc->fdfid = m->fid;612 m->fid->ref++;613 nc->fdmode = OREAD;614 nc->fd = dup(p[0], -1);615 threadcreate(openfdthread, nc, STACK);616 }618 /* steal fid from other connection */619 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)620 fidput(m->fid);622 /* rewrite as Ropenfd */623 m->rx.type = Ropenfd;624 n = GBIT32(m->rpkt);625 m->rpkt = erealloc(m->rpkt, n+4);626 PBIT32(m->rpkt+n, p[1]);627 n += 4;628 PBIT32(m->rpkt, n);629 m->rpkt[4] = Ropenfd;630 m->rx.unixfd = p[1];631 return 0;632 }634 void635 connoutthread(void *arg)636 {637 int err;638 Conn *c;639 Msg *m, *om;640 Ioproc *io;642 c = arg;643 io = ioproc();644 while((m = recvq(c->outq)) != nil){645 err = m->tx.type+1 != m->rx.type;646 if(!err && m->isopenfd)647 if(xopenfd(m) < 0)648 continue;649 switch(m->tx.type){650 case Tflush:651 om = m->oldm;652 if(om)653 if(delhash(om->c->tag, om->ctag, om) == 0)654 msgput(om);655 break;656 case Tclunk:657 case Tremove:658 if(m->fid)659 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)660 fidput(m->fid);661 break;662 case Tauth:663 if(err && m->afid){664 fprint(2, "auth error\n");665 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)666 fidput(m->fid);667 }668 break;669 case Tattach:670 if(err && m->fid)671 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)672 fidput(m->fid);673 break;674 case Twalk:675 if(err && m->tx.fid != m->tx.newfid && m->newfid)676 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)677 fidput(m->newfid);678 break;679 }680 if(delhash(m->c->tag, m->ctag, m) == 
0)681 msgput(m);682 if(verbose > 1) fprint(2, "fd#%d <- %F\n", c->fd, &m->rx);683 rewritehdr(&m->rx, m->rpkt);684 if(mwrite9p(io, c->fd, m->rpkt) < 0)685 if(verbose) fprint(2, "write error: %r\n");686 msgput(m);687 if(c->inputstalled && c->nmsg < MAXMSG)688 nbsendp(c->inc, 0);689 }690 closeioproc(io);691 }693 void694 outputthread(void *arg)695 {696 Msg *m;697 Ioproc *io;699 USED(arg);700 io = ioproc();701 while((m = recvq(outq)) != nil){702 if(verbose > 1) fprint(2, "* <- %F\n", &m->tx);703 rewritehdr(&m->tx, m->tpkt);704 if(mwrite9p(io, 1, m->tpkt) < 0)705 sysfatal("output error: %r");706 msgput(m);707 }708 closeioproc(io);709 fprint(2, "output eof\n");710 threadexitsall(0);711 }713 void714 inputthread(void *arg)715 {716 uchar *pkt;717 int n, nn, tag;718 Msg *m;719 Ioproc *io;721 if(verbose) fprint(2, "input thread\n");722 io = ioproc();723 USED(arg);724 while((pkt = read9ppkt(io, 0)) != nil){725 n = GBIT32(pkt);726 if(n < 7){727 fprint(2, "short 9P packet from server\n");728 free(pkt);729 continue;730 }731 if(verbose > 2) fprint(2, "read %.*H\n", n, pkt);732 tag = GBIT16(pkt+5);733 if((m = msgget(tag)) == nil){734 fprint(2, "unexpected 9P response tag %d\n", tag);735 free(pkt);736 continue;737 }738 if((nn = convM2S(pkt, n, &m->rx)) != n){739 fprint(2, "bad packet - convM2S %d but %d\n", nn, n);740 free(pkt);741 msgput(m);742 continue;743 }744 if(verbose > 1) fprint(2, "* -> %F\n", &m->rx);745 m->rpkt = pkt;746 m->rx.tag = m->ctag;747 if(m->internal)748 sendp(m->c->internal, 0);749 else750 sendq(m->c->outq, m);751 }752 closeioproc(io);753 //fprint(2, "input eof\n");754 threadexitsall(0);755 }757 void*758 gethash(Hash **ht, uint n)759 {760 Hash *h;762 for(h=ht[n%NHASH]; h; h=h->next)763 if(h->n == n)764 return h->v;765 return nil;766 }768 int769 delhash(Hash **ht, uint n, void *v)770 {771 Hash *h, **l;773 for(l=&ht[n%NHASH]; h=*l; l=&h->next)774 if(h->n == n){775 if(h->v != v){776 if(verbose) fprint(2, "delhash %d got %p want %p\n", n, h->v, v);777 return -1;778 
}779 *l = h->next;780 free(h);781 return 0;782 }783 return -1;784 }786 int787 puthash(Hash **ht, uint n, void *v)788 {789 Hash *h;791 if(gethash(ht, n))792 return -1;793 h = emalloc(sizeof(Hash));794 h->next = ht[n%NHASH];795 h->n = n;796 h->v = v;797 ht[n%NHASH] = h;798 return 0;799 }801 Fid **fidtab;802 int nfidtab;803 Fid *freefid;805 Fid*806 fidnew(int cfid)807 {808 Fid *f;810 if(freefid == nil){811 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));812 fidtab[nfidtab] = emalloc(sizeof(Fid));813 freefid = fidtab[nfidtab];814 freefid->fid = nfidtab++;815 }816 f = freefid;817 freefid = f->next;818 f->cfid = cfid;819 f->ref = 1;820 return f;821 }823 void824 fidput(Fid *f)825 {826 if(f == nil)827 return;828 assert(f->ref > 0);829 if(--f->ref > 0)830 return;831 f->next = freefid;832 f->cfid = -1;833 freefid = f;834 }836 Msg **msgtab;837 int nmsgtab;838 Msg *freemsg;840 Msg*841 msgnew(void)842 {843 Msg *m;845 if(freemsg == nil){846 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));847 msgtab[nmsgtab] = emalloc(sizeof(Msg));848 freemsg = msgtab[nmsgtab];849 freemsg->tag = nmsgtab++;850 }851 m = freemsg;852 freemsg = m->next;853 m->ref = 1;854 return m;855 }857 void858 msgput(Msg *m)859 {860 if(verbose > 2) fprint(2, "msgput tag %d/%d ref %d\n", m->tag, m->ctag, m->ref);861 assert(m->ref > 0);862 if(--m->ref > 0)863 return;864 m->c->nmsg--;865 m->c = nil;866 fidput(m->fid);867 m->fid = nil;868 fidput(m->afid);869 m->afid = nil;870 fidput(m->newfid);871 m->newfid = nil;872 free(m->tpkt);873 m->tpkt = nil;874 free(m->rpkt);875 m->rpkt = nil;876 if(m->rx.type == Ropenfd)877 close(m->rx.unixfd);878 m->rx.unixfd = -1;879 m->isopenfd = 0;880 m->internal = 0;881 m->next = freemsg;882 freemsg = m;883 }885 Msg*886 msgget(int n)887 {888 Msg *m;890 if(n < 0 || n >= nmsgtab)891 return nil;892 m = msgtab[n];893 if(m->ref == 0)894 return nil;895 if(verbose) fprint(2, "msgget %d = %p\n", n, m);896 m->ref++;897 return m;898 }901 void*902 emalloc(int n)903 {904 void *v;906 
v = mallocz(n, 1);907 if(v == nil){908 abort();909 sysfatal("out of memory allocating %d", n);910 }911 return v;912 }914 void*915 erealloc(void *v, int n)916 {917 v = realloc(v, n);918 if(v == nil){919 abort();920 sysfatal("out of memory reallocating %d", n);921 }922 return v;923 }925 typedef struct Qel Qel;926 struct Qel927 {928 Qel *next;929 void *p;930 };932 struct Queue933 {934 int hungup;935 QLock lk;936 Rendez r;937 Qel *head;938 Qel *tail;939 };941 Queue*942 qalloc(void)943 {944 Queue *q;946 q = mallocz(sizeof(Queue), 1);947 if(q == nil)948 return nil;949 q->r.l = &q->lk;950 return q;951 }953 int954 sendq(Queue *q, void *p)955 {956 Qel *e;958 e = emalloc(sizeof(Qel));959 qlock(&q->lk);960 if(q->hungup){961 werrstr("hungup queue");962 qunlock(&q->lk);963 return -1;964 }965 e->p = p;966 e->next = nil;967 if(q->head == nil)968 q->head = e;969 else970 q->tail->next = e;971 q->tail = e;972 rwakeup(&q->r);973 qunlock(&q->lk);974 return 0;975 }977 void*978 recvq(Queue *q)979 {980 void *p;981 Qel *e;983 qlock(&q->lk);984 while(q->head == nil && !q->hungup)985 rsleep(&q->r);986 if(q->hungup){987 qunlock(&q->lk);988 return nil;989 }990 e = q->head;991 q->head = e->next;992 qunlock(&q->lk);993 p = e->p;994 free(e);995 return p;996 }998 uchar*999 read9ppkt(Ioproc *io, int fd)1000 {1001 uchar buf[4], *pkt;1002 int n, nn;1004 n = ioreadn(io, fd, buf, 4);1005 if(n != 4)1006 return nil;1007 n = GBIT32(buf);1008 pkt = emalloc(n);1009 PBIT32(pkt, n);1010 nn = ioreadn(io, fd, pkt+4, n-4);1011 if(nn != n-4){1012 free(pkt);1013 return nil;1014 }1015 /* would do this if we ever got one of these, but we only generate them1016 if(pkt[4] == Ropenfd){1017 newfd = iorecvfd(io, fd);1018 PBIT32(pkt+n-4, newfd);1019 }1020 */1021 return pkt;1022 }1024 Msg*1025 mread9p(Ioproc *io, int fd)1026 {1027 int n, nn;1028 uchar *pkt;1029 Msg *m;1031 if((pkt = read9ppkt(io, fd)) == nil)1032 return nil;1034 m = msgnew();1035 m->tpkt = pkt;1036 n = GBIT32(pkt);1037 nn = convM2S(pkt, n, &m->tx);1038 
if(nn != n){1039 fprint(2, "read bad packet from %d\n", fd);1040 return nil;1041 }1042 return m;1043 }1045 int1046 mwrite9p(Ioproc *io, int fd, uchar *pkt)1047 {1048 int n, nfd;1050 n = GBIT32(pkt);1051 if(verbose > 2) fprint(2, "write %d %d %.*H\n", fd, n, n, pkt);1052 if(iowrite(io, fd, pkt, n) != n){1053 fprint(2, "write error: %r\n");1054 return -1;1055 }1056 if(pkt[4] == Ropenfd){1057 nfd = GBIT32(pkt+n-4);1058 if(iosendfd(io, fd, nfd) < 0){1059 fprint(2, "send fd error: %r\n");1060 return -1;1061 }1062 }1063 return 0;1064 }1066 void1067 restring(uchar *pkt, int pn, char *s)1068 {1069 int n;1071 if(s < (char*)pkt || s >= (char*)pkt+pn)1072 return;1074 n = strlen(s);1075 memmove(s+1, s, n);1076 PBIT16((uchar*)s-1, n);1077 }1079 void1080 rewritehdr(Fcall *f, uchar *pkt)1081 {1082 int i, n;1084 n = GBIT32(pkt);1085 PBIT16(pkt+5, f->tag);1086 switch(f->type){1087 case Tversion:1088 case Rversion:1089 restring(pkt, n, f->version);1090 break;1091 case Tauth:1092 PBIT32(pkt+7, f->afid);1093 restring(pkt, n, f->uname);1094 restring(pkt, n, f->aname);1095 break;1096 case Tflush:1097 PBIT16(pkt+7, f->oldtag);1098 break;1099 case Tattach:1100 restring(pkt, n, f->uname);1101 restring(pkt, n, f->aname);1102 PBIT32(pkt+7, f->fid);1103 PBIT32(pkt+11, f->afid);1104 break;1105 case Twalk:1106 PBIT32(pkt+7, f->fid);1107 PBIT32(pkt+11, f->newfid);1108 for(i=0; i<f->nwname; i++)1109 restring(pkt, n, f->wname[i]);1110 break;1111 case Tcreate:1112 restring(pkt, n, f->name);1113 /* fall through */1114 case Topen:1115 case Tread:1116 case Twrite:1117 case Tclunk:1118 case Tremove:1119 case Tstat:1120 case Twstat:1121 PBIT32(pkt+7, f->fid);1122 break;1123 case Rerror:1124 restring(pkt, n, f->ename);1125 break;1126 }1127 }1129 #ifdef _LIB9_H_1130 /* unix select-based polling */1131 struct Ioproc1132 {1133 Channel *c;1134 Ioproc *next;1135 int index;1136 };1138 Ioproc*1139 ioproc(void)1140 {1141 return (Ioproc*)-1;1142 }1144 void1145 closeioproc(Ioproc *io)1146 {1147 }1149 long1150 
ioread(Ioproc *io, int fd, void *v, long n)1151 {1152 USED(io);1154 return threadread(fd, v, n);1155 }1157 long1158 ioreadn(Ioproc *io, int fd, void *v, long n)1159 {1160 long tot, m;1161 uchar *u;1163 u = v;1164 for(tot=0; tot<n; tot+=m){1165 m = ioread(io, fd, u+tot, n-tot);1166 if(m <= 0){1167 if(tot)1168 break;1169 return m;1170 }1171 }1172 return tot;1173 }1175 int1176 iorecvfd(Ioproc *io, int fd)1177 {1178 int r;1180 threadfdnoblock(fd);1181 while((r=recvfd(fd)) < 0){1182 if(errno == EINTR)1183 continue;1184 if(errno == EWOULDBLOCK || errno == EAGAIN){1185 threadfdwait(fd, 'r');1186 continue;1187 }1188 break;1189 }1190 return r;1191 }1193 int1194 iosendfd(Ioproc *io, int s, int fd)1195 {1196 int r;1198 threadfdnoblock(s);1199 while((r=sendfd(s, fd)) < 0){1200 if(errno == EINTR)1201 continue;1202 if(errno == EWOULDBLOCK || errno == EAGAIN){1203 threadfdwait(fd, 'w');1204 continue;1205 }1206 break;1207 }1208 return r;1209 }1211 static long1212 _iowrite(Ioproc *io, int fd, void *v, long n)1213 {1214 USED(io);1215 return threadwrite(fd, v, n);1216 }1218 long1219 iowrite(Ioproc *io, int fd, void *v, long n)1220 {1221 long tot, m;1222 uchar *u;1224 u = v;1225 for(tot=0; tot<n; tot+=m){1226 m = _iowrite(io, fd, u+tot, n-tot);1227 if(m < 0){1228 if(tot)1229 break;1230 return m;1231 }1232 }1233 return tot;1234 }1236 int1237 iolisten(Ioproc *io, char *dir, char *ndir)1238 {1239 int fd;1240 int r;1241 extern int _p9netfd(char*);1242 USED(io);1244 if((fd = _p9netfd(dir)) < 0)1245 return -1;1246 threadfdnoblock(fd);1247 while((r=listen(dir, ndir)) < 0){1248 if(errno == EINTR)1249 continue;1250 if(errno == EWOULDBLOCK || errno == EAGAIN){1251 threadfdwait(fd, 'r');1252 continue;1253 }1254 break;1255 }1256 return r;1257 }1259 int1260 ioaccept(Ioproc *io, int fd, char *dir)1261 {1262 int r;1263 USED(io);1265 threadfdnoblock(fd);1266 while((r=accept(fd, dir)) < 0){1267 if(errno == EINTR)1268 continue;1269 if(errno == EWOULDBLOCK || errno == EAGAIN){1270 threadfdwait(fd, 
'r');1271 continue;1272 }1273 break;1274 }1275 return r;1276 }1278 #else1279 /* real plan 9 io procs */1280 static long1281 _iolisten(va_list *arg)1282 {1283 char *a, *b;1285 a = va_arg(*arg, char*);1286 b = va_arg(*arg, char*);1287 return listen(a, b);1288 }1290 int1291 iolisten(Ioproc *io, char *a, char *b)1292 {1293 return iocall(io, _iolisten, a, b);1294 }1296 static long1297 _ioaccept(va_list *arg)1298 {1299 int fd;1300 char *dir;1302 fd = va_arg(*arg, int);1303 dir = va_arg(*arg, char*);1304 return accept(fd, dir);1305 }1307 int1308 ioaccept(Ioproc *io, int fd, char *dir)1309 {1310 return iocall(io, _ioaccept, fd, dir);1311 }1312 #endif