Blob


1 /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
2 /* See COPYRIGHT */
4 /*
5 * Generic RPC packet multiplexor. Inspired by but not derived from
6 * Plan 9 kernel. Originally developed as part of Tra, later used in
7 * libnventi, and then finally split out into a generic library.
8 */
10 #include <u.h>
11 #include <libc.h>
12 #include <mux.h>
14 static int gettag(Mux*, Muxrpc*);
15 static void puttag(Mux*, Muxrpc*);
16 static void enqueue(Mux*, Muxrpc*);
17 static void dequeue(Mux*, Muxrpc*);
19 void
20 muxinit(Mux *mux)
21 {
22 memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk));
23 mux->tagrend.l = &mux->lk;
24 mux->rpcfork.l = &mux->lk;
25 mux->sleep.next = &mux->sleep;
26 mux->sleep.prev = &mux->sleep;
27 }
29 static Muxrpc*
30 allocmuxrpc(Mux *mux)
31 {
32 Muxrpc *r;
34 /* must malloc because stack could be private */
35 r = mallocz(sizeof(Muxrpc), 1);
36 if(r == nil){
37 werrstr("mallocz: %r");
38 return nil;
39 }
40 r->mux = mux;
41 r->r.l = &mux->lk;
42 r->waiting = 1;
44 return r;
45 }
47 static int
48 tagmuxrpc(Muxrpc *r, void *tx)
49 {
50 int tag;
51 Mux *mux;
53 mux = r->mux;
54 /* assign the tag, add selves to response queue */
55 qlock(&mux->lk);
56 tag = gettag(mux, r);
57 /*print("gettag %p %d\n", r, tag); */
58 enqueue(mux, r);
59 qunlock(&mux->lk);
61 /* actually send the packet */
62 if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){
63 werrstr("settag/send tag %d: %r", tag);
64 fprint(2, "%r\n");
65 qlock(&mux->lk);
66 dequeue(mux, r);
67 puttag(mux, r);
68 qunlock(&mux->lk);
69 return -1;
70 }
71 return 0;
72 }
74 void
75 muxmsgandqlock(Mux *mux, void *p)
76 {
77 int tag;
78 Muxrpc *r2;
80 tag = mux->gettag(mux, p) - mux->mintag;
81 /*print("mux tag %d\n", tag); */
82 qlock(&mux->lk);
83 /* hand packet to correct sleeper */
84 if(tag < 0 || tag >= mux->mwait){
85 fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
86 /* must leak packet! don't know how to free it! */
87 return;
88 }
89 r2 = mux->wait[tag];
90 if(r2 == nil || r2->prev == nil){
91 fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
92 /* must leak packet! don't know how to free it! */
93 return;
94 }
95 r2->p = p;
96 dequeue(mux, r2);
97 rwakeup(&r2->r);
98 }
100 void
101 electmuxer(Mux *mux)
103 /* if there is anyone else sleeping, wake them to mux */
104 if(mux->sleep.next != &mux->sleep){
105 mux->muxer = mux->sleep.next;
106 rwakeup(&mux->muxer->r);
107 }else
108 mux->muxer = nil;
111 void*
112 muxrpc(Mux *mux, void *tx)
114 int tag;
115 Muxrpc *r;
116 void *p;
118 if((r = allocmuxrpc(mux)) == nil)
119 return nil;
121 if((tag = tagmuxrpc(r, tx)) < 0)
122 return nil;
124 qlock(&mux->lk);
125 /* wait for our packet */
126 while(mux->muxer && mux->muxer != r && !r->p)
127 rsleep(&r->r);
129 /* if not done, there's no muxer: start muxing */
130 if(!r->p){
131 if(mux->muxer != nil && mux->muxer != r)
132 abort();
133 mux->muxer = r;
134 while(!r->p){
135 qunlock(&mux->lk);
136 p = _muxrecv(mux, 1);
137 if(p == nil){
138 /* eof -- just give up and pass the buck */
139 qlock(&mux->lk);
140 dequeue(mux, r);
141 break;
143 muxmsgandqlock(mux, p);
145 electmuxer(mux);
147 /*print("finished %p\n", r); */
148 p = r->p;
149 puttag(mux, r);
150 qunlock(&mux->lk);
151 if(p == nil)
152 werrstr("unexpected eof");
153 return p;
156 Muxrpc*
157 muxrpcstart(Mux *mux, void *tx)
159 int tag;
160 Muxrpc *r;
162 if((r = allocmuxrpc(mux)) == nil)
163 return nil;
164 if((tag = tagmuxrpc(r, tx)) < 0)
165 return nil;
166 return r;
169 void*
170 muxrpccanfinish(Muxrpc *r)
172 char *p;
173 Mux *mux;
175 mux = r->mux;
176 qlock(&mux->lk);
177 if(!r->p && !mux->muxer){
178 mux->muxer = r;
179 while(!r->p){
180 qunlock(&mux->lk);
181 p = _muxrecv(mux, 0);
182 if(p == nil){
183 qlock(&mux->lk);
184 break;
186 muxmsgandqlock(mux, p);
188 electmuxer(mux);
190 p = r->p;
191 if(p)
192 puttag(mux, r);
193 qunlock(&mux->lk);
194 return p;
197 static void
198 enqueue(Mux *mux, Muxrpc *r)
200 r->next = mux->sleep.next;
201 r->prev = &mux->sleep;
202 r->next->prev = r;
203 r->prev->next = r;
206 static void
207 dequeue(Mux *mux, Muxrpc *r)
209 r->next->prev = r->prev;
210 r->prev->next = r->next;
211 r->prev = nil;
212 r->next = nil;
215 static int
216 gettag(Mux *mux, Muxrpc *r)
218 int i, mw;
219 Muxrpc **w;
221 for(;;){
222 /* wait for a free tag */
223 while(mux->nwait == mux->mwait){
224 if(mux->mwait < mux->maxtag-mux->mintag){
225 mw = mux->mwait;
226 if(mw == 0)
227 mw = 1;
228 else
229 mw <<= 1;
230 w = realloc(mux->wait, mw*sizeof(w[0]));
231 if(w == nil)
232 return -1;
233 memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0]));
234 mux->wait = w;
235 mux->freetag = mux->mwait;
236 mux->mwait = mw;
237 break;
239 rsleep(&mux->tagrend);
242 i=mux->freetag;
243 if(mux->wait[i] == 0)
244 goto Found;
245 for(; i<mux->mwait; i++)
246 if(mux->wait[i] == 0)
247 goto Found;
248 for(i=0; i<mux->freetag; i++)
249 if(mux->wait[i] == 0)
250 goto Found;
251 /* should not fall out of while without free tag */
252 fprint(2, "libfs: nwait botch\n");
253 abort();
256 Found:
257 mux->nwait++;
258 mux->wait[i] = r;
259 r->tag = i+mux->mintag;
260 return r->tag;
263 static void
264 puttag(Mux *mux, Muxrpc *r)
266 int i;
268 i = r->tag - mux->mintag;
269 assert(mux->wait[i] == r);
270 mux->wait[i] = nil;
271 mux->nwait--;
272 mux->freetag = i;
273 rwakeup(&mux->tagrend);
274 free(r);