Blob


1 /*
2 * Multiplexed Venti client. It would be nice if we
3 * could turn this into a generic library routine rather
4 * than keep it Venti specific. A user-level 9P client
5 * could use something like this too.
6 *
7 * (Actually it does - this should be replaced with libmux,
8 * which should be renamed librpcmux.)
9 *
10 * This is a little more complicated than it might be
11 * because we want it to work well within and without libthread.
12 *
13 * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
14 */
16 #include <u.h>
17 #include <libc.h>
18 #include <venti.h>
20 typedef struct Rwait Rwait;
21 struct Rwait
22 {
23 Rendez r;
24 Packet *p;
25 int done;
26 int sleeping;
27 };
29 static int gettag(VtConn*, Rwait*);
30 static void puttag(VtConn*, Rwait*, int);
31 static void muxrpc(VtConn*, Packet*);
33 Packet*
34 _vtrpc(VtConn *z, Packet *p, VtFcall *tx)
35 {
36 int i;
37 uchar tag, buf[2], *top;
38 Rwait *r, *rr;
40 /* must malloc because stack could be private */
41 r = vtmallocz(sizeof(Rwait));
43 qlock(&z->lk);
44 r->r.l = &z->lk;
45 tag = gettag(z, r);
46 if(tx){
47 /* vtfcallrpc can't print packet because it doesn't have tag */
48 tx->tag = tag;
49 if(chattyventi)
50 fprint(2, "%s -> %F\n", argv0, tx);
51 }
53 /* slam tag into packet */
54 top = packetpeek(p, buf, 0, 2);
55 if(top == nil){
56 packetfree(p);
57 return nil;
58 }
59 if(top == buf){
60 werrstr("first two bytes must be in same packet fragment");
61 packetfree(p);
62 vtfree(r);
63 return nil;
64 }
65 top[1] = tag;
66 qunlock(&z->lk);
67 if(vtsend(z, p) < 0){
68 vtfree(r);
69 return nil;
70 }
72 qlock(&z->lk);
73 /* wait for the muxer to give us our packet */
74 r->sleeping = 1;
75 z->nsleep++;
76 while(z->muxer && !r->done)
77 rsleep(&r->r);
78 z->nsleep--;
79 r->sleeping = 0;
81 /* if not done, there's no muxer: start muxing */
82 if(!r->done){
83 if(z->muxer)
84 abort();
85 z->muxer = 1;
86 while(!r->done){
87 qunlock(&z->lk);
88 if((p = vtrecv(z)) == nil){
89 werrstr("unexpected eof on venti connection");
90 z->muxer = 0;
91 vtfree(r);
92 return nil;
93 }
94 qlock(&z->lk);
95 muxrpc(z, p);
96 }
97 z->muxer = 0;
98 /* if there is anyone else sleeping, wake first unfinished to mux */
99 if(z->nsleep)
100 for(i=0; i<256; i++){
101 rr = z->wait[i];
102 if(rr && rr->sleeping && !rr->done){
103 rwakeup(&rr->r);
104 break;
109 p = r->p;
110 puttag(z, r, tag);
111 vtfree(r);
112 qunlock(&z->lk);
113 return p;
116 Packet*
117 vtrpc(VtConn *z, Packet *p)
119 return _vtrpc(z, p, nil);
122 static int
123 gettag(VtConn *z, Rwait *r)
125 int i;
127 Again:
128 while(z->ntag == 256)
129 rsleep(&z->tagrend);
130 for(i=0; i<256; i++)
131 if(z->wait[i] == 0){
132 z->ntag++;
133 z->wait[i] = r;
134 return i;
136 fprint(2, "libventi: ntag botch\n");
137 goto Again;
140 static void
141 puttag(VtConn *z, Rwait *r, int tag)
143 assert(z->wait[tag] == r);
144 z->wait[tag] = nil;
145 z->ntag--;
146 rwakeup(&z->tagrend);
149 static void
150 muxrpc(VtConn *z, Packet *p)
152 uchar tag, buf[2], *top;
153 Rwait *r;
155 if((top = packetpeek(p, buf, 0, 2)) == nil){
156 fprint(2, "libventi: short packet in vtrpc\n");
157 packetfree(p);
158 return;
161 tag = top[1];
162 if((r = z->wait[tag]) == nil){
163 fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
164 abort();
165 packetfree(p);
166 return;
169 r->p = p;
170 r->done = 1;
171 rwakeup(&r->r);