12 MAXMSG = 64, /* per connection */
15 typedef struct Hash Hash;
16 typedef struct Fid Fid;
17 typedef struct Msg Msg;
18 typedef struct Conn Conn;
19 typedef struct Queue Queue;
80 void *gethash(Hash**, uint);
81 int puthash(Hash**, uint, void*);
82 int delhash(Hash**, uint, void*);
83 Msg *mread9p(Ioproc*, int);
84 int mwrite9p(Ioproc*, int, uchar*);
85 uchar *read9ppkt(Ioproc*, int);
86 int write9ppkt(int, uchar*);
93 void *erealloc(void*, int);
95 int sendq(Queue*, void*);
97 void pollthread(void*);
98 void connthread(void*);
99 void connoutthread(void*);
100 void listenthread(void*);
101 void outputthread(void*);
102 void inputthread(void*);
103 void rewritehdr(Fcall*, uchar*);
104 int tlisten(char*, char*);
105 int taccept(int, char*);
106 int iolisten(Ioproc*, char*, char*);
107 int ioaccept(Ioproc*, int, char*);
108 int iorecvfd(Ioproc*, int);
109 int iosendfd(Ioproc*, int, int);
110 void mainproc(void*);
111 int ignorepipe(void*, char*);
116 fprint(2, "usage: 9pserve [-s service] [-u] address\n");
117 fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
122 extern int _threaddebuglevel;
124 threadmain(int argc, char **argv)
128 if(verbose) fprint(2, "9pserve running\n");
137 if(open(file=EARGF(usage()), ORDWR) != 0)
138 sysfatal("open %s: %r", file);
150 if((afd = announce(addr, adir)) < 0)
151 sysfatal("announce %s: %r", addr);
153 proccreate(mainproc, nil, STACK);
164 yield(); /* let threadmain exit */
166 atnotify(ignorepipe, 1);
167 fmtinstall('D', dirfmt);
168 fmtinstall('M', dirmodefmt);
169 fmtinstall('F', fcallfmt);
170 fmtinstall('H', encodefmt);
176 f.version = "9P2000";
179 n = convS2M(&f, vbuf, sizeof vbuf);
180 if(verbose > 1) fprint(2, "* <- %F\n", &f);
182 n = read9pmsg(0, vbuf, sizeof vbuf);
183 if(convM2S(vbuf, n, &f) != n)
184 sysfatal("convM2S failure");
185 if(verbose > 1) fprint(2, "* -> %F\n", &f);
187 threadcreate(inputthread, nil, STACK);
188 threadcreate(outputthread, nil, STACK);
189 threadcreate(listenthread, nil, STACK);
190 threadcreateidle(pollthread, nil, STACK);
195 ignorepipe(void *v, char *s)
198 if(strcmp(s, "sys: write on closed pipe") == 0)
200 fprint(2, "msg: %s\n", s);
205 listenthread(void *arg)
213 c = emalloc(sizeof(Conn));
214 c->fd = iolisten(io, adir, c->dir);
216 if(verbose) fprint(2, "listen: %r\n");
221 c->inc = chancreate(sizeof(void*), 0);
222 c->internal = chancreate(sizeof(void*), 0);
225 if(verbose) fprint(2, "incoming call on %s\n", c->dir);
226 threadcreate(connthread, c, STACK);
236 m->rpkt = emalloc(n);
237 nn = convS2M(&m->rx, m->rpkt, n);
239 sysfatal("sizeS2M + convS2M disagree");
240 sendq(m->c->outq, m);
249 m->tpkt = emalloc(n);
250 nn = convS2M(&m->tx, m->tpkt, n);
252 sysfatal("sizeS2M + convS2M disagree");
257 err(Msg *m, char *ename)
261 m->rx.tag = m->tx.tag;
266 connthread(void *arg)
277 fd = ioaccept(io, c->fd, c->dir);
279 if(verbose) fprint(2, "accept %s: %r\n", c->dir);
284 threadcreate(connoutthread, c, STACK);
285 while((m = mread9p(io, c->fd)) != nil){
286 if(verbose > 1) fprint(2, "fd#%d -> %F\n", c->fd, &m->tx);
290 if(puthash(c->tag, m->tx.tag, m) < 0){
291 err(m, "duplicate tag");
297 m->rx.tag = m->tx.tag;
298 m->rx.msize = m->tx.msize;
299 if(m->rx.msize > 8192)
301 m->rx.version = "9P2000";
302 m->rx.type = Rversion;
306 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
307 m->rx.tag = m->tx.tag;
316 if(m->tx.afid != NOFID
317 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
318 err(m, "unknown fid");
321 m->fid = fidnew(m->tx.fid);
322 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
323 err(m, "duplicate fid");
329 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
330 err(m, "unknown fid");
334 if(m->tx.newfid == m->tx.fid){
338 m->newfid = fidnew(m->tx.newfid);
339 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
340 err(m, "duplicate fid");
347 m->afid = fidnew(m->tx.afid);
348 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
349 err(m, "duplicate fid");
355 if(m->tx.mode&~(OTRUNC|3)){
356 err(m, "bad openfd mode");
371 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
372 err(m, "unknown fid");
379 /* have everything - translate and send */
384 m->tx.fid = m->fid->fid;
386 m->tx.newfid = m->newfid->fid;
388 m->tx.afid = m->afid->fid;
390 m->tx.oldtag = m->oldm->tag;
391 /* reference passes to outq */
393 while(c->nmsg >= MAXMSG){
399 if(verbose) fprint(2, "%s eof\n", c->dir);
401 /* flush all outstanding messages */
402 for(i=0; i<NHASH; i++){
403 for(h=c->tag[i]; h; h=h->next){
411 m->tx.oldtag = om->tag;
414 m->ref++; /* for outq */
417 msgput(m); /* got from recvp */
418 msgput(m); /* got from msgnew */
419 msgput(om); /* got from hash table */
423 /* clunk all outstanding fids */
424 for(i=0; i<NHASH; i++){
425 for(h=c->fid[i]; h; h=h->next){
439 msgput(m); /* got from recvp */
440 msgput(m); /* got from msgnew */
441 fidput(f); /* got from hash table */
446 assert(c->nmsg == 0);
447 assert(c->nfid == 0);
449 chanfree(c->internal);
461 openfdthread(void *v)
476 if(c->fdmode == OREAD){
478 if(verbose) fprint(2, "tread...");
484 m->tx.fid = fid->fid;
492 if(m->rx.type == Rerror){
493 // fprint(2, "read error: %s\n", m->rx.ename);
499 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
500 fprint(2, "pipe write error: %r\n");
508 if(verbose) fprint(2, "twrite...");
509 if((n=ioread(io, c->fd, buf, sizeof buf)) <= 0){
511 fprint(2, "pipe read error: %r\n");
519 m->tx.fid = fid->fid;
529 if(m->rx.type == Rerror){
530 // fprint(2, "write error: %s\n", m->rx.ename);
538 if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid);
550 m->tx.fid = fid->fid;
561 chanfree(c->internal);
574 rerrstr(errs, sizeof errs);
577 if(verbose) fprint(2, "xopen pipe %d %d...", p[0], p[1]);
579 /* now we're committed. */
581 /* a new connection for this fid */
582 nc = emalloc(sizeof(Conn));
583 nc->internal = chancreate(sizeof(void*), 0);
588 nc->fdmode = m->tx.mode;
591 /* a thread to tend the pipe */
592 threadcreate(openfdthread, nc, STACK);
594 /* if mode is ORDWR, that openfdthread will write; start a reader */
595 if((m->tx.mode&3) == ORDWR){
596 nc = emalloc(sizeof(Conn));
597 nc->internal = chancreate(sizeof(void*), 0);
601 nc->fd = dup(p[0], -1);
602 threadcreate(openfdthread, nc, STACK);
605 /* steal fid from other connection */
606 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
609 /* rewrite as Ropenfd */
610 m->rx.type = Ropenfd;
612 m->rpkt = erealloc(m->rpkt, n+4);
613 PBIT32(m->rpkt+n, p[1]);
616 m->rpkt[4] = Ropenfd;
622 connoutthread(void *arg)
631 while((m = recvq(c->outq)) != nil){
632 err = m->tx.type+1 != m->rx.type;
633 if(!err && m->isopenfd)
640 if(delhash(om->c->tag, om->ctag, om) == 0)
646 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
651 fprint(2, "auth error\n");
652 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
658 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
662 if(err && m->tx.fid != m->tx.newfid && m->newfid)
663 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
667 if(delhash(m->c->tag, m->ctag, m) == 0)
669 if(verbose > 1) fprint(2, "fd#%d <- %F\n", c->fd, &m->rx);
670 rewritehdr(&m->rx, m->rpkt);
671 if(mwrite9p(io, c->fd, m->rpkt) < 0)
672 if(verbose) fprint(2, "write error: %r\n");
674 if(c->inputstalled && c->nmsg < MAXMSG)
681 outputthread(void *arg)
688 while((m = recvq(outq)) != nil){
689 if(verbose > 1) fprint(2, "* <- %F\n", &m->tx);
690 rewritehdr(&m->tx, m->tpkt);
691 if(mwrite9p(io, 1, m->tpkt) < 0)
692 sysfatal("output error: %r");
696 fprint(2, "output eof\n");
701 inputthread(void *arg)
708 if(verbose) fprint(2, "input thread\n");
711 while((pkt = read9ppkt(io, 0)) != nil){
714 fprint(2, "short 9P packet from server\n");
718 if(verbose > 2) fprint(2, "read %.*H\n", n, pkt);
720 if((m = msgget(tag)) == nil){
721 fprint(2, "unexpected 9P response tag %d\n", tag);
725 if((nn = convM2S(pkt, n, &m->rx)) != n){
726 fprint(2, "bad packet - convM2S %d but %d\n", nn, n);
731 if(verbose > 1) fprint(2, "* -> %F\n", &m->rx);
735 sendp(m->c->internal, 0);
737 sendq(m->c->outq, m);
740 fprint(2, "input eof\n");
745 gethash(Hash **ht, uint n)
749 for(h=ht[n%NHASH]; h; h=h->next)
756 delhash(Hash **ht, uint n, void *v)
760 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
763 if(verbose) fprint(2, "delhash %d got %p want %p\n", n, h->v, v);
774 puthash(Hash **ht, uint n, void *v)
780 h = emalloc(sizeof(Hash));
781 h->next = ht[n%NHASH];
798 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
799 fidtab[nfidtab] = emalloc(sizeof(Fid));
800 freefid = fidtab[nfidtab];
801 freefid->fid = nfidtab++;
833 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
834 msgtab[nmsgtab] = emalloc(sizeof(Msg));
835 freemsg = msgtab[nmsgtab];
836 freemsg->tag = nmsgtab++;
847 if(verbose > 2) fprint(2, "msgput tag %d/%d ref %d\n", m->tag, m->ctag, m->ref);
863 if(m->rx.type == Ropenfd)
877 if(n < 0 || n >= nmsgtab)
882 if(verbose) fprint(2, "msgget %d = %p\n", n, m);
896 sysfatal("out of memory allocating %d", n);
902 erealloc(void *v, int n)
907 sysfatal("out of memory reallocating %d", n);
912 typedef struct Qel Qel;
933 q = mallocz(sizeof(Queue), 1);
941 sendq(Queue *q, void *p)
945 e = emalloc(sizeof(Qel));
948 werrstr("hungup queue");
971 while(q->head == nil && !q->hungup)
986 read9ppkt(Ioproc *io, int fd)
991 n = ioreadn(io, fd, buf, 4);
997 nn = ioreadn(io, fd, pkt+4, n-4);
1002 /* would do this if we ever got one of these, but we only generate them
1003 if(pkt[4] == Ropenfd){
1004 newfd = iorecvfd(io, fd);
1005 PBIT32(pkt+n-4, newfd);
1012 mread9p(Ioproc *io, int fd)
1018 if((pkt = read9ppkt(io, fd)) == nil)
1024 nn = convM2S(pkt, n, &m->tx);
1026 fprint(2, "read bad packet from %d\n", fd);
1033 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1038 if(verbose > 2) fprint(2, "write %d %d %.*H\n", fd, n, n, pkt);
1039 if(iowrite(io, fd, pkt, n) != n){
1040 fprint(2, "write error: %r\n");
1043 if(pkt[4] == Ropenfd){
1044 nfd = GBIT32(pkt+n-4);
1045 if(iosendfd(io, fd, nfd) < 0){
1046 fprint(2, "send fd error: %r\n");
1054 restring(uchar *pkt, int pn, char *s)
1058 if(s < (char*)pkt || s >= (char*)pkt+pn)
1063 PBIT16((uchar*)s-1, n);
1067 rewritehdr(Fcall *f, uchar *pkt)
1072 PBIT16(pkt+5, f->tag);
1076 restring(pkt, n, f->version);
1079 PBIT32(pkt+7, f->afid);
1080 restring(pkt, n, f->uname);
1081 restring(pkt, n, f->aname);
1084 PBIT16(pkt+7, f->oldtag);
1087 restring(pkt, n, f->uname);
1088 restring(pkt, n, f->aname);
1089 PBIT32(pkt+7, f->fid);
1090 PBIT32(pkt+11, f->afid);
1093 PBIT32(pkt+7, f->fid);
1094 PBIT32(pkt+11, f->newfid);
1095 for(i=0; i<f->nwname; i++)
1096 restring(pkt, n, f->wname[i]);
1099 restring(pkt, n, f->name);
1108 PBIT32(pkt+7, f->fid);
1111 restring(pkt, n, f->ename);
1117 /* unix select-based polling */
1125 static struct Ioproc **pio;
1126 static struct pollfd *pfd;
1128 static struct Ioproc *iofree;
1136 pfd = erealloc(pfd, (npfd+1)*sizeof(pfd[0]));
1137 pfd[npfd].events = 0;
1139 iofree = emalloc(sizeof(Ioproc));
1140 iofree->index = npfd;
1141 iofree->c = chancreate(sizeof(ulong), 1);
1142 pio = erealloc(pio, (npfd+1)*sizeof(pio[0]));
1152 closeioproc(Ioproc *io)
1165 for(i=0; i<npfd; i++)
1169 for(i=0; i<npfd; i++)
1171 fprint(2, " %d%c", pfd[i].fd, pfd[i].events==POLLIN ? 'r' : pfd[i].events==POLLOUT ? 'w' : '?');
1174 n = poll(pfd, npfd, -1);
1177 for(i=0; i<npfd; i++)
1178 if(pfd[i].fd != -1 && pfd[i].revents){
1182 nbsendul(pio[i]->c, 1);
1190 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0)|O_NONBLOCK);
1194 xwait(Ioproc *io, int fd, int e)
1196 if(verbose) fprint(2, "wait for %d%c\n", fd, e==POLLIN ? 'r' : 'w');
1197 pfd[io->index].fd = fd;
1198 pfd[io->index].events = e;
1200 if(verbose) fprint(2, "got %d\n", fd);
1204 rwait(Ioproc *io, int fd)
1206 xwait(io, fd, POLLIN);
1210 wwait(Ioproc *io, int fd)
1212 xwait(io, fd, POLLOUT);
1216 ioread(Ioproc *io, int fd, void *v, long n)
1222 while((r=read(fd, v, n)) < 0 && errno == EWOULDBLOCK)
1228 ioreadn(Ioproc *io, int fd, void *v, long n)
1234 for(tot=0; tot<n; tot+=m){
1235 m = ioread(io, fd, u+tot, n-tot);
1246 iorecvfd(Ioproc *io, int fd)
1251 while((r=recvfd(fd)) < 0 && errno == EWOULDBLOCK)
1257 iosendfd(Ioproc *io, int s, int fd)
1262 while((r=sendfd(s, fd)) < 0 && errno == EWOULDBLOCK)
1264 if(r < 0) fprint(2, "sent %d, %d\n", s, fd);
1269 _iowrite(Ioproc *io, int fd, void *v, long n)
1275 while((r=write(fd, v, n)) < 0 && errno == EWOULDBLOCK)
1281 iowrite(Ioproc *io, int fd, void *v, long n)
1287 for(tot=0; tot<n; tot+=m){
1288 m = _iowrite(io, fd, u+tot, n-tot);
1299 iolisten(Ioproc *io, char *dir, char *ndir)
1303 extern int _p9netfd(char*);
1306 if((fd = _p9netfd(dir)) < 0)
1309 while((r=listen(dir, ndir)) < 0 && errno == EWOULDBLOCK)
1315 ioaccept(Ioproc *io, int fd, char *dir)
1321 while((r=accept(fd, dir)) < 0 && errno == EWOULDBLOCK)
1327 /* real plan 9 io procs */
1329 _iolisten(va_list *arg)
1333 a = va_arg(*arg, char*);
1334 b = va_arg(*arg, char*);
1335 return listen(a, b);
1339 iolisten(Ioproc *io, char *a, char *b)
1341 return iocall(io, _iolisten, a, b);
1345 _ioaccept(va_list *arg)
1350 fd = va_arg(*arg, int);
1351 dir = va_arg(*arg, char*);
1352 return accept(fd, dir);
1356 ioaccept(Ioproc *io, int fd, char *dir)
1358 return iocall(io, _ioaccept, fd, dir);