Blob


1 /*
2 * Multiplexed Venti client. It would be nice if we
3 * could turn this into a generic library routine rather
4 * than keep it Venti specific. A user-level 9P client
5 * could use something like this too.
6 *
7 * This is a little more complicated than it might be
8 * because we want it to work well within and without libthread.
9 *
10 * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
11 */
13 #include <u.h>
14 #include <libc.h>
15 #include <venti.h>
17 typedef struct Rwait Rwait;
18 struct Rwait
19 {
20 Rendez r;
21 Packet *p;
22 int done;
23 int sleeping;
24 };
26 static int gettag(VtConn*, Rwait*);
27 static void puttag(VtConn*, Rwait*, int);
28 static void muxrpc(VtConn*, Packet*);
29 Packet *vtrpc(VtConn*, Packet*);
31 Packet*
32 vtrpc(VtConn *z, Packet *p)
33 {
34 int i;
35 uchar tag, buf[2], *top;
36 Rwait *r;
38 /* must malloc because stack could be private */
39 r = vtmallocz(sizeof(Rwait));
41 qlock(&z->lk);
42 r->r.l = &z->lk;
43 tag = gettag(z, r);
45 /* slam tag into packet */
46 top = packetpeek(p, buf, 0, 2);
47 if(top == nil){
48 packetfree(p);
49 return nil;
50 }
51 if(top == buf){
52 werrstr("first two bytes must be in same packet fragment");
53 packetfree(p);
54 return nil;
55 }
56 top[1] = tag;
57 qunlock(&z->lk);
58 if(vtsend(z, p) < 0)
59 return nil;
61 qlock(&z->lk);
62 /* wait for the muxer to give us our packet */
63 r->sleeping = 1;
64 z->nsleep++;
65 while(z->muxer && !r->done)
66 rsleep(&r->r);
67 z->nsleep--;
68 r->sleeping = 0;
70 /* if not done, there's no muxer: start muxing */
71 if(!r->done){
72 if(z->muxer)
73 abort();
74 z->muxer = 1;
75 while(!r->done){
76 qunlock(&z->lk);
77 if((p = vtrecv(z)) == nil){
78 z->muxer = 0;
79 return nil;
80 }
81 qlock(&z->lk);
82 muxrpc(z, p);
83 }
84 z->muxer = 0;
85 /* if there is anyone else sleeping, wake them to mux */
86 if(z->nsleep){
87 for(i=0; i<256; i++)
88 if(z->wait[i] != nil && ((Rwait*)z->wait[i])->sleeping)
89 break;
90 if(i==256)
91 fprint(2, "libventi: nsleep botch\n");
92 else
93 rwakeup(&((Rwait*)z->wait[i])->r);
94 }
95 }
97 p = r->p;
98 puttag(z, r, tag);
99 vtfree(r);
100 qunlock(&z->lk);
101 return p;
104 static int
105 gettag(VtConn *z, Rwait *r)
107 int i;
109 Again:
110 while(z->ntag == 256)
111 rsleep(&z->tagrend);
112 for(i=0; i<256; i++)
113 if(z->wait[i] == 0){
114 z->ntag++;
115 z->wait[i] = r;
116 return i;
118 fprint(2, "libventi: ntag botch\n");
119 goto Again;
122 static void
123 puttag(VtConn *z, Rwait *r, int tag)
125 assert(z->wait[tag] == r);
126 z->wait[tag] = nil;
127 z->ntag--;
128 rwakeup(&z->tagrend);
131 static void
132 muxrpc(VtConn *z, Packet *p)
134 uchar tag, buf[2], *top;
135 Rwait *r;
137 if((top = packetpeek(p, buf, 0, 2)) == nil){
138 fprint(2, "libventi: short packet in vtrpc\n");
139 packetfree(p);
140 return;
143 tag = top[1];
144 if((r = z->wait[tag]) == nil){
145 fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
146 abort();
147 packetfree(p);
148 return;
151 r->p = p;
152 r->done = 1;
153 rwakeup(&r->r);