Blob


1 /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
2 /* See COPYRIGHT */
4 #include <u.h>
5 #include <libc.h>
6 #include <mux.h>
/*
 * If you fork off two procs running _muxrecvproc and _muxsendproc,
 * then _muxrecv/_muxsend (and thus muxrpc) will never block except on
 * rendezvous, which is nice when it's running in one thread of many.
 */
13 void
14 _muxrecvproc(void *v)
15 {
16 void *p;
17 Mux *mux;
18 Muxqueue *q;
20 mux = v;
21 q = _muxqalloc();
23 qlock(&mux->lk);
24 mux->readq = q;
25 qlock(&mux->inlk);
26 rwakeup(&mux->rpcfork);
27 qunlock(&mux->lk);
29 while((p = mux->recv(mux)) != nil)
30 if(_muxqsend(q, p) < 0){
31 free(p);
32 break;
33 }
34 qunlock(&mux->inlk);
35 qlock(&mux->lk);
36 _muxqhangup(q);
37 while(_muxnbqrecv(q, &p))
38 free(p);
39 free(q);
40 mux->readq = nil;
41 rwakeup(&mux->rpcfork);
42 qunlock(&mux->lk);
43 }
45 void
46 _muxsendproc(void *v)
47 {
48 Muxqueue *q;
49 void *p;
50 Mux *mux;
52 mux = v;
53 q = _muxqalloc();
55 qlock(&mux->lk);
56 mux->writeq = q;
57 qlock(&mux->outlk);
58 rwakeup(&mux->rpcfork);
59 qunlock(&mux->lk);
61 while((p = _muxqrecv(q)) != nil)
62 if(mux->send(mux, p) < 0)
63 break;
64 qunlock(&mux->outlk);
65 qlock(&mux->lk);
66 _muxqhangup(q);
67 while(_muxnbqrecv(q, &p))
68 free(p);
69 free(q);
70 mux->writeq = nil;
71 rwakeup(&mux->rpcfork);
72 qunlock(&mux->lk);
73 return;
74 }
76 int
77 _muxrecv(Mux *mux, int canblock, void **vp)
78 {
79 void *p;
80 int ret;
82 qlock(&mux->lk);
83 if(mux->readq){
84 qunlock(&mux->lk);
85 if(canblock){
86 *vp = _muxqrecv(mux->readq);
87 return 1;
88 }
89 return _muxnbqrecv(mux->readq, vp);
90 }
92 qlock(&mux->inlk);
93 qunlock(&mux->lk);
94 if(canblock){
95 p = mux->recv(mux);
96 ret = 1;
97 }else{
98 if(mux->nbrecv)
99 ret = mux->nbrecv(mux, &p);
100 else{
101 /* send eof, not "no packet ready" */
102 p = nil;
103 ret = 1;
106 qunlock(&mux->inlk);
107 *vp = p;
108 return ret;
111 int
112 _muxsend(Mux *mux, void *p)
114 qlock(&mux->lk);
115 /*
116 if(mux->state != VtStateConnected){
117 packetfree(p);
118 werrstr("not connected");
119 qunlock(&mux->lk);
120 return -1;
122 */
123 if(mux->writeq){
124 qunlock(&mux->lk);
125 if(_muxqsend(mux->writeq, p) < 0){
126 free(p);
127 return -1;
129 return 0;
132 qlock(&mux->outlk);
133 qunlock(&mux->lk);
134 if(mux->send(mux, p) < 0){
135 qunlock(&mux->outlk);
136 /* vthangup(mux); */
137 return -1;
139 qunlock(&mux->outlk);
140 return 0;