Commit Diff


commit - c4991217e1c34d6bc14c7a5d2371a74342581539
commit + 05b7f431f01dad68d31b4681a5583a0c3de2921a
blob - 3d72437f12e559065cd257247596586712dbf33d
blob + 44965a135fc48c2a30b47e9af0e9edeb536950fe
--- src/cmd/9pserve.c
+++ src/cmd/9pserve.c
@@ -74,7 +74,7 @@ char adir[40];
 int isunix;
 Queue *outq;
 Queue *inq;
-int verbose;
+int verbose = 0;
 int msize = 8192;
 
 void *gethash(Hash**, uint);
@@ -276,8 +276,8 @@ connthread(void *arg)
 {
 	int i, fd;
 	Conn *c;
-	Hash *h;
-	Msg *m, *om;
+	Hash *h, *hnext;
+	Msg *m, *om, *mm;
 	Fid *f;
 	Ioproc *io;
 
@@ -405,11 +405,16 @@ connthread(void *arg)
 		}
 	}
 
-	if(verbose) fprint(2, "%s eof\n", c->dir);
+	if(verbose) fprint(2, "fd#%d eof; flushing conn\n", c->fd);
+
+	/* flush the output queue */
+	sendq(c->outq, nil);
+	while(c->outq != nil)
+		yield();
 
 	/* flush all outstanding messages */
 	for(i=0; i<NHASH; i++){
-		for(h=c->tag[i]; h; h=h->next){
+		for(h=c->tag[i]; h; h=hnext){
 			om = h->v;
 			m = msgnew();
 			m->internal = 1;
@@ -419,19 +424,22 @@ connthread(void *arg)
 			m->tx.tag = m->tag;
 			m->tx.oldtag = om->tag;
 			m->oldm = om;
-			om->ref++;
+			om->ref++;	/* for m->oldm */
 			m->ref++;	/* for outq */
 			sendomsg(m);
-			recvp(c->internal);
+			mm = recvp(c->internal);
+			assert(mm == m);
 			msgput(m);	/* got from recvp */
 			msgput(m);	/* got from msgnew */
 			msgput(om);	/* got from hash table */
+			hnext = h->next;
+			free(h);
 		}
 	}
 
 	/* clunk all outstanding fids */
 	for(i=0; i<NHASH; i++){
-		for(h=c->fid[i]; h; h=h->next){
+		for(h=c->fid[i]; h; h=hnext){
 			f = h->v;
 			m = msgnew();
 			m->internal = 1;
@@ -444,10 +452,13 @@ connthread(void *arg)
 			f->ref++;
 			m->ref++;
 			sendomsg(m);
-			recvp(c->internal);
+			mm = recvp(c->internal);
+			assert(mm == m);
 			msgput(m);	/* got from recvp */
 			msgput(m);	/* got from msgnew */
 			fidput(f);	/* got from hash table */
+			hnext = h->next;
+			free(h);
 		}
 	}
 
@@ -461,8 +472,6 @@ out:
 	c->inc = 0;
 	free(c->inq);
 	c->inq = 0;
-	free(c->outq);
-	c->outq = 0;
 	free(c);
 }
 
@@ -482,6 +491,7 @@ openfdthread(void *v)
 	io = ioproc();
 
 	tot = 0;
+	m = nil;
 	if(c->fdmode == OREAD){
 		for(;;){
 			if(verbose) fprint(2, "tread...");
@@ -506,11 +516,12 @@ openfdthread(void *v)
 				break;
 			tot += m->rx.count;
 			if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
-				fprint(2, "pipe write error: %r\n");
+				// fprint(2, "pipe write error: %r\n");
 				break;
 			}
 			msgput(m);
 			msgput(m);
+			m = nil;
 		}
 	}else{
 		for(;;){
@@ -521,7 +532,6 @@ openfdthread(void *v)
 			if((n=ioread(io, c->fd, buf, n)) <= 0){
 				if(n < 0)
 					fprint(2, "pipe read error: %r\n");
-				m = nil;
 				break;
 			}
 			m = msgnew();
@@ -540,11 +550,11 @@ openfdthread(void *v)
 			recvp(c->internal);
 			if(m->rx.type == Rerror){
 			//	fprint(2, "write error: %s\n", m->rx.ename);
-				continue;
 			}
-			tot = n;
+			tot += n;
 			msgput(m);
 			msgput(m);
+			m = nil;
 		}
 	}
 	if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid);
@@ -559,6 +569,7 @@ openfdthread(void *v)
 		m->internal = 1;
 		m->c = c;
 		m->tx.type = Tclunk;
+		m->tx.tag = m->tag;
 		m->tx.fid = fid->fid;
 		m->fid = fid;
 		fid->ref++;
@@ -635,12 +646,14 @@ connoutthread(void *arg)
 {
 	int err;
 	Conn *c;
+	Queue *outq;
 	Msg *m, *om;
 	Ioproc *io;
 
 	c = arg;
+	outq = c->outq;
 	io = ioproc();
-	while((m = recvq(c->outq)) != nil){
+	while((m = recvq(outq)) != nil){
 		err = m->tx.type+1 != m->rx.type;
 		if(!err && m->isopenfd)
 			if(xopenfd(m) < 0)
@@ -687,6 +700,8 @@ connoutthread(void *arg)
 			nbsendp(c->inc, 0);
 	}
 	closeioproc(io);
+	free(outq);
+	c->outq = nil;
 }
 
 void
@@ -740,13 +755,16 @@ inputthread(void *arg)
 			msgput(m);
 			continue;
 		}
-		if(verbose > 1) fprint(2, "* -> %F\n", &m->rx);
+		if(verbose > 1) fprint(2, "* -> %F%s\n", &m->rx,
+			m->internal ? " (internal)" : "");
 		m->rpkt = pkt;
 		m->rx.tag = m->ctag;
 		if(m->internal)
-			sendp(m->c->internal, 0);
-		else
+			sendp(m->c->internal, m);
+		else if(m->c->outq)
 			sendq(m->c->outq, m);
+		else
+			msgput(m);
 	}
 	closeioproc(io);
 	//fprint(2, "input eof\n");
@@ -856,12 +874,17 @@ msgnew(void)
 void
 msgput(Msg *m)
 {
+	if(m == nil)
+		return;
+
 	if(verbose > 2) fprint(2, "msgput tag %d/%d ref %d\n", m->tag, m->ctag, m->ref);
 	assert(m->ref > 0);
 	if(--m->ref > 0)
 		return;
 	m->c->nmsg--;
 	m->c = nil;
+	msgput(m->oldm);
+	m->oldm = nil;
 	fidput(m->fid);
 	m->fid = nil;
 	fidput(m->afid);
blob - ef8b7b619faef8eae2326b75f8a374d2a381fdc1
blob + 1f6f53504e3c2dc7143e6b3e7e09b19dd7eeaa80
--- src/cmd/win.c
+++ src/cmd/win.c
@@ -40,33 +40,49 @@ struct Q
 
 Q	q;
 
-int eventfd;
-int addrfd;
-int datafd;
-int ctlfd;
-int bodyfd;
+Fid *eventfd;
+Fid *addrfd;
+Fid *datafd;
+Fid *ctlfd;
+// int bodyfd;
 
 char	*typing;
 int	ntypeb;
 int	ntyper;
 int	ntypebreak;
 int	debug;
+char *name;
 
 char **prog;
 int p[2];
 Channel *cpid;
+Channel *cwait;
 int pid = -1;
 
+int	label(char*, int);
 void	error(char*);
 void	stdinproc(void*);
 void	stdoutproc(void*);
-void	type(Event*, int, int, int);
-void	sende(Event*, int, int, int, int, int);
+void	type(Event*, int, Fid*, Fid*);
+void	sende(Event*, int, Fid*, Fid*, Fid*, int);
 char	*onestring(int, char**);
 int	delete(Event*);
 void	deltype(uint, uint);
 void	runproc(void*);
 
+int
+fsfidprint(Fid *fid, char *fmt, ...)	/* format fmt and its arguments, then write the text to fid */
+{
+	va_list ap;
+	char text[256];	/* vsnprint is given sizeof text as its limit */
+	int len;
+
+	va_start(ap, fmt);
+	len = vsnprint(text, sizeof text, fmt, ap);
+	va_end(ap);
+	return fswrite(fid, text, len);	/* caller gets fswrite's result unchanged */
+}
+
 void
 usage(void)
 {
@@ -84,12 +100,18 @@ nopipes(void *v, char *msg)
 }
 
 void
+waitthread(void *v)	/* thread entry point; v is not used */
+{
+	recvp(cwait);	/* block until one pointer arrives on cwait; the value is discarded */
+	threadexitsall(nil);	/* NOTE(review): presumably ends every proc of win once a child has exited -- confirm against thread(3) */
+}
+
+void
 threadmain(int argc, char **argv)
 {
 	int fd, id;
 	char buf[256];
 	char buf1[128];
-	char *name;
 	Fsys *fs;
 
 	ARGBEGIN{
@@ -110,8 +132,8 @@ threadmain(int argc, char **argv)
 	threadnotify(nopipes, 1);
 	if((fs = nsmount("acme", "")) < 0)
 		sysfatal("nsmount acme: %r");
-	ctlfd = fsopenfd(fs, "new/ctl", ORDWR|OCEXEC);
-	if(ctlfd < 0 || read(ctlfd, buf, 12) != 12)
+	ctlfd = fsopen(fs, "new/ctl", ORDWR|OCEXEC);
+	if(ctlfd == nil || fsread(ctlfd, buf, 12) != 12)	/* ctlfd is a Fid* now: test for nil, as done for eventfd/addrfd/datafd */
 		sysfatal("ctl: %r");
 	id = atoi(buf);
 	sprint(buf, "%d/tag", id);
@@ -119,21 +141,27 @@ threadmain(int argc, char **argv)
 	write(fd, " Send Delete", 12);
 	close(fd);
 	sprint(buf, "%d/event", id);
-	eventfd = fsopenfd(fs, buf, ORDWR|OCEXEC);
+	eventfd = fsopen(fs, buf, ORDWR|OCEXEC);
 	sprint(buf, "%d/addr", id);
-	addrfd = fsopenfd(fs, buf, ORDWR|OCEXEC);
+	addrfd = fsopen(fs, buf, ORDWR|OCEXEC);
 	sprint(buf, "%d/data", id);
-	datafd = fsopenfd(fs, buf, ORDWR|OCEXEC);
+	datafd = fsopen(fs, buf, ORDWR|OCEXEC);
 	sprint(buf, "%d/body", id);
-	bodyfd = fsopenfd(fs, buf, ORDWR|OCEXEC);
+/*	bodyfd = fsopenfd(fs, buf, ORDWR|OCEXEC); */
+	if(eventfd==nil || addrfd==nil || datafd==nil)
+		sysfatal("data files: %r");
+/*
 	if(eventfd<0 || addrfd<0 || datafd<0 || bodyfd<0)
 		sysfatal("data files: %r");
+*/
 	fsunmount(fs);
 
 	if(pipe(p) < 0)
 		sysfatal("pipe: %r");
 
 	cpid = chancreate(sizeof(ulong), 1);
+	cwait = threadwaitchan();
+	threadcreate(waitthread, nil, STACK);
 	threadcreate(runproc, nil, STACK);
 	pid = recvul(cpid);
 	if(pid == -1)
@@ -141,13 +169,13 @@ threadmain(int argc, char **argv)
 
 	getwd(buf1, sizeof buf1);
 	sprint(buf, "name %s/-%s\n0\n", buf1, name);
-	write(ctlfd, buf, strlen(buf));
+	fswrite(ctlfd, buf, strlen(buf));
 	sprint(buf, "dumpdir %s/\n", buf1);
-	write(ctlfd, buf, strlen(buf));
+	fswrite(ctlfd, buf, strlen(buf));
 	sprint(buf, "dump %s\n", onestring(argc, argv));
-	write(ctlfd, buf, strlen(buf));
+	fswrite(ctlfd, buf, strlen(buf));
 	
-//	proccreate(stdoutproc, nil, STACK);
+	threadcreate(stdoutproc, nil, STACK);
 	stdinproc(nil);
 }
 
@@ -161,10 +189,10 @@ runproc(void *v)
 	USED(v);
 
 	fd[0] = p[1];
-	fd[1] = bodyfd;
-	fd[2] = bodyfd;
-//	fd[1] = p[1];
-//	fd[2] = p[1];
+//	fd[1] = bodyfd;
+//	fd[2] = bodyfd;
+	fd[1] = p[1];
+	fd[2] = p[1];
 
 	if(prog[0] == nil){
 		prog = shell;
@@ -210,14 +238,14 @@ onestring(int argc, char **argv)
 }
 
 int
-getec(int efd)
+getec(Fid *efd)
 {
 	static char buf[8192];
 	static char *bufp;
 	static int nbuf;
 
 	if(nbuf == 0){
-		nbuf = read(efd, buf, sizeof buf);
+		nbuf = fsread(efd, buf, sizeof buf);
 		if(nbuf <= 0)
 			error(nil);
 		bufp = buf;
@@ -227,7 +255,7 @@ getec(int efd)
 }
 
 int
-geten(int efd)
+geten(Fid *efd)
 {
 	int n, c;
 
@@ -240,7 +268,7 @@ geten(int efd)
 }
 
 int
-geter(int efd, char *buf, int *nb)
+geter(Fid *efd, char *buf, int *nb)
 {
 	Rune r;
 	int n;
@@ -259,7 +287,7 @@ geter(int efd, char *buf, int *nb)
 }
 
 void
-gete(int efd, Event *e)
+gete(Fid *efd, Event *e)
 {
 	int i, nb;
 
@@ -297,10 +325,10 @@ nrunes(char *s, int nb)
 void
 stdinproc(void *v)
 {
-	int cfd = ctlfd;
-	int efd = eventfd;
-	int dfd = datafd;
-	int afd = addrfd;
+	Fid *cfd = ctlfd;
+	Fid *efd = eventfd;
+	Fid *dfd = datafd;
+	Fid *afd = addrfd;
 	int fd0 = p[0];
 	Event e, e2, e3, e4;
 
@@ -358,7 +386,7 @@ stdinproc(void *v)
 				}
 				if(e.flag&1 || (e.c2=='x' && e.nr==0 && e2.nr==0)){
 					/* send it straight back */
-					fprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1);
+					fsfidprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1);
 					break;
 				}
 				if(e.q0==e.q1 && (e.flag&2)){
@@ -380,7 +408,7 @@ stdinproc(void *v)
 				/* just send it back */
 				if(e.flag & 2)
 					gete(efd, &e2);
-				fprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1);
+				fsfidprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1);
 				break;
 
 			case 'd':
@@ -399,8 +427,8 @@ void
 stdoutproc(void *v)
 {
 	int fd1 = p[0];
-	int afd = addrfd;
-	int dfd = datafd;
+	Fid *afd = addrfd;
+	Fid *dfd = datafd;
 	int n, m, w, npart;
 	char *buf, *s, *t;
 	Rune r;
@@ -411,7 +439,7 @@ stdoutproc(void *v)
 	buf = malloc(8192+UTFmax+1);
 	npart = 0;
 	for(;;){
-		n = read(fd1, buf+npart, 8192);
+		n = threadread(fd1, buf+npart, 8192);
 		if(n < 0)
 			error(nil);
 		if(n == 0)
@@ -445,17 +473,50 @@ stdoutproc(void *v)
 		if(n > 0){
 			memmove(hold, buf+n, npart);
 			buf[n] = 0;
+			n = label(buf, n);
+			buf[n] = 0;
 			qlock(&q.lk);
 			m = sprint(x, "#%d", q.p);
-			if(write(afd, x, m) != m)
+			if(fswrite(afd, x, m) != m)
 				error("stdout writing address");
-			if(write(dfd, buf, n) != n)
+			if(fswrite(dfd, buf, n) != n)
 				error("stdout writing body");
 			q.p += nrunes(buf, n);
 			qunlock(&q.lk);
 			memmove(buf, hold, npart);
 		}
 	}
+}
+
+char wdir[256];	/* holds the ctl message that label() formats */
+int	/* returns the byte count left in sr; n itself when nothing was removed */
+label(char *sr, int n)	/* find the last "ESC ] ; text BEL" sequence in sr[0..n), send text to ctlfd as a "name" message, and cut the sequence out */
+{
+	char *sl, *el, *er, *r;
+
+	er = sr+n;	/* one past the last byte */
+	for(r=er-1; r>=sr; r--)	/* scan backward for the last BEL */
+		if(*r == '\007')
+			break;
+	if(r < sr)	/* no BEL: leave the buffer alone */
+		return n;
+
+	el = r+1;	/* first byte after the BEL */
+	if(el-sr > sizeof wdir)	/* look back at most sizeof wdir bytes for the introducer */
+		sr = el - sizeof wdir;
+	for(sl=el-3; sl>=sr; sl--)	/* nearest "ESC ] ;" before the BEL */
+		if(sl[0]=='\033' && sl[1]==']' && sl[2]==';')
+			break;
+	if(sl < sr)	/* BEL without an introducer in range: leave the buffer alone */
+		return n;
+
+	*r = 0;	/* overwrite the BEL so the text after "ESC ] ;" is a C string */
+	snprint(wdir, sizeof wdir, "name %s/-%s\n0\n", sl+3, name);	/* NOTE(review): a long text may lose the "/-name" tail to the size limit -- confirm */
+	fswrite(ctlfd, wdir, strlen(wdir));	/* write the message to ctlfd; the result is not checked */
+
+	memmove(sl, el, er-el);	/* slide the bytes after the sequence down over it */
+	n -= (el-sl);	/* drop the sequence's length from the count */
+	return n;
 }
 
 int
@@ -584,7 +645,7 @@ deltype(uint p0, uint p1)
 }
 
 void
-type(Event *e, int fd0, int afd, int dfd)
+type(Event *e, int fd0, Fid *afd, Fid *dfd)
 {
 	int m, n, nr;
 	char buf[128];
@@ -595,8 +656,8 @@ type(Event *e, int fd0, int afd, int dfd)
 		m = e->q0;
 		while(m < e->q1){
 			n = sprint(buf, "#%d", m);
-			write(afd, buf, n);
-			n = read(dfd, buf, sizeof buf);
+			fswrite(afd, buf, n);
+			n = fsread(dfd, buf, sizeof buf);
 			nr = nrunes(buf, n);
 			while(m+nr > e->q1){
 				do; while(n>0 && (buf[--n]&0xC0)==0x80);
@@ -612,16 +673,16 @@ type(Event *e, int fd0, int afd, int dfd)
 }
 
 void
-sende(Event *e, int fd0, int cfd, int afd, int dfd, int donl)
+sende(Event *e, int fd0, Fid *cfd, Fid *afd, Fid *dfd, int donl)
 {
 	int l, m, n, nr, lastc, end;
 	char abuf[16], buf[128];
 
 	end = q.p+ntyper;
 	l = sprint(abuf, "#%d", end);
-	write(afd, abuf, l);
+	fswrite(afd, abuf, l);
 	if(e->nr > 0){
-		write(dfd, e->b, e->nb);
+		fswrite(dfd, e->b, e->nb);
 		addtype(e->c1, ntyper, e->b, e->nb, e->nr);
 		lastc = e->r[e->nr-1];
 	}else{
@@ -629,8 +690,8 @@ sende(Event *e, int fd0, int cfd, int afd, int dfd, in
 		lastc = 0;
 		while(m < e->q1){
 			n = sprint(buf, "#%d", m);
-			write(afd, buf, n);
-			n = read(dfd, buf, sizeof buf);
+			fswrite(afd, buf, n);
+			n = fsread(dfd, buf, sizeof buf);
 			nr = nrunes(buf, n);
 			while(m+nr > e->q1){
 				do; while(n>0 && (buf[--n]&0xC0)==0x80);
@@ -639,8 +700,8 @@ sende(Event *e, int fd0, int cfd, int afd, int dfd, in
 			if(n == 0)
 				break;
 			l = sprint(abuf, "#%d", end);
-			write(afd, abuf, l);
-			write(dfd, buf, n);
+			fswrite(afd, abuf, l);
+			fswrite(dfd, buf, n);
 			addtype(e->c1, ntyper, buf, n, nr);
 			lastc = buf[n-1];
 			m += nr;
@@ -648,9 +709,9 @@ sende(Event *e, int fd0, int cfd, int afd, int dfd, in
 		}
 	}
 	if(donl && lastc!='\n'){
-		write(dfd, "\n", 1);
+		fswrite(dfd, "\n", 1);
 		addtype(e->c1, ntyper, "\n", 1, 1);
 	}
-	write(cfd, "dot=addr", 8);
+	fswrite(cfd, "dot=addr", 8);
 	sendtype(fd0);
 }
blob - f9fe2420e57a5ca68c6c61a38f3e54f4ac7879e7
blob + 4caeb6c16bb64804b83b58997f5a90d5678f1b31
--- src/lib9/pipe.c
+++ src/lib9/pipe.c
@@ -3,6 +3,7 @@
 #include <libc.h>
 #include <sys/socket.h>
 
+/* BUG: would like to preserve delimiters on systems that can */
 int
 p9pipe(int fd[2])
 {