Blob


1 /*
2 * Multiplexed Venti client. It would be nice if we
3 * could turn this into a generic library routine rather
4 * than keep it Venti specific. A user-level 9P client
5 * could use something like this too.
6 *
7 * (Actually it does - this should be replaced with libmux,
8 * which should be renamed librpcmux.)
9 *
10 * This is a little more complicated than it might be
11 * because we want it to work well within and without libthread.
12 *
13 * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
14 */
16 #include <u.h>
17 #include <libc.h>
18 #include <venti.h>
20 typedef struct Rwait Rwait;
21 struct Rwait
22 {
23 Rendez r;
24 Packet *p;
25 int done;
26 int sleeping;
27 };
29 static int gettag(VtConn*, Rwait*);
30 static void puttag(VtConn*, Rwait*, int);
31 static void muxrpc(VtConn*, Packet*);
32 Packet *vtrpc(VtConn*, Packet*);
34 Packet*
35 vtrpc(VtConn *z, Packet *p)
36 {
37 int i;
38 uchar tag, buf[2], *top;
39 Rwait *r;
41 /* must malloc because stack could be private */
42 r = vtmallocz(sizeof(Rwait));
44 qlock(&z->lk);
45 r->r.l = &z->lk;
46 tag = gettag(z, r);
48 /* slam tag into packet */
49 top = packetpeek(p, buf, 0, 2);
50 if(top == nil){
51 packetfree(p);
52 return nil;
53 }
54 if(top == buf){
55 werrstr("first two bytes must be in same packet fragment");
56 packetfree(p);
57 return nil;
58 }
59 top[1] = tag;
60 qunlock(&z->lk);
61 if(vtsend(z, p) < 0)
62 return nil;
64 qlock(&z->lk);
65 /* wait for the muxer to give us our packet */
66 r->sleeping = 1;
67 z->nsleep++;
68 while(z->muxer && !r->done)
69 rsleep(&r->r);
70 z->nsleep--;
71 r->sleeping = 0;
73 /* if not done, there's no muxer: start muxing */
74 if(!r->done){
75 if(z->muxer)
76 abort();
77 z->muxer = 1;
78 while(!r->done){
79 qunlock(&z->lk);
80 if((p = vtrecv(z)) == nil){
81 werrstr("unexpected eof on venti connection");
82 z->muxer = 0;
83 return nil;
84 }
85 qlock(&z->lk);
86 muxrpc(z, p);
87 }
88 z->muxer = 0;
89 /* if there is anyone else sleeping, wake them to mux */
90 if(z->nsleep){
91 for(i=0; i<256; i++)
92 if(z->wait[i] != nil && ((Rwait*)z->wait[i])->sleeping)
93 break;
94 if(i==256)
95 fprint(2, "libventi: nsleep botch\n");
96 else
97 rwakeup(&((Rwait*)z->wait[i])->r);
98 }
99 }
101 p = r->p;
102 puttag(z, r, tag);
103 vtfree(r);
104 qunlock(&z->lk);
105 return p;
108 static int
109 gettag(VtConn *z, Rwait *r)
111 int i;
113 Again:
114 while(z->ntag == 256)
115 rsleep(&z->tagrend);
116 for(i=0; i<256; i++)
117 if(z->wait[i] == 0){
118 z->ntag++;
119 z->wait[i] = r;
120 return i;
122 fprint(2, "libventi: ntag botch\n");
123 goto Again;
126 static void
127 puttag(VtConn *z, Rwait *r, int tag)
129 assert(z->wait[tag] == r);
130 z->wait[tag] = nil;
131 z->ntag--;
132 rwakeup(&z->tagrend);
135 static void
136 muxrpc(VtConn *z, Packet *p)
138 uchar tag, buf[2], *top;
139 Rwait *r;
141 if((top = packetpeek(p, buf, 0, 2)) == nil){
142 fprint(2, "libventi: short packet in vtrpc\n");
143 packetfree(p);
144 return;
147 tag = top[1];
148 if((r = z->wait[tag]) == nil){
149 fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
150 abort();
151 packetfree(p);
152 return;
155 r->p = p;
156 r->done = 1;
157 rwakeup(&r->r);