Blob


1 /*
2 * Multiplexed Venti client. It would be nice if we
3 * could turn this into a generic library routine rather
4 * than keep it Venti specific. A user-level 9P client
5 * could use something like this too.
6 *
7 * (Actually it does - this should be replaced with libmux,
8 * which should be renamed librpcmux.)
9 *
10 * This is a little more complicated than it might be
11 * because we want it to work well within and without libthread.
12 *
13 * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
14 */
16 #include <u.h>
17 #include <libc.h>
18 #include <venti.h>
20 typedef struct Rwait Rwait;
21 struct Rwait
22 {
23 Rendez r;
24 Packet *p;
25 int done;
26 int sleeping;
27 };
29 static int gettag(VtConn*, Rwait*);
30 static void puttag(VtConn*, Rwait*, int);
31 static void muxrpc(VtConn*, Packet*);
32 Packet *vtrpc(VtConn*, Packet*);
34 Packet*
35 vtrpc(VtConn *z, Packet *p)
36 {
37 int i;
38 uchar tag, buf[2], *top;
39 Rwait *r, *rr;
41 /* must malloc because stack could be private */
42 r = vtmallocz(sizeof(Rwait));
44 qlock(&z->lk);
45 r->r.l = &z->lk;
46 tag = gettag(z, r);
48 /* slam tag into packet */
49 top = packetpeek(p, buf, 0, 2);
50 if(top == nil){
51 packetfree(p);
52 return nil;
53 }
54 if(top == buf){
55 werrstr("first two bytes must be in same packet fragment");
56 packetfree(p);
57 return nil;
58 }
59 top[1] = tag;
60 qunlock(&z->lk);
61 if(vtsend(z, p) < 0)
62 return nil;
64 qlock(&z->lk);
65 /* wait for the muxer to give us our packet */
66 r->sleeping = 1;
67 z->nsleep++;
68 while(z->muxer && !r->done)
69 rsleep(&r->r);
70 z->nsleep--;
71 r->sleeping = 0;
73 /* if not done, there's no muxer: start muxing */
74 if(!r->done){
75 if(z->muxer)
76 abort();
77 z->muxer = 1;
78 while(!r->done){
79 qunlock(&z->lk);
80 if((p = vtrecv(z)) == nil){
81 werrstr("unexpected eof on venti connection");
82 z->muxer = 0;
83 return nil;
84 }
85 qlock(&z->lk);
86 muxrpc(z, p);
87 }
88 z->muxer = 0;
89 /* if there is anyone else sleeping, wake first unfinished to mux */
90 if(z->nsleep)
91 for(i=0; i<256; i++){
92 rr = z->wait[i];
93 if(rr && rr->sleeping && !rr->done){
94 rwakeup(&rr->r);
95 break;
96 }
97 }
98 }
100 p = r->p;
101 puttag(z, r, tag);
102 vtfree(r);
103 qunlock(&z->lk);
104 return p;
107 static int
108 gettag(VtConn *z, Rwait *r)
110 int i;
112 Again:
113 while(z->ntag == 256)
114 rsleep(&z->tagrend);
115 for(i=0; i<256; i++)
116 if(z->wait[i] == 0){
117 z->ntag++;
118 z->wait[i] = r;
119 return i;
121 fprint(2, "libventi: ntag botch\n");
122 goto Again;
125 static void
126 puttag(VtConn *z, Rwait *r, int tag)
128 assert(z->wait[tag] == r);
129 z->wait[tag] = nil;
130 z->ntag--;
131 rwakeup(&z->tagrend);
134 static void
135 muxrpc(VtConn *z, Packet *p)
137 uchar tag, buf[2], *top;
138 Rwait *r;
140 if((top = packetpeek(p, buf, 0, 2)) == nil){
141 fprint(2, "libventi: short packet in vtrpc\n");
142 packetfree(p);
143 return;
146 tag = top[1];
147 if((r = z->wait[tag]) == nil){
148 fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
149 abort();
150 packetfree(p);
151 return;
154 r->p = p;
155 r->done = 1;
156 rwakeup(&r->r);