1 /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
5 * Generic RPC packet multiplexor. Inspired by but not derived from
6 * Plan 9 kernel. Originally developed as part of Tra, later used in
7 * libnventi, and then finally split out into a generic library.
14 static int gettag(Mux*, Muxrpc*);
15 static void puttag(Mux*, Muxrpc*);
16 static void enqueue(Mux*, Muxrpc*);
17 static void dequeue(Mux*, Muxrpc*);
22 memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk));
23 mux->tagrend.l = &mux->lk;
24 mux->rpcfork.l = &mux->lk;
25 mux->sleep.next = &mux->sleep;
26 mux->sleep.prev = &mux->sleep;
34 /* must malloc because stack could be private */
35 r = mallocz(sizeof(Muxrpc), 1);
37 werrstr("mallocz: %r");
48 tagmuxrpc(Muxrpc *r, void *tx)
54 /* assign the tag, add selves to response queue */
57 /*print("gettag %p %d\n", r, tag); */
61 /* actually send the packet */
62 if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){
63 werrstr("settag/send tag %d: %r", tag);
75 muxmsgandqlock(Mux *mux, void *p)
80 tag = mux->gettag(mux, p) - mux->mintag;
81 /*print("mux tag %d\n", tag); */
83 /* hand packet to correct sleeper */
84 if(tag < 0 || tag >= mux->mwait){
85 fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
86 /* must leak packet! don't know how to free it! */
90 if(r2 == nil || r2->prev == nil){
91 fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
92 /* must leak packet! don't know how to free it! */
103 /* if there is anyone else sleeping, wake them to mux */
104 if(mux->sleep.next != &mux->sleep){
105 mux->muxer = mux->sleep.next;
106 rwakeup(&mux->muxer->r);
112 muxrpc(Mux *mux, void *tx)
118 if((r = allocmuxrpc(mux)) == nil)
121 if((tag = tagmuxrpc(r, tx)) < 0)
125 /* wait for our packet */
126 while(mux->muxer && mux->muxer != r && !r->p)
129 /* if not done, there's no muxer: start muxing */
131 if(mux->muxer != nil && mux->muxer != r)
136 p = _muxrecv(mux, 1);
138 /* eof -- just give up and pass the buck */
143 muxmsgandqlock(mux, p);
147 /*print("finished %p\n", r); */
152 werrstr("unexpected eof");
157 muxrpcstart(Mux *mux, void *tx)
162 if((r = allocmuxrpc(mux)) == nil)
164 if((tag = tagmuxrpc(r, tx)) < 0)
170 muxrpccanfinish(Muxrpc *r)
177 if(!r->p && !mux->muxer){
181 p = _muxrecv(mux, 0);
186 muxmsgandqlock(mux, p);
198 enqueue(Mux *mux, Muxrpc *r)
200 r->next = mux->sleep.next;
201 r->prev = &mux->sleep;
207 dequeue(Mux *mux, Muxrpc *r)
209 r->next->prev = r->prev;
210 r->prev->next = r->next;
216 gettag(Mux *mux, Muxrpc *r)
222 /* wait for a free tag */
223 while(mux->nwait == mux->mwait){
224 if(mux->mwait < mux->maxtag-mux->mintag){
230 w = realloc(mux->wait, mw*sizeof(w[0]));
233 memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0]));
235 mux->freetag = mux->mwait;
239 rsleep(&mux->tagrend);
243 if(mux->wait[i] == 0)
245 for(; i<mux->mwait; i++)
246 if(mux->wait[i] == 0)
248 for(i=0; i<mux->freetag; i++)
249 if(mux->wait[i] == 0)
251 /* should not fall out of while without free tag */
252 fprint(2, "libfs: nwait botch\n");
259 r->tag = i+mux->mintag;
264 puttag(Mux *mux, Muxrpc *r)
268 i = r->tag - mux->mintag;
269 assert(mux->wait[i] == r);
273 rwakeup(&mux->tagrend);