Blob
1 /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */2 /* See COPYRIGHT */4 #include <u.h>5 #include <libc.h>6 #include <mux.h>8 /*9 * If you fork off two procs running muxrecvproc and muxsendproc,10 * then muxrecv/muxsend (and thus muxrpc) will never block except on11 * rendevouses, which is nice when it's running in one thread of many.12 */13 void14 _muxrecvproc(void *v)15 {16 void *p;17 Mux *mux;18 Muxqueue *q;20 mux = v;21 q = _muxqalloc();23 qlock(&mux->lk);24 mux->readq = q;25 qlock(&mux->inlk);26 rwakeup(&mux->rpcfork);27 qunlock(&mux->lk);29 while((p = mux->recv(mux)) != nil)30 if(_muxqsend(q, p) < 0){31 free(p);32 break;33 }34 qunlock(&mux->inlk);35 qlock(&mux->lk);36 _muxqhangup(q);37 p = nil;38 while(_muxnbqrecv(q, &p) && p != nil){39 free(p);40 p = nil;41 }42 free(q);43 mux->readq = nil;44 rwakeup(&mux->rpcfork);45 qunlock(&mux->lk);46 }48 void49 _muxsendproc(void *v)50 {51 Muxqueue *q;52 void *p;53 Mux *mux;55 mux = v;56 q = _muxqalloc();58 qlock(&mux->lk);59 mux->writeq = q;60 qlock(&mux->outlk);61 rwakeup(&mux->rpcfork);62 qunlock(&mux->lk);64 while((p = _muxqrecv(q)) != nil)65 if(mux->send(mux, p) < 0)66 break;67 qunlock(&mux->outlk);68 qlock(&mux->lk);69 _muxqhangup(q);70 while(_muxnbqrecv(q, &p))71 free(p);72 free(q);73 mux->writeq = nil;74 rwakeup(&mux->rpcfork);75 qunlock(&mux->lk);76 return;77 }79 int80 _muxrecv(Mux *mux, int canblock, void **vp)81 {82 void *p;83 int ret;85 qlock(&mux->lk);86 if(mux->readq){87 qunlock(&mux->lk);88 if(canblock){89 *vp = _muxqrecv(mux->readq);90 return 1;91 }92 return _muxnbqrecv(mux->readq, vp);93 }95 qlock(&mux->inlk);96 qunlock(&mux->lk);97 if(canblock){98 p = mux->recv(mux);99 ret = 1;100 }else{101 if(mux->nbrecv)102 ret = mux->nbrecv(mux, &p);103 else{104 /* send eof, not "no packet ready" */105 p = nil;106 ret = 1;107 }108 }109 qunlock(&mux->inlk);110 *vp = p;111 return ret;112 }114 int115 _muxsend(Mux *mux, void *p)116 {117 qlock(&mux->lk);118 /*119 if(mux->state != VtStateConnected){120 packetfree(p);121 werrstr("not connected");122 qunlock(&mux->lk);123 return -1;124 }125 */126 if(mux->writeq){127 qunlock(&mux->lk);128 if(_muxqsend(mux->writeq, p) < 0){129 free(p);130 return -1;131 }132 return 0;133 }135 qlock(&mux->outlk);136 qunlock(&mux->lk);137 if(mux->send(mux, p) < 0){138 qunlock(&mux->outlk);139 /* vthangup(mux); */140 return -1;141 }142 qunlock(&mux->outlk);143 return 0;144 }