Commit Diff


commit - b733ffba4fa1af07540e9687b2f84c4f3014063e
commit + 2d2e5c71f73a8ac0656ad26b406945937f4b2c0e
blob - 3d932b1a1f4925bd0c79192b237ef990309bb667
blob + 4a89ca22a72bd0c27c2ea207a5012e908b819fb2
--- src/libmux/io.c
+++ src/libmux/io.c
@@ -74,7 +74,7 @@ _muxsendproc(void *v)
 }
 
 void*
-_muxrecv(Mux *mux)
+_muxrecv(Mux *mux, int canblock)
 {
 	void *p;
 
@@ -88,15 +88,24 @@ _muxrecv(Mux *mux)
 */
 	if(mux->readq){
 		qunlock(&mux->lk);
-		return _muxqrecv(mux->readq);
+		if(canblock)
+			return _muxqrecv(mux->readq);
+		return _muxnbqrecv(mux->readq);
 	}
 
 	qlock(&mux->inlk);
 	qunlock(&mux->lk);
-	p = mux->recv(mux);
+	if(canblock)
+		p = mux->recv(mux);
+	else{
+		if(mux->nbrecv)
+			p = mux->nbrecv(mux);
+		else
+			p = nil;
+	}
 	qunlock(&mux->inlk);
 /*
-	if(!p)
+	if(!p && canblock)
 		vthangup(mux);
 */
 	return p;
blob - e6438b019d8990dec8c7e898712def5edf64a989
blob + bfabb23890aab5bddab4123c80d2f72dd3e4009f
--- src/libmux/mux.c
+++ src/libmux/mux.c
@@ -26,21 +26,31 @@ muxinit(Mux *mux)
 	mux->sleep.prev = &mux->sleep;
 }
 
-void*
-muxrpc(Mux *mux, void *tx)
+static Muxrpc*
+allocmuxrpc(Mux *mux)
 {
-	int tag;
-	Muxrpc *r, *r2;
-	void *p;
-
+	Muxrpc *r;
+	
 	/* must malloc because stack could be private */
 	r = mallocz(sizeof(Muxrpc), 1);
 	if(r == nil){
 		werrstr("mallocz: %r");
 		return nil;
 	}
+	r->mux = mux;
 	r->r.l = &mux->lk;
+	r->waiting = 1;
+	
+	return r;
+}
 
+static int
+tagmuxrpc(Muxrpc *r, void *tx)
+{
+	int tag;
+	Mux *mux;
+	
+	mux = r->mux;
 	/* assign the tag, add selves to response queue */
 	qlock(&mux->lk);
 	tag = gettag(mux, r);
@@ -56,54 +66,83 @@ muxrpc(Mux *mux, void *tx)
 		dequeue(mux, r);
 		puttag(mux, r);
 		qunlock(&mux->lk);
-		return nil;
+		return -1;
 	}
+	return 0;
+}
 
+void
+muxmsgandqlock(Mux *mux, void *p)
+{
+	int tag;
+	Muxrpc *r2;
+
+	tag = mux->gettag(mux, p) - mux->mintag;
+/*print("mux tag %d\n", tag); */
 	qlock(&mux->lk);
+	/* hand packet to correct sleeper */
+	if(tag < 0 || tag >= mux->mwait){
+		fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
+		/* must leak packet! don't know how to free it! */
+		return;
+	}
+	r2 = mux->wait[tag];
+	if(r2 == nil || r2->prev == nil){
+		fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
+		/* must leak packet! don't know how to free it! */
+		return;
+	}	
+	r2->p = p;
+	dequeue(mux, r2);
+	rwakeup(&r2->r);
+}
+
+void
+electmuxer(Mux *mux)
+{
+	/* if there is anyone else sleeping, wake them to mux */
+	if(mux->sleep.next != &mux->sleep){
+		mux->muxer = mux->sleep.next;
+		rwakeup(&mux->muxer->r);
+	}else
+		mux->muxer = nil;
+}
+
+void*
+muxrpc(Mux *mux, void *tx)
+{
+	int tag;
+	Muxrpc *r;
+	void *p;
+
+	if((r = allocmuxrpc(mux)) == nil)
+		return nil;
+
+	if((tag = tagmuxrpc(r, tx)) < 0)
+		return nil;
+
+	qlock(&mux->lk);
 	/* wait for our packet */
-	while(mux->muxer && !r->p){
+	while(mux->muxer && mux->muxer != r && !r->p)
 		rsleep(&r->r);
-	}
 
 	/* if not done, there's no muxer: start muxing */
 	if(!r->p){
-		if(mux->muxer)
+		if(mux->muxer != nil && mux->muxer != r)
 			abort();
-		mux->muxer = 1;
+		mux->muxer = r;
 		while(!r->p){
 			qunlock(&mux->lk);
-			p = _muxrecv(mux);
-			if(p)
-				tag = mux->gettag(mux, p) - mux->mintag;
-			else
-				tag = ~0;
-/*print("mux tag %d\n", tag); */
-			qlock(&mux->lk);
-			if(p == nil){	/* eof -- just give up and pass the buck */
+			p = _muxrecv(mux, 1);
+			if(p == nil){
+				/* eof -- just give up and pass the buck */
+				qlock(&mux->lk);
 				dequeue(mux, r);
 				break;
 			}
-			/* hand packet to correct sleeper */
-			if(tag < 0 || tag >= mux->mwait){
-				fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
-				/* must leak packet! don't know how to free it! */
-				continue;
-			}
-			r2 = mux->wait[tag];
-			if(r2 == nil || r2->prev == nil){
-				fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
-				/* must leak packet! don't know how to free it! */
-				continue;
-			}	
-			r2->p = p;
-			dequeue(mux, r2);
-			rwakeup(&r2->r);
+			muxmsgandqlock(mux, p);
 		}
-		mux->muxer = 0;
-
-		/* if there is anyone else sleeping, wake them to mux */
-		if(mux->sleep.next != &mux->sleep)
-			rwakeup(&mux->sleep.next->r);
+		electmuxer(mux);
 	}
 /*print("finished %p\n", r); */
 	p = r->p;
@@ -114,6 +153,47 @@ muxrpc(Mux *mux, void *tx)
 	return p;
 }
 
+Muxrpc*
+muxrpcstart(Mux *mux, void *tx)
+{
+	int tag;
+	Muxrpc *r;
+
+	if((r = allocmuxrpc(mux)) == nil)
+		return nil;
+	if((tag = tagmuxrpc(r, tx)) < 0)
+		return nil;
+	return r;
+}
+
+void*
+muxrpccanfinish(Muxrpc *r)
+{
+	char *p;
+	Mux *mux;
+	
+	mux = r->mux;
+	qlock(&mux->lk);
+	if(!r->p && !mux->muxer){
+		mux->muxer = r;
+		while(!r->p){
+			qunlock(&mux->lk);
+			p = _muxrecv(mux, 0);
+			if(p == nil){
+				qlock(&mux->lk);
+				break;
+			}
+			muxmsgandqlock(mux, p);
+		}
+		electmuxer(mux);
+	}
+	p = r->p;
+	if(p)
+		puttag(mux, r);
+	qunlock(&mux->lk);
+	return p;
+}
+
 static void
 enqueue(Mux *mux, Muxrpc *r)
 {