11 MAXMSG = 64, /* per connection */
/* Give each struct tag a typedef of the same name, so the types can be written without the struct keyword. */
14 typedef struct Hash Hash;
15 typedef struct Fid Fid;
16 typedef struct Msg Msg;
17 typedef struct Conn Conn;
18 typedef struct Queue Queue;
/* Hash table operations; each takes the bucket array and a uint key. */
82 void *gethash(Hash**, uint);
83 int puthash(Hash**, uint, void*);
84 int delhash(Hash**, uint, void*);
/* 9P packet and message I/O on a file descriptor. */
85 Msg *mread9p(Ioproc*, int);
86 int mwrite9p(Ioproc*, int, uchar*);
87 uchar *read9ppkt(Ioproc*, int);
88 int write9ppkt(int, uchar*);
/* Allocation and queue helpers. */
97 void *erealloc(void*, int);
99 int sendq(Queue*, void*);
/* Thread entry points; each receives the single void* argument given to threadcreate. */
101 void connthread(void*);
102 void connoutthread(void*);
103 void listenthread(void*);
104 void outputthread(void*);
105 void inputthread(void*);
/* Rewrites fields of an already-marshalled packet in place from the Fcall. */
106 void rewritehdr(Fcall*, uchar*);
/* Listen/accept and fd-passing helpers; the io* variants take an Ioproc. */
107 int tlisten(char*, char*);
108 int taccept(int, char*);
109 int iolisten(Ioproc*, char*, char*);
110 int ioaccept(Ioproc*, int, char*);
111 int iorecvfd(Ioproc*, int);
112 int iosendfd(Ioproc*, int, int);
/* mainproc is the entry point handed to proccreate; ignorepipe is the handler handed to atnotify. */
113 void mainproc(void*);
114 int ignorepipe(void*, char*);
/* Usage text, printed to file descriptor 2. */
120 fprint(2, "usage: 9pserve [-lv] address\n");
121 fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
126 extern int _threaddebuglevel;
/*
 * threadmain: receives the program's argc/argv.
 * The visible steps install the %T time format, announce the service
 * address (fatal on failure), create a file whose name is addr with
 * ".log" appended (fatal on failure), and start mainproc in a new proc.
 * Addresses beginning with "unix!" are special-cased; the action taken
 * is on a line not shown here.
 */
128 threadmain(int argc, char **argv)
151 fmtinstall('T', timefmt);
153 if((afd = announce(addr, adir)) < 0)
154 sysfatal("announce %s: %r", addr);
156 if(strncmp(addr, "unix!", 5) == 0)
158 file = smprint("%s.log", addr);
160 sysfatal("smprint log: %r");
161 if((fd = create(file, OWRITE, 0666)) < 0)
162 sysfatal("create %s: %r", file);
167 if(verbose) fprint(2, "%T 9pserve running\n");
168 proccreate(mainproc, nil, STACK);
/* Hand ignorepipe to atnotify and install the print formats for the %D, %M, %F and %H verbs. */
178 atnotify(ignorepipe, 1);
179 fmtinstall('D', dirfmt);
180 fmtinstall('M', dirmodefmt);
181 fmtinstall('F', fcallfmt);
182 fmtinstall('H', encodefmt);
/*
 * Version exchange: marshal f (version "9P2000") into vbuf and write it
 * to fd 1, then read the reply from fd 0 and require convM2S to consume
 * exactly the n bytes that were read.
 */
188 f.version = "9P2000";
191 n = convS2M(&f, vbuf, sizeof vbuf);
192 if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
193 nn = write(1, vbuf, n);
/* NOTE(review): the trailing \n in this message may be redundant if sysfatal adds its own newline - confirm. */
195 sysfatal("error writing Tversion: %r\n");
196 n = read9pmsg(0, vbuf, sizeof vbuf);
197 if(convM2S(vbuf, n, &f) != n)
198 sysfatal("convM2S failure");
201 if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
/* Start the input, output and listen threads. */
203 threadcreate(inputthread, nil, STACK);
204 threadcreate(outputthread, nil, STACK);
205 threadcreate(listenthread, nil, STACK);
/*
 * ignorepipe: handler given to atnotify; s is the note text.
 * The visible lines compare s against the closed-pipe note string and
 * log s to fd 2; the return statements are not shown here.
 */
210 ignorepipe(void *v, char *s)
213 if(strcmp(s, "sys: write on closed pipe") == 0)
215 fprint(2, "%T msg: %s\n", s);
/*
 * listenthread: allocates a Conn, calls iolisten on adir (which is also
 * given c->dir), creates the Conn's inc and internal channels
 * (pointer-sized elements, buffer size 0), and passes the Conn to a new
 * connthread. Any enclosing loop is not visible here.
 */
220 listenthread(void *arg)
227 threadsetname("listen %s", adir);
229 c = emalloc(sizeof(Conn));
230 c->fd = iolisten(io, adir, c->dir);
232 if(verbose) fprint(2, "%T listen: %r\n");
237 c->inc = chancreate(sizeof(void*), 0);
238 c->internal = chancreate(sizeof(void*), 0);
241 if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
242 threadcreate(connthread, c, STACK);
252 m->rpkt = emalloc(n);
253 nn = convS2M(&m->rx, m->rpkt, n);
255 sysfatal("sizeS2M + convS2M disagree");
256 sendq(m->c->outq, m);
265 m->tpkt = emalloc(n);
266 nn = convS2M(&m->tx, m->tpkt, n);
268 sysfatal("sizeS2M + convS2M disagree");
/*
 * err: called with a message and an error string such as
 * "duplicate tag" or "unknown fid". The visible line copies the request
 * tag into the reply; the rest of the body is not shown.
 */
273 err(Msg *m, char *ename)
277 m->rx.tag = m->tx.tag;
/*
 * connthread: thread started by listenthread with the Conn as argument.
 * It accepts the call, starts connoutthread, and then reads 9P requests
 * from c->fd with mread9p until that returns nil.
 *
 * Each request's tag is entered in c->tag, and the fids it names are
 * looked up in, or added to, c->fid. Failures are answered through err()
 * with "duplicate tag", "unknown fid", "duplicate fid" or
 * "bad openfd mode". The client's fid and tag numbers in m->tx are then
 * replaced with the numbers held in the Fid and Msg structures.
 *
 * After mread9p returns nil, the cleanup sections below run (see their
 * comments) and the thread asserts that no messages or fids remain.
 * The switch and case labels that select among these steps are not
 * visible in this excerpt.
 */
282 connthread(void *arg)
292 threadsetname("conn %s", c->dir);
294 fd = ioaccept(io, c->fd, c->dir);
296 if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
301 threadcreate(connoutthread, c, STACK);
302 while((m = mread9p(io, c->fd)) != nil){
303 if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
307 if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
308 if(puthash(c->tag, m->tx.tag, m) < 0){
309 err(m, "duplicate tag");
/* Rversion reply: copy the tag and the client's msize (compared against msize), and set version "9P2000". */
315 m->rx.tag = m->tx.tag;
316 m->rx.msize = m->tx.msize;
317 if(m->rx.msize > msize)
319 m->rx.version = "9P2000";
320 m->rx.type = Rversion;
/* oldtag is looked up in this connection's tag table; nil means no such message. */
324 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
325 m->rx.tag = m->tx.tag;
/* An afid other than NOFID must already be present in the fid table. */
334 if(m->tx.afid != NOFID
335 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
336 err(m, "unknown fid");
339 m->fid = fidnew(m->tx.fid);
340 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
341 err(m, "duplicate fid");
347 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
348 err(m, "unknown fid");
352 if(m->tx.newfid == m->tx.fid){
356 m->newfid = fidnew(m->tx.newfid);
357 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
358 err(m, "duplicate fid");
365 m->afid = fidnew(m->tx.afid);
366 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
367 err(m, "duplicate fid");
/* Only OTRUNC and the low two mode bits are accepted in an openfd mode. */
373 if(m->tx.mode&~(OTRUNC|3)){
374 err(m, "bad openfd mode");
389 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
390 err(m, "unknown fid");
397 /* have everything - translate and send */
402 m->tx.fid = m->fid->fid;
404 m->tx.newfid = m->newfid->fid;
406 m->tx.afid = m->afid->fid;
408 m->tx.oldtag = m->oldm->tag;
409 /* reference passes to outq */
/* Loops while c->nmsg is at the MAXMSG limit; the loop body is not shown. */
411 while(c->nmsg >= MAXMSG){
417 if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
419 /* flush the output queue */
421 while(c->outq != nil)
424 /* flush all outstanding messages */
425 for(i=0; i<NHASH; i++){
426 for(h=c->tag[i]; h; h=hnext){
434 m->tx.oldtag = om->tag;
437 msgincref(m); /* for outq */
439 mm = recvp(c->internal);
441 msgput(m); /* got from recvp */
442 msgput(m); /* got from msgnew */
443 msgput(om); /* got from hash table */
449 /* clunk all outstanding fids */
450 for(i=0; i<NHASH; i++){
451 for(h=c->fid[i]; h; h=hnext){
464 mm = recvp(c->internal);
467 msgput(m); /* got from recvp */
468 msgput(m); /* got from msgnew */
469 fidput(f); /* got from hash table */
/* By this point every message and fid of the connection must be gone. */
477 assert(c->nmsg == 0);
478 assert(c->nfid == 0);
480 chanfree(c->internal);
/*
 * openfdthread: thread started with a Conn as argument to serve one
 * direction of an openfd pipe.
 * With fdmode OREAD, it builds requests on the fid with count
 * msize-IOHDRSZ and copies the returned data to c->fd. Otherwise it
 * reads from c->fd and builds requests on the fid carrying that data.
 * An Rerror reply is tested for in both directions; the action taken is
 * not shown.
 * At the end it decrements fid->openfd, builds one more request for the
 * fid when the count reaches zero, and frees c->internal.
 */
490 openfdthread(void *v)
503 threadsetname("openfd %s", c->fdfid);
506 if(c->fdmode == OREAD){
508 if(verbose) fprint(2, "%T tread...");
513 m->tx.count = msize - IOHDRSZ;
514 m->tx.fid = fid->fid;
522 if(m->rx.type == Rerror){
523 // fprint(2, "%T read error: %s\n", m->rx.ename);
/* The whole of the returned data must be written to the pipe. */
529 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
530 // fprint(2, "%T pipe write error: %r\n");
539 if(verbose) fprint(2, "%T twrite...");
/* A pipe read returning zero or less ends this branch's loop condition. */
543 if((n=ioread(io, c->fd, buf, n)) <= 0){
545 fprint(2, "%T pipe read error: %r\n");
552 m->tx.fid = fid->fid;
562 if(m->rx.type == Rerror){
563 // fprint(2, "%T write error: %s\n", m->rx.ename);
571 if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
578 if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
/* The last thread to finish with this fid (openfd count reaches zero) takes the branch below. */
579 if(--fid->openfd == 0){
585 m->tx.fid = fid->fid;
596 chanfree(c->internal);
/*
 * Fragment of the openfd reply handling; the function header is not
 * visible in this excerpt.
 * The visible lines allocate a Conn for the pipe and start an
 * openfdthread on it. For ORDWR mode a second Conn is allocated on a
 * dup of p[0] with its own thread. The fid is then removed from the
 * originating connection's table and the reply packet is turned into an
 * Ropenfd.
 */
609 rerrstr(errs, sizeof errs);
611 /* XXX return here? */
613 if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
615 /* now we're committed. */
617 /* a new connection for this fid */
618 nc = emalloc(sizeof(Conn));
619 nc->internal = chancreate(sizeof(void*), 0);
625 nc->fdmode = m->tx.mode;
628 /* a thread to tend the pipe */
629 threadcreate(openfdthread, nc, STACK);
631 /* if mode is ORDWR, that openfdthread will write; start a reader */
632 if((m->tx.mode&3) == ORDWR){
633 nc = emalloc(sizeof(Conn));
634 nc->internal = chancreate(sizeof(void*), 0);
639 nc->fd = dup(p[0], -1);
640 threadcreate(openfdthread, nc, STACK);
643 /* steal fid from other connection */
644 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
647 /* rewrite as Ropenfd */
648 m->rx.type = Ropenfd;
/* Grow the reply packet by 4 bytes, store p[1] in them, and set the type byte at offset 4 to Ropenfd. */
650 m->rpkt = erealloc(m->rpkt, n+4);
651 PBIT32(m->rpkt+n, p[1]);
654 m->rpkt[4] = Ropenfd;
/*
 * connoutthread: takes replies off outq until recvq returns nil.
 * Tag and fid entries are removed from the connection's hash tables;
 * the cases that select which entries are removed are not shown.
 * The reply header is then rewritten in m->rpkt and the packet is
 * written to c->fd; a write failure is only logged, in verbose mode.
 * The last visible line tests c->inputstalled together with c->nmsg
 * being below MAXMSG; the action taken is not shown.
 */
660 connoutthread(void *arg)
671 threadsetname("connout %s", c->dir);
672 while((m = recvq(outq)) != nil){
/* err is set when the reply type is not the request type plus one. */
673 err = m->tx.type+1 != m->rx.type;
674 if(!err && m->isopenfd)
681 if(delhash(om->c->tag, om->ctag, om) == 0)
687 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
692 fprint(2, "%T auth error\n");
693 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
699 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
/* On error, a newfid distinct from fid is removed from the fid table. */
703 if(err && m->tx.fid != m->tx.newfid && m->newfid)
704 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
708 if(delhash(m->c->tag, m->ctag, m) == 0)
710 if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
711 rewritehdr(&m->rx, m->rpkt);
712 if(mwrite9p(io, c->fd, m->rpkt) < 0)
713 if(verbose) fprint(2, "%T write error: %r\n");
715 if(c->inputstalled && c->nmsg < MAXMSG)
/*
 * outputthread: takes requests off outq until recvq returns nil.
 * For each one it rewrites the header in m->tpkt from m->tx and writes
 * the packet to fd 1; a write failure is fatal.
 * The "output eof" message is printed on a later line whose position
 * relative to the loop end is not visible here.
 */
724 outputthread(void *arg)
731 threadsetname("output");
732 while((m = recvq(outq)) != nil){
733 if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
734 rewritehdr(&m->tx, m->tpkt);
735 if(mwrite9p(io, 1, m->tpkt) < 0)
736 sysfatal("output error: %r");
740 fprint(2, "%T output eof\n");
/*
 * inputthread: reads 9P packets from fd 0 until read9ppkt returns nil.
 * Each packet's tag is mapped back to its Msg with msgget, and convM2S
 * must consume exactly n bytes. Short packets, unknown tags and bad
 * packets are logged to fd 2.
 * The reply is then handed over either on the connection's internal
 * channel or on its outq. The selecting condition is not shown;
 * presumably it is m->internal.
 */
745 inputthread(void *arg)
752 threadsetname("input");
753 if(verbose) fprint(2, "%T input thread\n");
756 while((pkt = read9ppkt(io, 0)) != nil){
759 fprint(2, "%T short 9P packet from server\n");
763 if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
765 if((m = msgget(tag)) == nil){
766 fprint(2, "%T unexpected 9P response tag %d\n", tag);
770 if((nn = convM2S(pkt, n, &m->rx)) != n){
771 fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
776 if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
777 m->internal ? " (internal)" : "");
781 sendp(m->c->internal, m);
783 sendq(m->c->outq, m);
788 //fprint(2, "%T input eof\n");
/*
 * gethash: looks up key n by walking the chain in bucket n%NHASH.
 * Callers in this file treat a nil result as "not present".
 */
793 gethash(Hash **ht, uint n)
797 for(h=ht[n%NHASH]; h; h=h->next)
/*
 * delhash: walks bucket n%NHASH through a pointer to each link, so that
 * an entry can be unlinked in place. v is the value the caller expects
 * to find; the verbose trace below prints the stored value next to it.
 * The matching and return logic is on lines not shown here.
 */
804 delhash(Hash **ht, uint n, void *v)
808 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
811 if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
/*
 * puthash: allocates a new Hash entry and links it at the head of
 * bucket n%NHASH. Callers in this file treat a negative return as a
 * duplicate key; that check is on lines not shown here.
 */
822 puthash(Hash **ht, uint n, void *v)
828 h = emalloc(sizeof(Hash));
829 h->next = ht[n%NHASH];
/* Grow fidtab by one slot, allocate a Fid there, make it freefid, and number it with its table index. */
846 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
847 fidtab[nfidtab] = emalloc(sizeof(Fid));
848 freefid = fidtab[nfidtab];
849 freefid->fid = nfidtab++;
878 if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
879 getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
/* Grow msgtab by one slot, allocate a Msg there, make it freemsg, and use its table index as its tag. */
889 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
890 msgtab[nmsgtab] = emalloc(sizeof(Msg));
891 freemsg = msgtab[nmsgtab];
892 freemsg->tag = nmsgtab++;
/* Verbose trace of the new message: caller pc, pointer, tag and reference count. */
897 if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
898 getcallerpc(&x), m, m->tag, m->ref);
903 * Clear data associated with connections, so that
904 * if all msgs have been msgcleared, the connection
905 * can be freed. Note that this does *not* free the tpkt
906 * and rpkt; they are freed in msgput with the msg itself.
907 * The io write thread might still be holding a ref to msg
908 * even once the connection has finished with it.
933 if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
945 if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
946 getcallerpc(&m), m, m->tag, m->ctag, m->ref);
/* Tags outside 0..nmsgtab-1 are caught by this range check; the result returned for them is on a line not shown. */
970 if(n < 0 || n >= nmsgtab)
975 if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
989 sysfatal("out of memory allocating %d", n);
/*
 * erealloc: reallocation helper taking the old pointer and a new size n.
 * The visible line ends the program through sysfatal with an
 * out-of-memory message; the failure test itself is not shown.
 */
995 erealloc(void *v, int n)
1000 sysfatal("out of memory reallocating %d", n);
1005 typedef struct Qel Qel;
1026 q = mallocz(sizeof(Queue), 1);
/*
 * sendq: allocates a Qel for the pointer p to be queued on q.
 * The error string "hungup queue" is set on a path whose condition is
 * not shown; presumably that path is taken when q->hungup is set.
 */
1034 sendq(Queue *q, void *p)
1038 e = emalloc(sizeof(Qel));
1041 werrstr("hungup queue");
1064 while(q->head == nil && !q->hungup)
/*
 * read9ppkt: reads one packet from fd in two steps: 4 bytes into buf,
 * then n-4 more bytes into pkt starting at offset 4. At the second read
 * n is presumably the length decoded from the 4-byte prefix; the
 * decoding line is not shown.
 * Callers in this file treat a nil return as end of input.
 */
1079 read9ppkt(Ioproc *io, int fd)
1084 n = ioreadn(io, fd, buf, 4);
1090 nn = ioreadn(io, fd, pkt+4, n-4);
1095 /* would do this if we ever got one of these, but we only generate them
1096 if(pkt[4] == Ropenfd){
1097 newfd = iorecvfd(io, fd);
1098 PBIT32(pkt+n-4, newfd);
/*
 * mread9p: reads a packet from fd with read9ppkt and unmarshals it into
 * m->tx with convM2S. A "read bad packet" message is logged to fd 2 on
 * a path whose condition is not shown.
 * connthread treats a nil return as end of input.
 */
1105 mread9p(Ioproc *io, int fd)
1111 if((pkt = read9ppkt(io, fd)) == nil)
1117 nn = convM2S(pkt, n, &m->tx);
1119 fprint(2, "%T read bad packet from %d\n", fd);
/*
 * mwrite9p: writes the n-byte packet pkt to fd through the Ioproc; how
 * n is obtained is not shown. A short write is logged to fd 2.
 * When the type byte at offset 4 is Ropenfd, the last 4 bytes of the
 * packet are read as a file descriptor number and that descriptor is
 * sent over fd with iosendfd.
 * Callers in this file treat a negative return as failure.
 */
1126 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1131 if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
1132 if(verbose > 1) fprint(2, "%T before iowrite\n");
1133 if(iowrite(io, fd, pkt, n) != n){
1134 fprint(2, "%T write error: %r\n");
1137 if(verbose > 1) fprint(2, "%T after iowrite\n");
1138 if(pkt[4] == Ropenfd){
1139 nfd = GBIT32(pkt+n-4);
1140 if(iosendfd(io, fd, nfd) < 0){
1141 fprint(2, "%T send fd error: %r\n");
/*
 * restring: takes a packet of pn bytes and a string pointer s.
 * The visible test detects an s that lies outside [pkt, pkt+pn); the
 * action taken for it is not shown. The last visible line stores the
 * 16-bit value n at s-1; how n is computed is not shown.
 */
1149 restring(uchar *pkt, int pn, char *s)
1153 if(s < (char*)pkt || s >= (char*)pkt+pn)
1158 PBIT16((uchar*)s-1, n);
/*
 * rewritehdr: patches an already-marshalled packet in place from the
 * Fcall f. The tag is stored as 16 bits at offset 5. The remaining
 * lines store fid-like fields as 32 bits at offsets 7 and 11 (oldtag as
 * 16 bits at offset 7) and pass each string field to restring.
 * The case labels that choose among these groups by message type are
 * not visible in this excerpt.
 */
1162 rewritehdr(Fcall *f, uchar *pkt)
1167 PBIT16(pkt+5, f->tag);
1171 restring(pkt, n, f->version);
1174 PBIT32(pkt+7, f->afid);
1175 restring(pkt, n, f->uname);
1176 restring(pkt, n, f->aname);
1179 PBIT16(pkt+7, f->oldtag);
1182 restring(pkt, n, f->uname);
1183 restring(pkt, n, f->aname);
1184 PBIT32(pkt+7, f->fid);
1185 PBIT32(pkt+11, f->afid);
1188 PBIT32(pkt+7, f->fid);
1189 PBIT32(pkt+11, f->newfid);
/* Every walk name is passed to restring. */
1190 for(i=0; i<f->nwname; i++)
1191 restring(pkt, n, f->wname[i]);
1194 restring(pkt, n, f->name);
1203 PBIT32(pkt+7, f->fid);
1206 restring(pkt, n, f->ename);
/* _iolisten: unpacks two char* arguments from the va_list and returns listen(a, b). */
1212 _iolisten(va_list *arg)
1216 a = va_arg(*arg, char*);
1217 b = va_arg(*arg, char*);
1218 return listen(a, b);
/* iolisten: passes _iolisten with arguments a and b to iocall on io and returns its result. */
1222 iolisten(Ioproc *io, char *a, char *b)
1224 return iocall(io, _iolisten, a, b);
/* _ioaccept: unpacks an int fd and a char* dir from the va_list and returns accept(fd, dir). */
1228 _ioaccept(va_list *arg)
1233 fd = va_arg(*arg, int);
1234 dir = va_arg(*arg, char*);
1235 return accept(fd, dir);
/* ioaccept: passes _ioaccept with arguments fd and dir to iocall on io and returns its result. */
1239 ioaccept(Ioproc *io, int fd, char *dir)
1241 return iocall(io, _ioaccept, fd, dir);
/*
 * Formats the current local time as month name, day of month, hh:mm:ss
 * and a 3-digit millisecond field. The milliseconds are derived from
 * ns, treated as a nanosecond count; its source is not shown.
 * NOTE(review): the function header is not visible here; presumably
 * this is the timefmt routine installed for the %T verb - confirm.
 */
1247 static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1248 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1252 tm = *localtime(time(0));
1253 return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
1254 mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
1255 (int)(ns%1000000000)/1000000);