1 /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
5 * Generic RPC packet multiplexor. Inspired by but not derived from
6 * Plan 9 kernel. Originally developed as part of Tra, later used in
7 * libnventi, and then finally split out into a generic library.
14 static int gettag(Mux*, Muxrpc*);
15 static void puttag(Mux*, Muxrpc*);
16 static void enqueue(Mux*, Muxrpc*);
17 static void dequeue(Mux*, Muxrpc*);
22 memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk));
23 mux->tagrend.l = &mux->lk;
24 mux->rpcfork.l = &mux->lk;
25 mux->sleep.next = &mux->sleep;
26 mux->sleep.prev = &mux->sleep;
30 muxrpc(Mux *mux, void *tx)
36 /* must malloc because stack could be private */
37 r = mallocz(sizeof(Muxrpc), 1);
46 if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){
53 /* add ourselves to sleep queue */
57 /* wait for our packet */
58 while(mux->muxer && !r->p)
61 /* if not done, there's no muxer: start muxing */
70 tag = mux->gettag(mux, p);
74 if(p == nil){ /* eof -- just give up and pass the buck */
78 /* hand packet to correct sleeper */
79 if(tag < 0 || tag >= mux->mwait){
80 fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
81 /* must leak packet! don't know how to free it! */
91 /* if there is anyone else sleeping, wake them to mux */
92 if(mux->sleep.next != &mux->sleep)
93 rwakeup(&mux->sleep.next->r);
102 enqueue(Mux *mux, Muxrpc *r)
104 r->next = mux->sleep.next;
105 r->prev = &mux->sleep;
111 dequeue(Mux *mux, Muxrpc *r)
113 r->next->prev = r->prev;
114 r->prev->next = r->next;
120 gettag(Mux *mux, Muxrpc *r)
126 /* wait for a free tag */
127 while(mux->nwait == mux->mwait){
128 if(mux->mwait < mux->maxtag-mux->mintag){
134 w = realloc(mux->wait, mw*sizeof(w[0]));
137 memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0]));
139 mux->freetag = mux->mwait;
143 rsleep(&mux->tagrend);
147 if(mux->wait[i] == 0)
149 for(; i<mux->mwait; i++)
150 if(mux->wait[i] == 0)
152 for(i=0; i<mux->freetag; i++)
153 if(mux->wait[i] == 0)
155 /* should not fall out of while without free tag */
156 fprint(2, "libfs: nwait botch\n");
163 r->tag = i+mux->mintag;
168 puttag(Mux *mux, Muxrpc *r)
172 i = r->tag - mux->mintag;
173 assert(mux->wait[i] == r);
177 rwakeup(&mux->tagrend);