Commit Diff


commit - 1f61c0914d5f42054b075b6dc5296894de2d25ab
commit + 841d71b5c6be4851572a60c5a9f9dd239ea69e56
blob - 04e9806137c5fed5d33aaebaaa0f96ecd79f8aaf
blob + f7b76bf9735ef7a2a66bf574ad524af540c73a38
--- src/cmd/9pserve.c
+++ src/cmd/9pserve.c
@@ -43,6 +43,7 @@ struct Msg
 {
 	Conn *c;
 	int internal;
+	int sync;
 	int ref;
 	int ctag;
 	int tag;
@@ -73,6 +74,7 @@ struct Conn
 	Hash *fid[NHASH];
 	Queue *outq;
 	Queue *inq;
+	Channel *outqdead;
 	int dotu;
 };
 
@@ -288,6 +290,7 @@ listenthread(void *arg)
 		c->internal = chancreate(sizeof(void*), 0);
 		c->inq = qalloc();
 		c->outq = qalloc();
+		c->outqdead = chancreate(sizeof(void*), 0);
 		if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
 		threadcreate(connthread, c, STACK);
 	}	
@@ -348,7 +351,7 @@ connthread(void *arg)
 	int i, fd;
 	Conn *c;
 	Hash *h, *hnext;
-	Msg *m, *om, *mm;
+	Msg *m, *om, *mm, sync;
 	Fid *f;
 	Ioproc *io;
 
@@ -518,16 +521,12 @@ connthread(void *arg)
 	}
 
 	if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
-
-	/* flush the output queue */
-	sendq(c->outq, nil);
-	while(c->outq != nil)
-		yield();
 
 	/* flush all outstanding messages */
 	for(i=0; i<NHASH; i++){
-		for(h=c->tag[i]; h; h=hnext){
+		while((h = c->tag[i]) != nil){
 			om = h->v;
+			msgincref(om); /* for us */
 			m = msgnew(0);
 			m->internal = 1;
 			m->c = c;
@@ -543,12 +542,31 @@ connthread(void *arg)
 			assert(mm == m);
 			msgput(m);	/* got from recvp */
 			msgput(m);	/* got from msgnew */
-			msgput(om);	/* got from hash table */
-			hnext = h->next;
-			free(h);
+			if(delhash(c->tag, om->tag, om) == 0)
+				msgput(om);	/* got from hash table */
+			msgput(om);	/* got from msgincref */
 		}
 	}
+
+	/*
+	 * every reply has arrived, so outputthread has already written
+	 * all of this connection's messages to the remote side, but it
+	 * may not have had a chance to msgput the very last one.
+	 * queue a sync marker and wait for outputthread to reach it.
+	 */
+	memset(&sync, 0, sizeof sync);
+	sync.sync = 1;
+	sync.c = c;
+	sendq(outq, &sync);
+	recvp(c->outqdead);
+
+	/* should be no messages left anywhere. */
+	assert(c->nmsg == 0);
 
+	/* everything is quiet; a nil message makes connoutthread exit and free c->outq. */
+	sendq(c->outq, nil);
+	recvp(c->outqdead);
+
 	/* clunk all outstanding fids */
 	for(i=0; i<NHASH; i++){
 		for(h=c->fid[i]; h; h=hnext){
@@ -765,15 +783,13 @@ connoutthread(void *arg)
 	char *ename;
 	int err;
 	Conn *c;
-	Queue *outq;
 	Msg *m, *om;
 	Ioproc *io;
 
 	c = arg;
-	outq = c->outq;
 	io = ioproc();
 	threadsetname("connout %s", c->dir);
-	while((m = recvq(outq)) != nil){
+	while((m = recvq(c->outq)) != nil){
 		err = m->tx.type+1 != m->rx.type;
 		if(!err && m->isopenfd)
 			if(xopenfd(m) < 0)
@@ -843,8 +859,9 @@ connoutthread(void *arg)
 			nbsendp(c->inc, 0);
 	}
 	closeioproc(io);
-	free(outq);
+	free(c->outq);
 	c->outq = nil;
+	sendp(c->outqdead, nil);
 }
 
 void
@@ -857,6 +874,10 @@ outputthread(void *arg)
 	io = ioproc();
 	threadsetname("output");
 	while((m = recvq(outq)) != nil){
+		if(m->sync){
+			sendp(m->c->outqdead, nil);
+			continue;
+		}
 		if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
 		rewritehdr(&m->tx, m->tpkt);
 		if(mwrite9p(io, 1, m->tpkt) < 0)
@@ -1148,7 +1169,6 @@ struct Qel
 
 struct Queue
 {
-	int hungup;
 	QLock lk;
 	Rendez r;
 	Qel *head;
@@ -1174,12 +1194,6 @@ sendq(Queue *q, void *p)
 
 	e = emalloc(sizeof(Qel));
 	qlock(&q->lk);
-	if(q->hungup){
-		free(e);
-		werrstr("hungup queue");
-		qunlock(&q->lk);
-		return -1;
-	}
 	e->p = p;
 	e->next = nil;
 	if(q->head == nil)
@@ -1199,12 +1213,8 @@ recvq(Queue *q)
 	Qel *e;
 
 	qlock(&q->lk);
-	while(q->head == nil && !q->hungup)
+	while(q->head == nil)
 		rsleep(&q->r);
-	if(q->hungup){
-		qunlock(&q->lk);
-		return nil;
-	}
 	e = q->head;
 	q->head = e->next;
 	qunlock(&q->lk);