11 MAXMSG = 64, /* per connection */
/*
 * Short names for the struct types used throughout this file.
 * The struct bodies are not part of this excerpt; the notes below
 * list only members that visible code actually touches.
 */
14 typedef struct Hash Hash;	/* hash chain entry; code follows ->next and reads ->v */
15 typedef struct Fid Fid;	/* fid record; code uses ->fid, ->cfid, ->ref, ->openfd */
16 typedef struct Msg Msg;	/* one request/reply pair; code uses ->tx, ->rx, ->tpkt, ->rpkt */
17 typedef struct Conn Conn;	/* one connection; code uses ->fd, ->dir, ->inc, ->internal, ->outq */
18 typedef struct Queue Queue;	/* queue used by sendq/recvq; code tests ->head and ->hungup */
/* Hash table operations; the bucket is chosen as key % NHASH. */
86 void *gethash(Hash**, uint);
87 int puthash(Hash**, uint, void*);	/* callers report a "duplicate" error when this is < 0 */
88 int delhash(Hash**, uint, void*);

/* 9P message and packet I/O. */
89 Msg *mread9p(Ioproc*, int);	/* reads a packet with read9ppkt and unpacks it with convM2S */
90 int mwrite9p(Ioproc*, int, uchar*);	/* writes a packet; for Ropenfd also passes a descriptor via iosendfd */
91 uchar *read9ppkt(Ioproc*, int);
92 int write9ppkt(int, uchar*);

/* Allocation and queue helpers. */
101 void *erealloc(void*, int);	/* reports out-of-memory through sysfatal */
103 int sendq(Queue*, void*);

/* Thread entry points; each is started with threadcreate. */
105 void connthread(void*);
106 void connoutthread(void*);
107 void listenthread(void*);
108 void outputthread(void*);
109 void inputthread(void*);

/* Packet rewriting helpers. */
110 void rewritehdr(Fcall*, uchar*);	/* stores Fcall fields (tag, fids, ...) back into a packed packet */
111 void repack(Fcall*, uchar**);	/* re-encodes the Fcall with convS2M */

/* Listen/accept and descriptor passing; the io* variants take an Ioproc. */
112 int tlisten(char*, char*);
113 int taccept(int, char*);
114 int iolisten(Ioproc*, char*, char*);	/* runs listen() through iocall */
115 int ioaccept(Ioproc*, int, char*);	/* runs accept() through iocall */
116 int iorecvfd(Ioproc*, int);
117 int iosendfd(Ioproc*, int, int);

118 void mainproc(void*);	/* started with proccreate from threadmain */
119 int ignorepipe(void*, char*);	/* note handler registered with atnotify */
121 void dorootstat(void);
126 fprint(2, "usage: 9pserve [-lv] [-A aname afid] [-M msize] address\n");
127 fprint(2, "\treads/writes 9P messages on stdin/stdout\n");
128 threadexitsall("usage");
132 extern int _threaddebuglevel;
134 threadmain(int argc, char **argv)
139 x = getenv("verbose9pserve");
142 fprint(2, "verbose9pserve %s => %d\n", x, verbose);
149 xaname = EARGF(usage());
150 xafid = atoi(EARGF(usage()));
154 msize = atoi(EARGF(usage()));
167 if(attached && !versioned){
168 fprint(2, "-A must be used with -M\n");
176 fmtinstall('T', timefmt);
178 if((afd = announce(addr, adir)) < 0)
179 sysfatal("announce %s: %r", addr);
181 if(strncmp(addr, "unix!", 5) == 0)
183 file = smprint("%s.log", addr);
185 sysfatal("smprint log: %r");
186 if((fd = create(file, OWRITE, 0666)) < 0)
187 sysfatal("create %s: %r", file);
192 if(verbose) fprint(2, "%T 9pserve running\n");
193 proccreate(mainproc, nil, STACK);
203 atnotify(ignorepipe, 1);
204 fmtinstall('D', dirfmt);
205 fmtinstall('M', dirmodefmt);
206 fmtinstall('F', fcallfmt);
207 fmtinstall('H', encodefmt);
214 f.version = "9P2000";
217 n = convS2M(&f, vbuf, sizeof vbuf);
218 if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
219 nn = write(1, vbuf, n);
221 sysfatal("error writing Tversion: %r\n");
222 n = read9pmsg(0, vbuf, sizeof vbuf);
223 if(convM2S(vbuf, n, &f) != n)
224 sysfatal("convM2S failure");
227 if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
230 threadcreate(inputthread, nil, STACK);
231 threadcreate(outputthread, nil, STACK);
236 threadcreate(listenthread, nil, STACK);
241 ignorepipe(void *v, char *s)
244 if(strcmp(s, "sys: write on closed pipe") == 0)
246 if(strcmp(s, "sys: tstp") == 0)
248 fprint(2, "9pserve %s: %T note: %s\n", addr, s);
253 listenthread(void *arg)
260 threadsetname("listen %s", adir);
262 c = emalloc(sizeof(Conn));
263 c->fd = iolisten(io, adir, c->dir);
265 if(verbose) fprint(2, "%T listen: %r\n");
270 c->inc = chancreate(sizeof(void*), 0);
271 c->internal = chancreate(sizeof(void*), 0);
274 if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
275 threadcreate(connthread, c, STACK);
285 m->rpkt = emalloc(n);
286 nn = convS2M(&m->rx, m->rpkt, n);
288 sysfatal("sizeS2M + convS2M disagree");
289 sendq(m->c->outq, m);
298 m->tpkt = emalloc(n);
299 nn = convS2M(&m->tx, m->tpkt, n);
301 sysfatal("sizeS2M + convS2M disagree");
306 err(Msg *m, char *ename)
310 m->rx.tag = m->tx.tag;
319 t = emalloc(strlen(s)+1);
325 connthread(void *arg)
335 threadsetname("conn %s", c->dir);
337 fd = ioaccept(io, c->fd, c->dir);
339 if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
344 threadcreate(connoutthread, c, STACK);
345 while((m = mread9p(io, c->fd)) != nil){
346 if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
350 if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
351 if(puthash(c->tag, m->tx.tag, m) < 0){
352 err(m, "duplicate tag");
358 m->rx.tag = m->tx.tag;
359 m->rx.msize = m->tx.msize;
360 if(m->rx.msize > msize)
362 m->rx.version = "9P2000";
363 m->rx.type = Rversion;
367 if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
368 m->rx.tag = m->tx.tag;
377 if(m->tx.afid != NOFID
378 && (m->afid = gethash(c->fid, m->tx.afid)) == nil){
379 err(m, "unknown fid");
384 m->fid = fidnew(m->tx.fid);
385 if(puthash(c->fid, m->tx.fid, m->fid) < 0){
386 err(m, "duplicate fid");
390 if(attached && m->afid==nil){
391 if(m->tx.aname[0] && strcmp(xaname, m->tx.aname) != 0){
392 err(m, "invalid attach name");
396 m->tx.aname = xaname;
397 m->tx.uname = estrdup(m->tx.uname);
398 repack(&m->tx, &m->tpkt);
404 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
405 err(m, "unknown fid");
409 if(m->tx.newfid == m->tx.fid){
413 m->newfid = fidnew(m->tx.newfid);
414 if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
415 err(m, "duplicate fid");
423 err(m, "authentication not required");
426 m->afid = fidnew(m->tx.afid);
427 if(puthash(c->fid, m->tx.afid, m->afid) < 0){
428 err(m, "duplicate fid");
434 if(m->tx.mode&~(OTRUNC|3)){
435 err(m, "bad openfd mode");
450 if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
451 err(m, "unknown fid");
458 /* have everything - translate and send */
463 m->tx.fid = m->fid->fid;
465 m->tx.newfid = m->newfid->fid;
467 m->tx.afid = m->afid->fid;
469 m->tx.oldtag = m->oldm->tag;
470 /* reference passes to outq */
472 while(c->nmsg >= MAXMSG){
478 if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
480 /* flush the output queue */
482 while(c->outq != nil)
485 /* flush all outstanding messages */
486 for(i=0; i<NHASH; i++){
487 for(h=c->tag[i]; h; h=hnext){
495 m->tx.oldtag = om->tag;
498 msgincref(m); /* for outq */
500 mm = recvp(c->internal);
502 msgput(m); /* got from recvp */
503 msgput(m); /* got from msgnew */
504 msgput(om); /* got from hash table */
510 /* clunk all outstanding fids */
511 for(i=0; i<NHASH; i++){
512 for(h=c->fid[i]; h; h=hnext){
525 mm = recvp(c->internal);
528 msgput(m); /* got from recvp */
529 msgput(m); /* got from msgnew */
530 fidput(f); /* got from hash table */
538 assert(c->nmsg == 0);
539 assert(c->nfid == 0);
541 chanfree(c->internal);
551 openfdthread(void *v)
564 threadsetname("openfd %s", c->fdfid);
567 if(c->fdmode == OREAD){
569 if(verbose) fprint(2, "%T tread...");
574 m->tx.count = msize - IOHDRSZ;
575 m->tx.fid = fid->fid;
583 if(m->rx.type == Rerror){
584 // fprint(2, "%T read error: %s\n", m->rx.ename);
590 if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
591 // fprint(2, "%T pipe write error: %r\n");
600 if(verbose) fprint(2, "%T twrite...");
604 if((n=ioread(io, c->fd, buf, n)) <= 0){
606 fprint(2, "%T pipe read error: %r\n");
613 m->tx.fid = fid->fid;
623 if(m->rx.type == Rerror){
624 // fprint(2, "%T write error: %s\n", m->rx.ename);
632 if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
639 if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
640 if(--fid->openfd == 0){
646 m->tx.fid = fid->fid;
657 chanfree(c->internal);
670 rerrstr(errs, sizeof errs);
672 /* XXX return here? */
674 if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
676 /* now we're committed. */
678 /* a new connection for this fid */
679 nc = emalloc(sizeof(Conn));
680 nc->internal = chancreate(sizeof(void*), 0);
686 nc->fdmode = m->tx.mode;
689 /* a thread to tend the pipe */
690 threadcreate(openfdthread, nc, STACK);
692 /* if mode is ORDWR, that openfdthread will write; start a reader */
693 if((m->tx.mode&3) == ORDWR){
694 nc = emalloc(sizeof(Conn));
695 nc->internal = chancreate(sizeof(void*), 0);
700 nc->fd = dup(p[0], -1);
701 threadcreate(openfdthread, nc, STACK);
704 /* steal fid from other connection */
705 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
708 /* rewrite as Ropenfd */
709 m->rx.type = Ropenfd;
711 m->rpkt = erealloc(m->rpkt, n+4);
712 PBIT32(m->rpkt+n, p[1]);
715 m->rpkt[4] = Ropenfd;
721 connoutthread(void *arg)
732 threadsetname("connout %s", c->dir);
733 while((m = recvq(outq)) != nil){
734 err = m->tx.type+1 != m->rx.type;
735 if(!err && m->isopenfd)
742 if(delhash(om->c->tag, om->ctag, om) == 0)
748 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
753 if(verbose) fprint(2, "%T auth error\n");
754 if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
760 if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
764 if(err || m->rx.nwqid < m->tx.nwname)
765 if(m->tx.fid != m->tx.newfid && m->newfid)
766 if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
770 if(delhash(m->c->tag, m->ctag, m) == 0)
772 if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
773 rewritehdr(&m->rx, m->rpkt);
774 if(mwrite9p(io, c->fd, m->rpkt) < 0)
775 if(verbose) fprint(2, "%T write error: %r\n");
777 if(c->inputstalled && c->nmsg < MAXMSG)
786 outputthread(void *arg)
793 threadsetname("output");
794 while((m = recvq(outq)) != nil){
795 if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
796 rewritehdr(&m->tx, m->tpkt);
797 if(mwrite9p(io, 1, m->tpkt) < 0)
798 sysfatal("output error: %r");
802 fprint(2, "%T output eof\n");
807 inputthread(void *arg)
814 threadsetname("input");
815 if(verbose) fprint(2, "%T input thread\n");
818 while((pkt = read9ppkt(io, 0)) != nil){
821 fprint(2, "%T short 9P packet from server\n");
825 if(verbose > 2) fprint(2, "%T read %.*H\n", n, pkt);
827 if((m = msgget(tag)) == nil){
828 fprint(2, "%T unexpected 9P response tag %d\n", tag);
832 if((nn = convM2S(pkt, n, &m->rx)) != n){
833 fprint(2, "%T bad packet - convM2S %d but %d\n", nn, n);
838 if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
839 m->internal ? " (internal)" : "");
843 sendp(m->c->internal, m);
845 sendq(m->c->outq, m);
850 //fprint(2, "%T input eof\n");
855 gethash(Hash **ht, uint n)
859 for(h=ht[n%NHASH]; h; h=h->next)
866 delhash(Hash **ht, uint n, void *v)
870 for(l=&ht[n%NHASH]; h=*l; l=&h->next)
873 if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, h->v, v);
884 puthash(Hash **ht, uint n, void *v)
890 h = emalloc(sizeof(Hash));
891 h->next = ht[n%NHASH];
908 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
909 if(nfidtab == xafid){
910 fidtab[nfidtab++] = nil;
911 fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
913 fidtab[nfidtab] = emalloc(sizeof(Fid));
914 freefid = fidtab[nfidtab];
915 freefid->fid = nfidtab++;
944 if(verbose > 1) fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
945 getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
955 msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
956 msgtab[nmsgtab] = emalloc(sizeof(Msg));
957 freemsg = msgtab[nmsgtab];
958 freemsg->tag = nmsgtab++;
963 if(verbose > 1) fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
964 getcallerpc(&x), m, m->tag, m->ref);
969 * Clear data associated with connections, so that
970 * if all msgs have been msgcleared, the connection
971 * can be freed. Note that this does *not* free the tpkt
972 * and rpkt; they are freed in msgput with the msg itself.
973 * The io write thread might still be holding a ref to msg
974 * even once the connection has finished with it.
999 if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
1000 close(m->rx.unixfd);
1011 if(verbose > 1) fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
1012 getcallerpc(&m), m, m->tag, m->ctag, m->ref);
1036 if(n < 0 || n >= nmsgtab)
1041 if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
1055 sysfatal("out of memory allocating %d", n);
1061 erealloc(void *v, int n)
1066 sysfatal("out of memory reallocating %d", n);
1071 typedef struct Qel Qel;
1092 q = mallocz(sizeof(Queue), 1);
1100 sendq(Queue *q, void *p)
1104 e = emalloc(sizeof(Qel));
1107 werrstr("hungup queue");
1130 while(q->head == nil && !q->hungup)
1145 read9ppkt(Ioproc *io, int fd)
1150 n = ioreadn(io, fd, buf, 4);
1156 nn = ioreadn(io, fd, pkt+4, n-4);
1161 /* would do this if we ever got one of these, but we only generate them
1162 if(pkt[4] == Ropenfd){
1163 newfd = iorecvfd(io, fd);
1164 PBIT32(pkt+n-4, newfd);
1171 mread9p(Ioproc *io, int fd)
1177 if((pkt = read9ppkt(io, fd)) == nil)
1183 nn = convM2S(pkt, n, &m->tx);
1185 fprint(2, "%T read bad packet from %d\n", fd);
1192 mwrite9p(Ioproc *io, int fd, uchar *pkt)
1197 if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, n, n, pkt);
1198 if(verbose > 1) fprint(2, "%T before iowrite\n");
1199 if(iowrite(io, fd, pkt, n) != n){
1200 fprint(2, "%T write error: %r\n");
1203 if(verbose > 1) fprint(2, "%T after iowrite\n");
1204 if(pkt[4] == Ropenfd){
1205 nfd = GBIT32(pkt+n-4);
1206 if(iosendfd(io, fd, nfd) < 0){
1207 fprint(2, "%T send fd error: %r\n");
1215 restring(uchar *pkt, int pn, char *s)
1219 if(s < (char*)pkt || s >= (char*)pkt+pn)
1224 PBIT16((uchar*)s-1, n);
1228 repack(Fcall *f, uchar **ppkt)
1241 convS2M(f, pkt, nn);
1245 rewritehdr(Fcall *f, uchar *pkt)
1250 PBIT16(pkt+5, f->tag);
1254 restring(pkt, n, f->version);
1257 PBIT32(pkt+7, f->afid);
1258 restring(pkt, n, f->uname);
1259 restring(pkt, n, f->aname);
1262 PBIT16(pkt+7, f->oldtag);
1265 restring(pkt, n, f->uname);
1266 restring(pkt, n, f->aname);
1267 PBIT32(pkt+7, f->fid);
1268 PBIT32(pkt+11, f->afid);
1271 PBIT32(pkt+7, f->fid);
1272 PBIT32(pkt+11, f->newfid);
1273 for(i=0; i<f->nwname; i++)
1274 restring(pkt, n, f->wname[i]);
1277 restring(pkt, n, f->name);
1286 PBIT32(pkt+7, f->fid);
1289 restring(pkt, n, f->ename);
1295 _iolisten(va_list *arg)
1299 a = va_arg(*arg, char*);
1300 b = va_arg(*arg, char*);
1301 return listen(a, b);
1305 iolisten(Ioproc *io, char *a, char *b)
1307 return iocall(io, _iolisten, a, b);
1311 _ioaccept(va_list *arg)
1316 fd = va_arg(*arg, int);
1317 dir = va_arg(*arg, char*);
1318 return accept(fd, dir);
1322 ioaccept(Ioproc *io, int fd, char *dir)
1324 return iocall(io, _ioaccept, fd, dir);
1330 static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1331 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1335 tm = *localtime(time(0));
1336 return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
1337 mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
1338 (int)(ns%1000000000)/1000000);