Blob


1 /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
2 /* See COPYRIGHT */
4 /*
5 * Generic RPC packet multiplexor. Inspired by but not derived from
6 * Plan 9 kernel. Originally developed as part of Tra, later used in
7 * libnventi, and then finally split out into a generic library.
8 */
10 #include <u.h>
11 #include <libc.h>
12 #include <mux.h>
14 static int gettag(Mux*, Muxrpc*);
15 static void puttag(Mux*, Muxrpc*);
16 static void enqueue(Mux*, Muxrpc*);
17 static void dequeue(Mux*, Muxrpc*);
19 void
20 muxinit(Mux *mux)
21 {
22 memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk));
23 mux->tagrend.l = &mux->lk;
24 mux->rpcfork.l = &mux->lk;
25 mux->sleep.next = &mux->sleep;
26 mux->sleep.prev = &mux->sleep;
27 }
29 void*
30 muxrpc(Mux *mux, void *tx)
31 {
32 int tag;
33 Muxrpc *r, *r2;
34 void *p;
36 /* must malloc because stack could be private */
37 r = mallocz(sizeof(Muxrpc), 1);
38 if(r == nil){
39 werrstr("mallocz: %r");
40 return nil;
41 }
42 r->r.l = &mux->lk;
44 /* assign the tag, add selves to response queue */
45 qlock(&mux->lk);
46 tag = gettag(mux, r);
47 //print("gettag %p %d\n", r, tag);
48 enqueue(mux, r);
49 qunlock(&mux->lk);
51 /* actually send the packet */
52 if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){
53 werrstr("settag/send tag %d: %r", tag);
54 fprint(2, "%r\n");
55 qlock(&mux->lk);
56 dequeue(mux, r);
57 puttag(mux, r);
58 qunlock(&mux->lk);
59 return nil;
60 }
62 qlock(&mux->lk);
63 /* wait for our packet */
64 while(mux->muxer && !r->p){
65 rsleep(&r->r);
66 }
68 /* if not done, there's no muxer: start muxing */
69 if(!r->p){
70 if(mux->muxer)
71 abort();
72 mux->muxer = 1;
73 while(!r->p){
74 qunlock(&mux->lk);
75 p = _muxrecv(mux);
76 if(p)
77 tag = mux->gettag(mux, p) - mux->mintag;
78 else
79 tag = ~0;
80 //print("mux tag %d\n", tag);
81 qlock(&mux->lk);
82 if(p == nil){ /* eof -- just give up and pass the buck */
83 dequeue(mux, r);
84 break;
85 }
86 /* hand packet to correct sleeper */
87 if(tag < 0 || tag >= mux->mwait){
88 fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
89 /* must leak packet! don't know how to free it! */
90 continue;
91 }
92 r2 = mux->wait[tag];
93 if(r2 == nil || r2->prev == nil){
94 fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
95 /* must leak packet! don't know how to free it! */
96 continue;
97 }
98 r2->p = p;
99 dequeue(mux, r2);
100 rwakeup(&r2->r);
102 mux->muxer = 0;
104 /* if there is anyone else sleeping, wake them to mux */
105 if(mux->sleep.next != &mux->sleep)
106 rwakeup(&mux->sleep.next->r);
108 //print("finished %p\n", r);
109 p = r->p;
110 puttag(mux, r);
111 qunlock(&mux->lk);
112 if(p == nil)
113 werrstr("unexpected eof");
114 return p;
117 static void
118 enqueue(Mux *mux, Muxrpc *r)
120 r->next = mux->sleep.next;
121 r->prev = &mux->sleep;
122 r->next->prev = r;
123 r->prev->next = r;
126 static void
127 dequeue(Mux *mux, Muxrpc *r)
129 r->next->prev = r->prev;
130 r->prev->next = r->next;
131 r->prev = nil;
132 r->next = nil;
135 static int
136 gettag(Mux *mux, Muxrpc *r)
138 int i, mw;
139 Muxrpc **w;
141 for(;;){
142 /* wait for a free tag */
143 while(mux->nwait == mux->mwait){
144 if(mux->mwait < mux->maxtag-mux->mintag){
145 mw = mux->mwait;
146 if(mw == 0)
147 mw = 1;
148 else
149 mw <<= 1;
150 w = realloc(mux->wait, mw*sizeof(w[0]));
151 if(w == nil)
152 return -1;
153 memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0]));
154 mux->wait = w;
155 mux->freetag = mux->mwait;
156 mux->mwait = mw;
157 break;
159 rsleep(&mux->tagrend);
162 i=mux->freetag;
163 if(mux->wait[i] == 0)
164 goto Found;
165 for(; i<mux->mwait; i++)
166 if(mux->wait[i] == 0)
167 goto Found;
168 for(i=0; i<mux->freetag; i++)
169 if(mux->wait[i] == 0)
170 goto Found;
171 /* should not fall out of while without free tag */
172 fprint(2, "libfs: nwait botch\n");
173 abort();
176 Found:
177 mux->nwait++;
178 mux->wait[i] = r;
179 r->tag = i+mux->mintag;
180 return r->tag;
183 static void
184 puttag(Mux *mux, Muxrpc *r)
186 int i;
188 i = r->tag - mux->mintag;
189 assert(mux->wait[i] == r);
190 mux->wait[i] = nil;
191 mux->nwait--;
192 mux->freetag = i;
193 rwakeup(&mux->tagrend);
194 free(r);