Blob


1 /*
2 * Multiplexed Venti client. It would be nice if we
3 * could turn this into a generic library routine rather
4 * than keep it Venti specific. A user-level 9P client
5 * could use something like this too.
6 *
7 * (Actually it does - this should be replaced with libmux,
8 * which should be renamed librpcmux.)
9 *
10 * This is a little more complicated than it might be
11 * because we want it to work well within and without libthread.
12 *
13 * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
14 */
16 #include <u.h>
17 #include <libc.h>
18 #include <venti.h>
20 typedef struct Rwait Rwait;
21 struct Rwait
22 {
23 Rendez r;
24 Packet *p;
25 int done;
26 int sleeping;
27 };
29 static int gettag(VtConn*, Rwait*);
30 static void puttag(VtConn*, Rwait*, int);
31 static void muxrpc(VtConn*, Packet*);
33 Packet*
34 _vtrpc(VtConn *z, Packet *p, VtFcall *tx)
35 {
36 int i;
37 uchar tag, buf[2], *top;
38 Rwait *r, *rr;
40 if(z == nil){
41 werrstr("not connected");
42 packetfree(p);
43 return nil;
44 }
46 /* must malloc because stack could be private */
47 r = vtmallocz(sizeof(Rwait));
49 qlock(&z->lk);
50 r->r.l = &z->lk;
51 tag = gettag(z, r);
52 if(tx){
53 /* vtfcallrpc can't print packet because it doesn't have tag */
54 tx->tag = tag;
55 if(chattyventi)
56 fprint(2, "%s -> %F\n", argv0, tx);
57 }
59 /* slam tag into packet */
60 top = packetpeek(p, buf, 0, 2);
61 if(top == nil){
62 packetfree(p);
63 return nil;
64 }
65 if(top == buf){
66 werrstr("first two bytes must be in same packet fragment");
67 packetfree(p);
68 vtfree(r);
69 return nil;
70 }
71 top[1] = tag;
72 qunlock(&z->lk);
73 if(vtsend(z, p) < 0){
74 vtfree(r);
75 return nil;
76 }
78 qlock(&z->lk);
79 /* wait for the muxer to give us our packet */
80 r->sleeping = 1;
81 z->nsleep++;
82 while(z->muxer && !r->done)
83 rsleep(&r->r);
84 z->nsleep--;
85 r->sleeping = 0;
87 /* if not done, there's no muxer: start muxing */
88 if(!r->done){
89 if(z->muxer)
90 abort();
91 z->muxer = 1;
92 while(!r->done){
93 qunlock(&z->lk);
94 if((p = vtrecv(z)) == nil){
95 werrstr("unexpected eof on venti connection");
96 z->muxer = 0;
97 vtfree(r);
98 return nil;
99 }
100 qlock(&z->lk);
101 muxrpc(z, p);
103 z->muxer = 0;
104 /* if there is anyone else sleeping, wake first unfinished to mux */
105 if(z->nsleep)
106 for(i=0; i<256; i++){
107 rr = z->wait[i];
108 if(rr && rr->sleeping && !rr->done){
109 rwakeup(&rr->r);
110 break;
115 p = r->p;
116 puttag(z, r, tag);
117 vtfree(r);
118 qunlock(&z->lk);
119 return p;
122 Packet*
123 vtrpc(VtConn *z, Packet *p)
125 return _vtrpc(z, p, nil);
128 static int
129 gettag(VtConn *z, Rwait *r)
131 int i;
133 Again:
134 while(z->ntag == 256)
135 rsleep(&z->tagrend);
136 for(i=0; i<256; i++)
137 if(z->wait[i] == 0){
138 z->ntag++;
139 z->wait[i] = r;
140 return i;
142 fprint(2, "libventi: ntag botch\n");
143 goto Again;
146 static void
147 puttag(VtConn *z, Rwait *r, int tag)
149 assert(z->wait[tag] == r);
150 z->wait[tag] = nil;
151 z->ntag--;
152 rwakeup(&z->tagrend);
155 static void
156 muxrpc(VtConn *z, Packet *p)
158 uchar tag, buf[2], *top;
159 Rwait *r;
161 if((top = packetpeek(p, buf, 0, 2)) == nil){
162 fprint(2, "libventi: short packet in vtrpc\n");
163 packetfree(p);
164 return;
167 tag = top[1];
168 if((r = z->wait[tag]) == nil){
169 fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
170 abort();
171 packetfree(p);
172 return;
175 r->p = p;
176 r->done = 1;
177 rwakeup(&r->r);