/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ /* See COPYRIGHT */ #include #include #include /* * If you fork off two procs running muxrecvproc and muxsendproc, * then muxrecv/muxsend (and thus muxrpc) will never block except on * rendevouses, which is nice when it's running in one thread of many. */ void _muxrecvproc(void *v) { void *p; Mux *mux; Muxqueue *q; mux = v; q = _muxqalloc(); qlock(&mux->lk); mux->readq = q; qlock(&mux->inlk); rwakeup(&mux->rpcfork); qunlock(&mux->lk); while((p = mux->recv(mux)) != nil) if(_muxqsend(q, p) < 0){ free(p); break; } qunlock(&mux->inlk); qlock(&mux->lk); _muxqhangup(q); p = nil; while(_muxnbqrecv(q, &p) && p != nil){ free(p); p = nil; } free(q); mux->readq = nil; rwakeup(&mux->rpcfork); qunlock(&mux->lk); } void _muxsendproc(void *v) { Muxqueue *q; void *p; Mux *mux; mux = v; q = _muxqalloc(); qlock(&mux->lk); mux->writeq = q; qlock(&mux->outlk); rwakeup(&mux->rpcfork); qunlock(&mux->lk); while((p = _muxqrecv(q)) != nil) if(mux->send(mux, p) < 0) break; qunlock(&mux->outlk); qlock(&mux->lk); _muxqhangup(q); while(_muxnbqrecv(q, &p)) free(p); free(q); mux->writeq = nil; rwakeup(&mux->rpcfork); qunlock(&mux->lk); return; } int _muxrecv(Mux *mux, int canblock, void **vp) { void *p; int ret; qlock(&mux->lk); if(mux->readq){ qunlock(&mux->lk); if(canblock){ *vp = _muxqrecv(mux->readq); return 1; } return _muxnbqrecv(mux->readq, vp); } qlock(&mux->inlk); qunlock(&mux->lk); if(canblock){ p = mux->recv(mux); ret = 1; }else{ if(mux->nbrecv) ret = mux->nbrecv(mux, &p); else{ /* send eof, not "no packet ready" */ p = nil; ret = 1; } } qunlock(&mux->inlk); *vp = p; return ret; } int _muxsend(Mux *mux, void *p) { qlock(&mux->lk); /* if(mux->state != VtStateConnected){ packetfree(p); werrstr("not connected"); qunlock(&mux->lk); return -1; } */ if(mux->writeq){ qunlock(&mux->lk); if(_muxqsend(mux->writeq, p) < 0){ free(p); return -1; } return 0; } qlock(&mux->outlk); qunlock(&mux->lk); if(mux->send(mux, p) < 0){ qunlock(&mux->outlk); /* vthangup(mux); */ return -1; } qunlock(&mux->outlk); return 0; }