Blame


1 3a194702 2006-11-04 devnull /* Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology */
2 d3df3087 2003-12-06 devnull /* See COPYRIGHT */
3 d3df3087 2003-12-06 devnull
4 d3df3087 2003-12-06 devnull /*
5 d3df3087 2003-12-06 devnull * Generic RPC packet multiplexor. Inspired by but not derived from
6 d3df3087 2003-12-06 devnull * Plan 9 kernel. Originally developed as part of Tra, later used in
7 d3df3087 2003-12-06 devnull * libnventi, and then finally split out into a generic library.
8 d3df3087 2003-12-06 devnull */
9 d3df3087 2003-12-06 devnull
10 d3df3087 2003-12-06 devnull #include <u.h>
11 d3df3087 2003-12-06 devnull #include <libc.h>
12 d3df3087 2003-12-06 devnull #include <mux.h>
13 d3df3087 2003-12-06 devnull
14 d3df3087 2003-12-06 devnull static int gettag(Mux*, Muxrpc*);
15 d3df3087 2003-12-06 devnull static void puttag(Mux*, Muxrpc*);
16 d3df3087 2003-12-06 devnull static void enqueue(Mux*, Muxrpc*);
17 d3df3087 2003-12-06 devnull static void dequeue(Mux*, Muxrpc*);
18 d3df3087 2003-12-06 devnull
19 d3df3087 2003-12-06 devnull void
20 d3df3087 2003-12-06 devnull muxinit(Mux *mux)
21 d3df3087 2003-12-06 devnull {
22 b214663d 2004-01-09 devnull memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk));
23 d3df3087 2003-12-06 devnull mux->tagrend.l = &mux->lk;
24 b214663d 2004-01-09 devnull mux->rpcfork.l = &mux->lk;
25 d3df3087 2003-12-06 devnull mux->sleep.next = &mux->sleep;
26 d3df3087 2003-12-06 devnull mux->sleep.prev = &mux->sleep;
27 d3df3087 2003-12-06 devnull }
28 d3df3087 2003-12-06 devnull
29 2d2e5c71 2006-06-25 devnull static Muxrpc*
30 2d2e5c71 2006-06-25 devnull allocmuxrpc(Mux *mux)
31 d3df3087 2003-12-06 devnull {
32 2d2e5c71 2006-06-25 devnull Muxrpc *r;
33 fa325e9b 2020-01-10 cross
34 d3df3087 2003-12-06 devnull /* must malloc because stack could be private */
35 d3df3087 2003-12-06 devnull r = mallocz(sizeof(Muxrpc), 1);
36 a28c0548 2005-07-13 devnull if(r == nil){
37 a28c0548 2005-07-13 devnull werrstr("mallocz: %r");
38 d3df3087 2003-12-06 devnull return nil;
39 a28c0548 2005-07-13 devnull }
40 2d2e5c71 2006-06-25 devnull r->mux = mux;
41 d3df3087 2003-12-06 devnull r->r.l = &mux->lk;
42 2d2e5c71 2006-06-25 devnull r->waiting = 1;
43 fa325e9b 2020-01-10 cross
44 2d2e5c71 2006-06-25 devnull return r;
45 2d2e5c71 2006-06-25 devnull }
46 d3df3087 2003-12-06 devnull
47 2d2e5c71 2006-06-25 devnull static int
48 2d2e5c71 2006-06-25 devnull tagmuxrpc(Muxrpc *r, void *tx)
49 2d2e5c71 2006-06-25 devnull {
50 2d2e5c71 2006-06-25 devnull int tag;
51 2d2e5c71 2006-06-25 devnull Mux *mux;
52 fa325e9b 2020-01-10 cross
53 2d2e5c71 2006-06-25 devnull mux = r->mux;
54 43db87f1 2004-12-27 devnull /* assign the tag, add selves to response queue */
55 ceb04770 2003-12-09 devnull qlock(&mux->lk);
56 d3df3087 2003-12-06 devnull tag = gettag(mux, r);
57 cbeb0b26 2006-04-01 devnull /*print("gettag %p %d\n", r, tag); */
58 43db87f1 2004-12-27 devnull enqueue(mux, r);
59 ceb04770 2003-12-09 devnull qunlock(&mux->lk);
60 43db87f1 2004-12-27 devnull
61 43db87f1 2004-12-27 devnull /* actually send the packet */
62 ceb04770 2003-12-09 devnull if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){
63 a28c0548 2005-07-13 devnull werrstr("settag/send tag %d: %r", tag);
64 a28c0548 2005-07-13 devnull fprint(2, "%r\n");
65 ceb04770 2003-12-09 devnull qlock(&mux->lk);
66 d165fa68 2005-01-04 devnull dequeue(mux, r);
67 d3df3087 2003-12-06 devnull puttag(mux, r);
68 ceb04770 2003-12-09 devnull qunlock(&mux->lk);
69 2d2e5c71 2006-06-25 devnull return -1;
70 d3df3087 2003-12-06 devnull }
71 2d2e5c71 2006-06-25 devnull return 0;
72 2d2e5c71 2006-06-25 devnull }
73 d3df3087 2003-12-06 devnull
74 2d2e5c71 2006-06-25 devnull void
75 2d2e5c71 2006-06-25 devnull muxmsgandqlock(Mux *mux, void *p)
76 2d2e5c71 2006-06-25 devnull {
77 2d2e5c71 2006-06-25 devnull int tag;
78 2d2e5c71 2006-06-25 devnull Muxrpc *r2;
79 2d2e5c71 2006-06-25 devnull
80 2d2e5c71 2006-06-25 devnull tag = mux->gettag(mux, p) - mux->mintag;
81 2d2e5c71 2006-06-25 devnull /*print("mux tag %d\n", tag); */
82 d3df3087 2003-12-06 devnull qlock(&mux->lk);
83 2d2e5c71 2006-06-25 devnull /* hand packet to correct sleeper */
84 2d2e5c71 2006-06-25 devnull if(tag < 0 || tag >= mux->mwait){
85 2d2e5c71 2006-06-25 devnull fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
86 2d2e5c71 2006-06-25 devnull /* must leak packet! don't know how to free it! */
87 2d2e5c71 2006-06-25 devnull return;
88 2d2e5c71 2006-06-25 devnull }
89 2d2e5c71 2006-06-25 devnull r2 = mux->wait[tag];
90 2d2e5c71 2006-06-25 devnull if(r2 == nil || r2->prev == nil){
91 2d2e5c71 2006-06-25 devnull fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
92 2d2e5c71 2006-06-25 devnull /* must leak packet! don't know how to free it! */
93 2d2e5c71 2006-06-25 devnull return;
94 fa325e9b 2020-01-10 cross }
95 2d2e5c71 2006-06-25 devnull r2->p = p;
96 2d2e5c71 2006-06-25 devnull dequeue(mux, r2);
97 2d2e5c71 2006-06-25 devnull rwakeup(&r2->r);
98 2d2e5c71 2006-06-25 devnull }
99 2d2e5c71 2006-06-25 devnull
100 2d2e5c71 2006-06-25 devnull void
101 2d2e5c71 2006-06-25 devnull electmuxer(Mux *mux)
102 2d2e5c71 2006-06-25 devnull {
103 3a194702 2006-11-04 devnull Muxrpc *rpc;
104 3a194702 2006-11-04 devnull
105 2d2e5c71 2006-06-25 devnull /* if there is anyone else sleeping, wake them to mux */
106 3a194702 2006-11-04 devnull for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){
107 3a194702 2006-11-04 devnull if(!rpc->async){
108 3a194702 2006-11-04 devnull mux->muxer = rpc;
109 3a194702 2006-11-04 devnull rwakeup(&rpc->r);
110 3a194702 2006-11-04 devnull return;
111 3a194702 2006-11-04 devnull }
112 3a194702 2006-11-04 devnull }
113 3a194702 2006-11-04 devnull mux->muxer = nil;
114 2d2e5c71 2006-06-25 devnull }
115 2d2e5c71 2006-06-25 devnull
116 2d2e5c71 2006-06-25 devnull void*
117 2d2e5c71 2006-06-25 devnull muxrpc(Mux *mux, void *tx)
118 2d2e5c71 2006-06-25 devnull {
119 2d2e5c71 2006-06-25 devnull int tag;
120 2d2e5c71 2006-06-25 devnull Muxrpc *r;
121 2d2e5c71 2006-06-25 devnull void *p;
122 2d2e5c71 2006-06-25 devnull
123 2d2e5c71 2006-06-25 devnull if((r = allocmuxrpc(mux)) == nil)
124 2d2e5c71 2006-06-25 devnull return nil;
125 2d2e5c71 2006-06-25 devnull
126 2d2e5c71 2006-06-25 devnull if((tag = tagmuxrpc(r, tx)) < 0)
127 2d2e5c71 2006-06-25 devnull return nil;
128 2d2e5c71 2006-06-25 devnull
129 2d2e5c71 2006-06-25 devnull qlock(&mux->lk);
130 d3df3087 2003-12-06 devnull /* wait for our packet */
131 2d2e5c71 2006-06-25 devnull while(mux->muxer && mux->muxer != r && !r->p)
132 d3df3087 2003-12-06 devnull rsleep(&r->r);
133 d3df3087 2003-12-06 devnull
134 d3df3087 2003-12-06 devnull /* if not done, there's no muxer: start muxing */
135 d3df3087 2003-12-06 devnull if(!r->p){
136 2d2e5c71 2006-06-25 devnull if(mux->muxer != nil && mux->muxer != r)
137 d3df3087 2003-12-06 devnull abort();
138 2d2e5c71 2006-06-25 devnull mux->muxer = r;
139 d3df3087 2003-12-06 devnull while(!r->p){
140 d3df3087 2003-12-06 devnull qunlock(&mux->lk);
141 3a194702 2006-11-04 devnull _muxrecv(mux, 1, &p);
142 2d2e5c71 2006-06-25 devnull if(p == nil){
143 2d2e5c71 2006-06-25 devnull /* eof -- just give up and pass the buck */
144 2d2e5c71 2006-06-25 devnull qlock(&mux->lk);
145 d3df3087 2003-12-06 devnull dequeue(mux, r);
146 d3df3087 2003-12-06 devnull break;
147 d3df3087 2003-12-06 devnull }
148 2d2e5c71 2006-06-25 devnull muxmsgandqlock(mux, p);
149 d3df3087 2003-12-06 devnull }
150 2d2e5c71 2006-06-25 devnull electmuxer(mux);
151 d3df3087 2003-12-06 devnull }
152 d3df3087 2003-12-06 devnull p = r->p;
153 d165fa68 2005-01-04 devnull puttag(mux, r);
154 d3df3087 2003-12-06 devnull qunlock(&mux->lk);
155 a28c0548 2005-07-13 devnull if(p == nil)
156 a28c0548 2005-07-13 devnull werrstr("unexpected eof");
157 d3df3087 2003-12-06 devnull return p;
158 d3df3087 2003-12-06 devnull }
159 d3df3087 2003-12-06 devnull
160 2d2e5c71 2006-06-25 devnull Muxrpc*
161 2d2e5c71 2006-06-25 devnull muxrpcstart(Mux *mux, void *tx)
162 2d2e5c71 2006-06-25 devnull {
163 2d2e5c71 2006-06-25 devnull int tag;
164 2d2e5c71 2006-06-25 devnull Muxrpc *r;
165 2d2e5c71 2006-06-25 devnull
166 2d2e5c71 2006-06-25 devnull if((r = allocmuxrpc(mux)) == nil)
167 2d2e5c71 2006-06-25 devnull return nil;
168 3a194702 2006-11-04 devnull r->async = 1;
169 2d2e5c71 2006-06-25 devnull if((tag = tagmuxrpc(r, tx)) < 0)
170 2d2e5c71 2006-06-25 devnull return nil;
171 2d2e5c71 2006-06-25 devnull return r;
172 2d2e5c71 2006-06-25 devnull }
173 2d2e5c71 2006-06-25 devnull
174 3a194702 2006-11-04 devnull int
175 3a194702 2006-11-04 devnull muxrpccanfinish(Muxrpc *r, void **vp)
176 2d2e5c71 2006-06-25 devnull {
177 3a194702 2006-11-04 devnull void *p;
178 2d2e5c71 2006-06-25 devnull Mux *mux;
179 3a194702 2006-11-04 devnull int ret;
180 3a194702 2006-11-04 devnull
181 2d2e5c71 2006-06-25 devnull mux = r->mux;
182 2d2e5c71 2006-06-25 devnull qlock(&mux->lk);
183 3a194702 2006-11-04 devnull ret = 1;
184 2d2e5c71 2006-06-25 devnull if(!r->p && !mux->muxer){
185 2d2e5c71 2006-06-25 devnull mux->muxer = r;
186 2d2e5c71 2006-06-25 devnull while(!r->p){
187 2d2e5c71 2006-06-25 devnull qunlock(&mux->lk);
188 3a194702 2006-11-04 devnull p = nil;
189 3a194702 2006-11-04 devnull if(!_muxrecv(mux, 0, &p))
190 3a194702 2006-11-04 devnull ret = 0;
191 2d2e5c71 2006-06-25 devnull if(p == nil){
192 2d2e5c71 2006-06-25 devnull qlock(&mux->lk);
193 2d2e5c71 2006-06-25 devnull break;
194 2d2e5c71 2006-06-25 devnull }
195 2d2e5c71 2006-06-25 devnull muxmsgandqlock(mux, p);
196 2d2e5c71 2006-06-25 devnull }
197 2d2e5c71 2006-06-25 devnull electmuxer(mux);
198 2d2e5c71 2006-06-25 devnull }
199 2d2e5c71 2006-06-25 devnull p = r->p;
200 2d2e5c71 2006-06-25 devnull if(p)
201 2d2e5c71 2006-06-25 devnull puttag(mux, r);
202 2d2e5c71 2006-06-25 devnull qunlock(&mux->lk);
203 3a194702 2006-11-04 devnull *vp = p;
204 3a194702 2006-11-04 devnull return ret;
205 2d2e5c71 2006-06-25 devnull }
206 2d2e5c71 2006-06-25 devnull
207 d3df3087 2003-12-06 devnull static void
208 d3df3087 2003-12-06 devnull enqueue(Mux *mux, Muxrpc *r)
209 d3df3087 2003-12-06 devnull {
210 d3df3087 2003-12-06 devnull r->next = mux->sleep.next;
211 d3df3087 2003-12-06 devnull r->prev = &mux->sleep;
212 d3df3087 2003-12-06 devnull r->next->prev = r;
213 d3df3087 2003-12-06 devnull r->prev->next = r;
214 d3df3087 2003-12-06 devnull }
215 d3df3087 2003-12-06 devnull
216 d3df3087 2003-12-06 devnull static void
217 d3df3087 2003-12-06 devnull dequeue(Mux *mux, Muxrpc *r)
218 d3df3087 2003-12-06 devnull {
219 d3df3087 2003-12-06 devnull r->next->prev = r->prev;
220 d3df3087 2003-12-06 devnull r->prev->next = r->next;
221 d3df3087 2003-12-06 devnull r->prev = nil;
222 d3df3087 2003-12-06 devnull r->next = nil;
223 d3df3087 2003-12-06 devnull }
224 d3df3087 2003-12-06 devnull
225 fa325e9b 2020-01-10 cross static int
226 d3df3087 2003-12-06 devnull gettag(Mux *mux, Muxrpc *r)
227 d3df3087 2003-12-06 devnull {
228 ceb04770 2003-12-09 devnull int i, mw;
229 ceb04770 2003-12-09 devnull Muxrpc **w;
230 d3df3087 2003-12-06 devnull
231 ceb04770 2003-12-09 devnull for(;;){
232 ceb04770 2003-12-09 devnull /* wait for a free tag */
233 ceb04770 2003-12-09 devnull while(mux->nwait == mux->mwait){
234 ceb04770 2003-12-09 devnull if(mux->mwait < mux->maxtag-mux->mintag){
235 ceb04770 2003-12-09 devnull mw = mux->mwait;
236 ceb04770 2003-12-09 devnull if(mw == 0)
237 ceb04770 2003-12-09 devnull mw = 1;
238 ceb04770 2003-12-09 devnull else
239 ceb04770 2003-12-09 devnull mw <<= 1;
240 ceb04770 2003-12-09 devnull w = realloc(mux->wait, mw*sizeof(w[0]));
241 ceb04770 2003-12-09 devnull if(w == nil)
242 ceb04770 2003-12-09 devnull return -1;
243 e95a7088 2003-12-09 devnull memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0]));
244 ceb04770 2003-12-09 devnull mux->wait = w;
245 ceb04770 2003-12-09 devnull mux->freetag = mux->mwait;
246 ceb04770 2003-12-09 devnull mux->mwait = mw;
247 ceb04770 2003-12-09 devnull break;
248 ceb04770 2003-12-09 devnull }
249 ceb04770 2003-12-09 devnull rsleep(&mux->tagrend);
250 d3df3087 2003-12-06 devnull }
251 ceb04770 2003-12-09 devnull
252 ceb04770 2003-12-09 devnull i=mux->freetag;
253 ceb04770 2003-12-09 devnull if(mux->wait[i] == 0)
254 ceb04770 2003-12-09 devnull goto Found;
255 ceb04770 2003-12-09 devnull for(; i<mux->mwait; i++)
256 ceb04770 2003-12-09 devnull if(mux->wait[i] == 0)
257 ceb04770 2003-12-09 devnull goto Found;
258 ceb04770 2003-12-09 devnull for(i=0; i<mux->freetag; i++)
259 ceb04770 2003-12-09 devnull if(mux->wait[i] == 0)
260 ceb04770 2003-12-09 devnull goto Found;
261 ceb04770 2003-12-09 devnull /* should not fall out of while without free tag */
262 ceb04770 2003-12-09 devnull fprint(2, "libfs: nwait botch\n");
263 ceb04770 2003-12-09 devnull abort();
264 ceb04770 2003-12-09 devnull }
265 ceb04770 2003-12-09 devnull
266 ceb04770 2003-12-09 devnull Found:
267 ceb04770 2003-12-09 devnull mux->nwait++;
268 ceb04770 2003-12-09 devnull mux->wait[i] = r;
269 ceb04770 2003-12-09 devnull r->tag = i+mux->mintag;
270 0b22e9bd 2005-01-30 devnull return r->tag;
271 d3df3087 2003-12-06 devnull }
272 d3df3087 2003-12-06 devnull
273 d3df3087 2003-12-06 devnull static void
274 d3df3087 2003-12-06 devnull puttag(Mux *mux, Muxrpc *r)
275 d3df3087 2003-12-06 devnull {
276 ceb04770 2003-12-09 devnull int i;
277 ceb04770 2003-12-09 devnull
278 ceb04770 2003-12-09 devnull i = r->tag - mux->mintag;
279 ceb04770 2003-12-09 devnull assert(mux->wait[i] == r);
280 ceb04770 2003-12-09 devnull mux->wait[i] = nil;
281 d3df3087 2003-12-06 devnull mux->nwait--;
282 ceb04770 2003-12-09 devnull mux->freetag = i;
283 d3df3087 2003-12-06 devnull rwakeup(&mux->tagrend);
284 ceb04770 2003-12-09 devnull free(r);
285 d3df3087 2003-12-06 devnull }