Blob


1 /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
2 /* See COPYRIGHT */
4 #include <u.h>
5 #include <libc.h>
6 #include <mux.h>
8 /*
9 * If you fork off two procs running muxrecvproc and muxsendproc,
10 * then muxrecv/muxsend (and thus muxrpc) will never block except on
11 * rendevouses, which is nice when it's running in one thread of many.
12 */
13 void
14 _muxrecvproc(void *v)
15 {
16 void *p;
17 Mux *mux;
18 Muxqueue *q;
20 mux = v;
21 q = _muxqalloc();
23 qlock(&mux->lk);
24 mux->readq = q;
25 qlock(&mux->inlk);
26 rwakeup(&mux->rpcfork);
27 qunlock(&mux->lk);
29 while((p = mux->recv(mux)) != nil)
30 if(_muxqsend(q, p) < 0){
31 free(p);
32 break;
33 }
34 qunlock(&mux->inlk);
35 qlock(&mux->lk);
36 _muxqhangup(q);
37 p = nil;
38 while(_muxnbqrecv(q, &p) && p != nil){
39 free(p);
40 p = nil;
41 }
42 free(q);
43 mux->readq = nil;
44 rwakeup(&mux->rpcfork);
45 qunlock(&mux->lk);
46 }
48 void
49 _muxsendproc(void *v)
50 {
51 Muxqueue *q;
52 void *p;
53 Mux *mux;
55 mux = v;
56 q = _muxqalloc();
58 qlock(&mux->lk);
59 mux->writeq = q;
60 qlock(&mux->outlk);
61 rwakeup(&mux->rpcfork);
62 qunlock(&mux->lk);
64 while((p = _muxqrecv(q)) != nil)
65 if(mux->send(mux, p) < 0)
66 break;
67 qunlock(&mux->outlk);
68 qlock(&mux->lk);
69 _muxqhangup(q);
70 while(_muxnbqrecv(q, &p))
71 free(p);
72 free(q);
73 mux->writeq = nil;
74 rwakeup(&mux->rpcfork);
75 qunlock(&mux->lk);
76 return;
77 }
79 int
80 _muxrecv(Mux *mux, int canblock, void **vp)
81 {
82 void *p;
83 int ret;
85 qlock(&mux->lk);
86 if(mux->readq){
87 qunlock(&mux->lk);
88 if(canblock){
89 *vp = _muxqrecv(mux->readq);
90 return 1;
91 }
92 return _muxnbqrecv(mux->readq, vp);
93 }
95 qlock(&mux->inlk);
96 qunlock(&mux->lk);
97 if(canblock){
98 p = mux->recv(mux);
99 ret = 1;
100 }else{
101 if(mux->nbrecv)
102 ret = mux->nbrecv(mux, &p);
103 else{
104 /* send eof, not "no packet ready" */
105 p = nil;
106 ret = 1;
109 qunlock(&mux->inlk);
110 *vp = p;
111 return ret;
114 int
115 _muxsend(Mux *mux, void *p)
117 qlock(&mux->lk);
118 /*
119 if(mux->state != VtStateConnected){
120 packetfree(p);
121 werrstr("not connected");
122 qunlock(&mux->lk);
123 return -1;
125 */
126 if(mux->writeq){
127 qunlock(&mux->lk);
128 if(_muxqsend(mux->writeq, p) < 0){
129 free(p);
130 return -1;
132 return 0;
135 qlock(&mux->outlk);
136 qunlock(&mux->lk);
137 if(mux->send(mux, p) < 0){
138 qunlock(&mux->outlk);
139 /* vthangup(mux); */
140 return -1;
142 qunlock(&mux->outlk);
143 return 0;