Blob


1 /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
2 /* See COPYRIGHT */
4 /*
5 * Generic RPC packet multiplexor. Inspired by but not derived from
6 * Plan 9 kernel. Originally developed as part of Tra, later used in
7 * libnventi, and then finally split out into a generic library.
8 */
10 #include <u.h>
11 #include <libc.h>
12 #include <mux.h>
14 static int gettag(Mux*, Muxrpc*);
15 static void puttag(Mux*, Muxrpc*);
16 static void enqueue(Mux*, Muxrpc*);
17 static void dequeue(Mux*, Muxrpc*);
19 void
20 muxinit(Mux *mux)
21 {
22 memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk));
23 mux->tagrend.l = &mux->lk;
24 mux->rpcfork.l = &mux->lk;
25 mux->sleep.next = &mux->sleep;
26 mux->sleep.prev = &mux->sleep;
27 }
29 void*
30 muxrpc(Mux *mux, void *tx)
31 {
32 int tag;
33 Muxrpc *r, *r2;
34 void *p;
36 /* must malloc because stack could be private */
37 r = mallocz(sizeof(Muxrpc), 1);
38 if(r == nil)
39 return nil;
40 r->r.l = &mux->lk;
42 /* assign the tag */
43 qlock(&mux->lk);
44 tag = gettag(mux, r);
45 qunlock(&mux->lk);
46 if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){
47 qlock(&mux->lk);
48 puttag(mux, r);
49 qunlock(&mux->lk);
50 return nil;
51 }
53 /* add ourselves to sleep queue */
54 qlock(&mux->lk);
55 enqueue(mux, r);
57 /* wait for our packet */
58 while(mux->muxer && !r->p){
59 rsleep(&r->r);
60 }
62 /* if not done, there's no muxer: start muxing */
63 if(!r->p){
64 if(mux->muxer)
65 abort();
66 mux->muxer = 1;
67 while(!r->p){
68 qunlock(&mux->lk);
69 p = _muxrecv(mux);
70 if(p)
71 tag = mux->gettag(mux, p);
72 else
73 tag = ~0;
74 qlock(&mux->lk);
75 if(p == nil){ /* eof -- just give up and pass the buck */
76 dequeue(mux, r);
77 break;
78 }
79 /* hand packet to correct sleeper */
80 if(tag < 0 || tag >= mux->mwait){
81 fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
82 /* must leak packet! don't know how to free it! */
83 continue;
84 }
85 r2 = mux->wait[tag];
86 r2->p = p;
87 dequeue(mux, r2);
88 rwakeup(&r2->r);
89 }
90 mux->muxer = 0;
92 /* if there is anyone else sleeping, wake them to mux */
93 if(mux->sleep.next != &mux->sleep)
94 rwakeup(&mux->sleep.next->r);
95 }
96 p = r->p;
97 puttag(mux, r);
98 qunlock(&mux->lk);
99 return p;
102 static void
103 enqueue(Mux *mux, Muxrpc *r)
105 r->next = mux->sleep.next;
106 r->prev = &mux->sleep;
107 r->next->prev = r;
108 r->prev->next = r;
111 static void
112 dequeue(Mux *mux, Muxrpc *r)
114 r->next->prev = r->prev;
115 r->prev->next = r->next;
116 r->prev = nil;
117 r->next = nil;
120 static int
121 gettag(Mux *mux, Muxrpc *r)
123 int i, mw;
124 Muxrpc **w;
126 for(;;){
127 /* wait for a free tag */
128 while(mux->nwait == mux->mwait){
129 if(mux->mwait < mux->maxtag-mux->mintag){
130 mw = mux->mwait;
131 if(mw == 0)
132 mw = 1;
133 else
134 mw <<= 1;
135 w = realloc(mux->wait, mw*sizeof(w[0]));
136 if(w == nil)
137 return -1;
138 memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0]));
139 mux->wait = w;
140 mux->freetag = mux->mwait;
141 mux->mwait = mw;
142 break;
144 rsleep(&mux->tagrend);
147 i=mux->freetag;
148 if(mux->wait[i] == 0)
149 goto Found;
150 for(; i<mux->mwait; i++)
151 if(mux->wait[i] == 0)
152 goto Found;
153 for(i=0; i<mux->freetag; i++)
154 if(mux->wait[i] == 0)
155 goto Found;
156 /* should not fall out of while without free tag */
157 fprint(2, "libfs: nwait botch\n");
158 abort();
161 Found:
162 mux->nwait++;
163 mux->wait[i] = r;
164 r->tag = i+mux->mintag;
165 return i;
168 static void
169 puttag(Mux *mux, Muxrpc *r)
171 int i;
173 i = r->tag - mux->mintag;
174 assert(mux->wait[i] == r);
175 mux->wait[i] = nil;
176 mux->nwait--;
177 mux->freetag = i;
178 rwakeup(&mux->tagrend);
179 free(r);