commit 05b7f431f01dad68d31b4681a5583a0c3de2921a from: rsc date: Tue Mar 02 19:21:48 2004 UTC Long-standing stability bugs fixed in 9pserve. Update win to use acme interface directly instead of via pipes. Add comment to pipe about lack of message boundaries. commit - c4991217e1c34d6bc14c7a5d2371a74342581539 commit + 05b7f431f01dad68d31b4681a5583a0c3de2921a blob - 3d72437f12e559065cd257247596586712dbf33d blob + 44965a135fc48c2a30b47e9af0e9edeb536950fe --- src/cmd/9pserve.c +++ src/cmd/9pserve.c @@ -74,7 +74,7 @@ char adir[40]; int isunix; Queue *outq; Queue *inq; -int verbose; +int verbose = 0; int msize = 8192; void *gethash(Hash**, uint); @@ -276,8 +276,8 @@ connthread(void *arg) { int i, fd; Conn *c; - Hash *h; - Msg *m, *om; + Hash *h, *hnext; + Msg *m, *om, *mm; Fid *f; Ioproc *io; @@ -405,11 +405,16 @@ connthread(void *arg) } } - if(verbose) fprint(2, "%s eof\n", c->dir); + if(verbose) fprint(2, "fd#%d eof; flushing conn\n", c->fd); + + /* flush the output queue */ + sendq(c->outq, nil); + while(c->outq != nil) + yield(); /* flush all outstanding messages */ for(i=0; itag[i]; h; h=h->next){ + for(h=c->tag[i]; h; h=hnext){ om = h->v; m = msgnew(); m->internal = 1; @@ -419,19 +424,22 @@ connthread(void *arg) m->tx.tag = m->tag; m->tx.oldtag = om->tag; m->oldm = om; - om->ref++; + om->ref++; /* for m->oldm */ m->ref++; /* for outq */ sendomsg(m); - recvp(c->internal); + mm = recvp(c->internal); + assert(mm == m); msgput(m); /* got from recvp */ msgput(m); /* got from msgnew */ msgput(om); /* got from hash table */ + hnext = h->next; + free(h); } } /* clunk all outstanding fids */ for(i=0; ifid[i]; h; h=h->next){ + for(h=c->fid[i]; h; h=hnext){ f = h->v; m = msgnew(); m->internal = 1; @@ -444,10 +452,13 @@ connthread(void *arg) f->ref++; m->ref++; sendomsg(m); - recvp(c->internal); + mm = recvp(c->internal); + assert(mm == m); msgput(m); /* got from recvp */ msgput(m); /* got from msgnew */ fidput(f); /* got from hash table */ + hnext = h->next; + free(h); } } @@ -461,8 +472,6 @@ out: c->inc = 0; free(c->inq); c->inq = 0; - free(c->outq); - c->outq = 0; free(c); } @@ -482,6 +491,7 @@ openfdthread(void *v) io = ioproc(); tot = 0; + m = nil; if(c->fdmode == OREAD){ for(;;){ if(verbose) fprint(2, "tread..."); @@ -506,11 +516,12 @@ openfdthread(void *v) break; tot += m->rx.count; if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){ - fprint(2, "pipe write error: %r\n"); + // fprint(2, "pipe write error: %r\n"); break; } msgput(m); msgput(m); + m = nil; } }else{ for(;;){ @@ -521,7 +532,6 @@ openfdthread(void *v) if((n=ioread(io, c->fd, buf, n)) <= 0){ if(n < 0) fprint(2, "pipe read error: %r\n"); - m = nil; break; } m = msgnew(); @@ -540,11 +550,11 @@ openfdthread(void *v) recvp(c->internal); if(m->rx.type == Rerror){ // fprint(2, "write error: %s\n", m->rx.ename); - continue; } - tot = n; + tot += n; msgput(m); msgput(m); + m = nil; } } if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid); @@ -559,6 +569,7 @@ openfdthread(void *v) m->internal = 1; m->c = c; m->tx.type = Tclunk; + m->tx.tag = m->tag; m->tx.fid = fid->fid; m->fid = fid; fid->ref++; @@ -635,12 +646,14 @@ connoutthread(void *arg) { int err; Conn *c; + Queue *outq; Msg *m, *om; Ioproc *io; c = arg; + outq = c->outq; io = ioproc(); - while((m = recvq(c->outq)) != nil){ + while((m = recvq(outq)) != nil){ err = m->tx.type+1 != m->rx.type; if(!err && m->isopenfd) if(xopenfd(m) < 0) @@ -687,6 +700,8 @@ connoutthread(void *arg) nbsendp(c->inc, 0); } closeioproc(io); + free(outq); + c->outq = nil; } void @@ -740,13 +755,16 @@ inputthread(void *arg) msgput(m); continue; } - if(verbose > 1) fprint(2, "* -> %F\n", &m->rx); + if(verbose > 1) fprint(2, "* -> %F%s\n", &m->rx, + m->internal ? " (internal)" : ""); m->rpkt = pkt; m->rx.tag = m->ctag; if(m->internal) - sendp(m->c->internal, 0); - else + sendp(m->c->internal, m); + else if(m->c->outq) sendq(m->c->outq, m); + else + msgput(m); } closeioproc(io); //fprint(2, "input eof\n"); @@ -856,12 +874,17 @@ msgnew(void) void msgput(Msg *m) { + if(m == nil) + return; + if(verbose > 2) fprint(2, "msgput tag %d/%d ref %d\n", m->tag, m->ctag, m->ref); assert(m->ref > 0); if(--m->ref > 0) return; m->c->nmsg--; m->c = nil; + msgput(m->oldm); + m->oldm = nil; fidput(m->fid); m->fid = nil; fidput(m->afid); blob - ef8b7b619faef8eae2326b75f8a374d2a381fdc1 blob + 1f6f53504e3c2dc7143e6b3e7e09b19dd7eeaa80 --- src/cmd/win.c +++ src/cmd/win.c @@ -40,33 +40,49 @@ struct Q Q q; -int eventfd; -int addrfd; -int datafd; -int ctlfd; -int bodyfd; +Fid *eventfd; +Fid *addrfd; +Fid *datafd; +Fid *ctlfd; +// int bodyfd; char *typing; int ntypeb; int ntyper; int ntypebreak; int debug; +char *name; char **prog; int p[2]; Channel *cpid; +Channel *cwait; int pid = -1; +int label(char*, int); void error(char*); void stdinproc(void*); void stdoutproc(void*); -void type(Event*, int, int, int); -void sende(Event*, int, int, int, int, int); +void type(Event*, int, Fid*, Fid*); +void sende(Event*, int, Fid*, Fid*, Fid*, int); char *onestring(int, char**); int delete(Event*); void deltype(uint, uint); void runproc(void*); +int +fsfidprint(Fid *fid, char *fmt, ...) +{ + char buf[256]; + va_list arg; + int n; + + va_start(arg, fmt); + n = vsnprint(buf, sizeof buf, fmt, arg); + va_end(arg); + return fswrite(fid, buf, n); +} + void usage(void) { @@ -84,12 +100,18 @@ nopipes(void *v, char *msg) } void +waitthread(void *v) +{ + recvp(cwait); + threadexitsall(nil); +} + +void threadmain(int argc, char **argv) { int fd, id; char buf[256]; char buf1[128]; - char *name; Fsys *fs; ARGBEGIN{ @@ -110,8 +132,8 @@ threadmain(int argc, char **argv) threadnotify(nopipes, 1); if((fs = nsmount("acme", "")) < 0) sysfatal("nsmount acme: %r"); - ctlfd = fsopenfd(fs, "new/ctl", ORDWR|OCEXEC); - if(ctlfd < 0 || read(ctlfd, buf, 12) != 12) + ctlfd = fsopen(fs, "new/ctl", ORDWR|OCEXEC); + if(ctlfd < 0 || fsread(ctlfd, buf, 12) != 12) sysfatal("ctl: %r"); id = atoi(buf); sprint(buf, "%d/tag", id); @@ -119,21 +141,27 @@ threadmain(int argc, char **argv) write(fd, " Send Delete", 12); close(fd); sprint(buf, "%d/event", id); - eventfd = fsopenfd(fs, buf, ORDWR|OCEXEC); + eventfd = fsopen(fs, buf, ORDWR|OCEXEC); sprint(buf, "%d/addr", id); - addrfd = fsopenfd(fs, buf, ORDWR|OCEXEC); + addrfd = fsopen(fs, buf, ORDWR|OCEXEC); sprint(buf, "%d/data", id); - datafd = fsopenfd(fs, buf, ORDWR|OCEXEC); + datafd = fsopen(fs, buf, ORDWR|OCEXEC); sprint(buf, "%d/body", id); - bodyfd = fsopenfd(fs, buf, ORDWR|OCEXEC); +/* bodyfd = fsopenfd(fs, buf, ORDWR|OCEXEC); */ + if(eventfd==nil || addrfd==nil || datafd==nil) + sysfatal("data files: %r"); +/* if(eventfd<0 || addrfd<0 || datafd<0 || bodyfd<0) sysfatal("data files: %r"); +*/ fsunmount(fs); if(pipe(p) < 0) sysfatal("pipe: %r"); cpid = chancreate(sizeof(ulong), 1); + cwait = threadwaitchan(); + threadcreate(waitthread, nil, STACK); threadcreate(runproc, nil, STACK); pid = recvul(cpid); if(pid == -1) @@ -141,13 +169,13 @@ threadmain(int argc, char **argv) getwd(buf1, sizeof buf1); sprint(buf, "name %s/-%s\n0\n", buf1, name); - write(ctlfd, buf, strlen(buf)); + fswrite(ctlfd, buf, strlen(buf)); sprint(buf, "dumpdir %s/\n", buf1); - write(ctlfd, buf, strlen(buf)); + fswrite(ctlfd, buf, strlen(buf)); sprint(buf, "dump %s\n", onestring(argc, argv)); - write(ctlfd, buf, strlen(buf)); + fswrite(ctlfd, buf, strlen(buf)); -// proccreate(stdoutproc, nil, STACK); + threadcreate(stdoutproc, nil, STACK); stdinproc(nil); } @@ -161,10 +189,10 @@ runproc(void *v) USED(v); fd[0] = p[1]; - fd[1] = bodyfd; - fd[2] = bodyfd; -// fd[1] = p[1]; -// fd[2] = p[1]; +// fd[1] = bodyfd; +// fd[2] = bodyfd; + fd[1] = p[1]; + fd[2] = p[1]; if(prog[0] == nil){ prog = shell; @@ -210,14 +238,14 @@ onestring(int argc, char **argv) } int -getec(int efd) +getec(Fid *efd) { static char buf[8192]; static char *bufp; static int nbuf; if(nbuf == 0){ - nbuf = read(efd, buf, sizeof buf); + nbuf = fsread(efd, buf, sizeof buf); if(nbuf <= 0) error(nil); bufp = buf; @@ -227,7 +255,7 @@ getec(int efd) } int -geten(int efd) +geten(Fid *efd) { int n, c; @@ -240,7 +268,7 @@ geten(int efd) } int -geter(int efd, char *buf, int *nb) +geter(Fid *efd, char *buf, int *nb) { Rune r; int n; @@ -259,7 +287,7 @@ geter(int efd, char *buf, int *nb) } void -gete(int efd, Event *e) +gete(Fid *efd, Event *e) { int i, nb; @@ -297,10 +325,10 @@ nrunes(char *s, int nb) void stdinproc(void *v) { - int cfd = ctlfd; - int efd = eventfd; - int dfd = datafd; - int afd = addrfd; + Fid *cfd = ctlfd; + Fid *efd = eventfd; + Fid *dfd = datafd; + Fid *afd = addrfd; int fd0 = p[0]; Event e, e2, e3, e4; @@ -358,7 +386,7 @@ stdinproc(void *v) } if(e.flag&1 || (e.c2=='x' && e.nr==0 && e2.nr==0)){ /* send it straight back */ - fprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1); + fsfidprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1); break; } if(e.q0==e.q1 && (e.flag&2)){ @@ -380,7 +408,7 @@ stdinproc(void *v) /* just send it back */ if(e.flag & 2) gete(efd, &e2); - fprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1); + fsfidprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1); break; case 'd': @@ -399,8 +427,8 @@ void stdoutproc(void *v) { int fd1 = p[0]; - int afd = addrfd; - int dfd = datafd; + Fid *afd = addrfd; + Fid *dfd = datafd; int n, m, w, npart; char *buf, *s, *t; Rune r; @@ -411,7 +439,7 @@ stdoutproc(void *v) buf = malloc(8192+UTFmax+1); npart = 0; for(;;){ - n = read(fd1, buf+npart, 8192); + n = threadread(fd1, buf+npart, 8192); if(n < 0) error(nil); if(n == 0) @@ -445,17 +473,50 @@ stdoutproc(void *v) if(n > 0){ memmove(hold, buf+n, npart); buf[n] = 0; + n = label(buf, n); + buf[n] = 0; qlock(&q.lk); m = sprint(x, "#%d", q.p); - if(write(afd, x, m) != m) + if(fswrite(afd, x, m) != m) error("stdout writing address"); - if(write(dfd, buf, n) != n) + if(fswrite(dfd, buf, n) != n) error("stdout writing body"); q.p += nrunes(buf, n); qunlock(&q.lk); memmove(buf, hold, npart); } } +} + +char wdir[256]; +int +label(char *sr, int n) +{ + char *sl, *el, *er, *r; + + er = sr+n; + for(r=er-1; r>=sr; r--) + if(*r == '\007') + break; + if(r < sr) + return n; + + el = r+1; + if(el-sr > sizeof wdir) + sr = el - sizeof wdir; + for(sl=el-3; sl>=sr; sl--) + if(sl[0]=='\033' && sl[1]==']' && sl[2]==';') + break; + if(sl < sr) + return n; + + *r = 0; + snprint(wdir, sizeof wdir, "name %s/-%s\n0\n", sl+3, name); + fswrite(ctlfd, wdir, strlen(wdir)); + + memmove(sl, el, er-el); + n -= (el-sl); + return n; } int @@ -584,7 +645,7 @@ deltype(uint p0, uint p1) } void -type(Event *e, int fd0, int afd, int dfd) +type(Event *e, int fd0, Fid *afd, Fid *dfd) { int m, n, nr; char buf[128]; @@ -595,8 +656,8 @@ type(Event *e, int fd0, int afd, int dfd) m = e->q0; while(m < e->q1){ n = sprint(buf, "#%d", m); - write(afd, buf, n); - n = read(dfd, buf, sizeof buf); + fswrite(afd, buf, n); + n = fsread(dfd, buf, sizeof buf); nr = nrunes(buf, n); while(m+nr > e->q1){ do; while(n>0 && (buf[--n]&0xC0)==0x80); @@ -612,16 +673,16 @@ type(Event *e, int fd0, int afd, int dfd) } void -sende(Event *e, int fd0, int cfd, int afd, int dfd, int donl) +sende(Event *e, int fd0, Fid *cfd, Fid *afd, Fid *dfd, int donl) { int l, m, n, nr, lastc, end; char abuf[16], buf[128]; end = q.p+ntyper; l = sprint(abuf, "#%d", end); - write(afd, abuf, l); + fswrite(afd, abuf, l); if(e->nr > 0){ - write(dfd, e->b, e->nb); + fswrite(dfd, e->b, e->nb); addtype(e->c1, ntyper, e->b, e->nb, e->nr); lastc = e->r[e->nr-1]; }else{ @@ -629,8 +690,8 @@ sende(Event *e, int fd0, int cfd, int afd, int dfd, in lastc = 0; while(m < e->q1){ n = sprint(buf, "#%d", m); - write(afd, buf, n); - n = read(dfd, buf, sizeof buf); + fswrite(afd, buf, n); + n = fsread(dfd, buf, sizeof buf); nr = nrunes(buf, n); while(m+nr > e->q1){ do; while(n>0 && (buf[--n]&0xC0)==0x80); @@ -639,8 +700,8 @@ sende(Event *e, int fd0, int cfd, int afd, int dfd, in if(n == 0) break; l = sprint(abuf, "#%d", end); - write(afd, abuf, l); - write(dfd, buf, n); + fswrite(afd, abuf, l); + fswrite(dfd, buf, n); addtype(e->c1, ntyper, buf, n, nr); lastc = buf[n-1]; m += nr; @@ -648,9 +709,9 @@ sende(Event *e, int fd0, int cfd, int afd, int dfd, in } } if(donl && lastc!='\n'){ - write(dfd, "\n", 1); + fswrite(dfd, "\n", 1); addtype(e->c1, ntyper, "\n", 1, 1); } - write(cfd, "dot=addr", 8); + fswrite(cfd, "dot=addr", 8); sendtype(fd0); } blob - f9fe2420e57a5ca68c6c61a38f3e54f4ac7879e7 blob + 4caeb6c16bb64804b83b58997f5a90d5678f1b31 --- src/lib9/pipe.c +++ src/lib9/pipe.c @@ -3,6 +3,7 @@ #include #include +/* BUG: would like to preserve delimiters on systems that can */ int p9pipe(int fd[2]) {