Blob


1 /*
2 * Multiplexed Venti client. It would be nice if we
3 * could turn this into a generic library routine rather
4 * than keep it Venti specific. A user-level 9P client
5 * could use something like this too.
6 *
7 * (Actually it does - this should be replaced with libmux,
8 * which should be renamed librpcmux.)
9 *
10 * This is a little more complicated than it might be
11 * because we want it to work well within and without libthread.
12 *
13 * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
14 */
16 #include <u.h>
17 #include <libc.h>
18 #include <venti.h>
20 typedef struct Rwait Rwait;
21 struct Rwait
22 {
23 Rendez r;
24 Packet *p;
25 int done;
26 int sleeping;
27 };
29 static int gettag(VtConn*, Rwait*);
30 static void puttag(VtConn*, Rwait*, int);
31 static void muxrpc(VtConn*, Packet*);
33 Packet*
34 _vtrpc(VtConn *z, Packet *p, VtFcall *tx)
35 {
36 int i;
37 uchar tag, buf[2], *top;
38 Rwait *r, *rr;
40 /* must malloc because stack could be private */
41 r = vtmallocz(sizeof(Rwait));
43 qlock(&z->lk);
44 r->r.l = &z->lk;
45 tag = gettag(z, r);
46 if(tx){
47 /* vtfcallrpc can't print packet because it doesn't have tag */
48 tx->tag = tag;
49 if(chattyventi)
50 fprint(2, "%s -> %F\n", argv0, tx);
51 }
53 /* slam tag into packet */
54 top = packetpeek(p, buf, 0, 2);
55 if(top == nil){
56 packetfree(p);
57 return nil;
58 }
59 if(top == buf){
60 werrstr("first two bytes must be in same packet fragment");
61 packetfree(p);
62 return nil;
63 }
64 top[1] = tag;
65 qunlock(&z->lk);
66 if(vtsend(z, p) < 0)
67 return nil;
69 qlock(&z->lk);
70 /* wait for the muxer to give us our packet */
71 r->sleeping = 1;
72 z->nsleep++;
73 while(z->muxer && !r->done)
74 rsleep(&r->r);
75 z->nsleep--;
76 r->sleeping = 0;
78 /* if not done, there's no muxer: start muxing */
79 if(!r->done){
80 if(z->muxer)
81 abort();
82 z->muxer = 1;
83 while(!r->done){
84 qunlock(&z->lk);
85 if((p = vtrecv(z)) == nil){
86 werrstr("unexpected eof on venti connection");
87 z->muxer = 0;
88 return nil;
89 }
90 qlock(&z->lk);
91 muxrpc(z, p);
92 }
93 z->muxer = 0;
94 /* if there is anyone else sleeping, wake first unfinished to mux */
95 if(z->nsleep)
96 for(i=0; i<256; i++){
97 rr = z->wait[i];
98 if(rr && rr->sleeping && !rr->done){
99 rwakeup(&rr->r);
100 break;
105 p = r->p;
106 puttag(z, r, tag);
107 vtfree(r);
108 qunlock(&z->lk);
109 return p;
112 Packet*
113 vtrpc(VtConn *z, Packet *p)
115 return _vtrpc(z, p, nil);
118 static int
119 gettag(VtConn *z, Rwait *r)
121 int i;
123 Again:
124 while(z->ntag == 256)
125 rsleep(&z->tagrend);
126 for(i=0; i<256; i++)
127 if(z->wait[i] == 0){
128 z->ntag++;
129 z->wait[i] = r;
130 return i;
132 fprint(2, "libventi: ntag botch\n");
133 goto Again;
136 static void
137 puttag(VtConn *z, Rwait *r, int tag)
139 assert(z->wait[tag] == r);
140 z->wait[tag] = nil;
141 z->ntag--;
142 rwakeup(&z->tagrend);
145 static void
146 muxrpc(VtConn *z, Packet *p)
148 uchar tag, buf[2], *top;
149 Rwait *r;
151 if((top = packetpeek(p, buf, 0, 2)) == nil){
152 fprint(2, "libventi: short packet in vtrpc\n");
153 packetfree(p);
154 return;
157 tag = top[1];
158 if((r = z->wait[tag]) == nil){
159 fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
160 abort();
161 packetfree(p);
162 return;
165 r->p = p;
166 r->done = 1;
167 rwakeup(&r->r);