1 056fe1ba 2003-11-23 devnull #include <u.h>
2 056fe1ba 2003-11-23 devnull #include <libc.h>
3 056fe1ba 2003-11-23 devnull #include <venti.h>
4 056fe1ba 2003-11-23 devnull #include "queue.h"
6 056fe1ba 2003-11-23 devnull static int
7 056fe1ba 2003-11-23 devnull _vtsend(VtConn *z, Packet *p)
9 056fe1ba 2003-11-23 devnull IOchunk ioc;
11 056fe1ba 2003-11-23 devnull uchar buf[2];
13 056fe1ba 2003-11-23 devnull if(z->state != VtStateConnected) {
14 056fe1ba 2003-11-23 devnull werrstr("session not connected");
15 056fe1ba 2003-11-23 devnull return -1;
18 056fe1ba 2003-11-23 devnull /* add framing */
19 056fe1ba 2003-11-23 devnull n = packetsize(p);
20 056fe1ba 2003-11-23 devnull if(n >= (1<<16)) {
21 056fe1ba 2003-11-23 devnull werrstr("packet too large");
22 056fe1ba 2003-11-23 devnull packetfree(p);
23 056fe1ba 2003-11-23 devnull return -1;
25 056fe1ba 2003-11-23 devnull buf[0] = n>>8;
26 056fe1ba 2003-11-23 devnull buf[1] = n;
27 056fe1ba 2003-11-23 devnull packetprefix(p, buf, 2);
30 056fe1ba 2003-11-23 devnull n = packetfragments(p, &ioc, 1, 0);
31 056fe1ba 2003-11-23 devnull if(n == 0)
33 056fe1ba 2003-11-23 devnull if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
34 056fe1ba 2003-11-23 devnull packetfree(p);
35 056fe1ba 2003-11-23 devnull return 0;
37 056fe1ba 2003-11-23 devnull packetconsume(p, nil, ioc.len);
39 056fe1ba 2003-11-23 devnull packetfree(p);
40 056fe1ba 2003-11-23 devnull return 1;
43 056fe1ba 2003-11-23 devnull static Packet*
44 056fe1ba 2003-11-23 devnull _vtrecv(VtConn *z)
46 056fe1ba 2003-11-23 devnull uchar buf[10], *b;
48 056fe1ba 2003-11-23 devnull Packet *p;
49 056fe1ba 2003-11-23 devnull int size, len;
51 056fe1ba 2003-11-23 devnull if(z->state != VtStateConnected) {
52 056fe1ba 2003-11-23 devnull werrstr("session not connected");
53 056fe1ba 2003-11-23 devnull return nil;
56 056fe1ba 2003-11-23 devnull p = z->part;
57 056fe1ba 2003-11-23 devnull /* get enough for head size */
58 056fe1ba 2003-11-23 devnull size = packetsize(p);
59 056fe1ba 2003-11-23 devnull while(size < 2) {
60 056fe1ba 2003-11-23 devnull b = packettrailer(p, MaxFragSize);
61 056fe1ba 2003-11-23 devnull assert(b != nil);
62 056fe1ba 2003-11-23 devnull n = read(z->infd, b, MaxFragSize);
63 056fe1ba 2003-11-23 devnull if(n <= 0)
64 056fe1ba 2003-11-23 devnull goto Err;
65 056fe1ba 2003-11-23 devnull size += n;
66 056fe1ba 2003-11-23 devnull packettrim(p, 0, size);
69 056fe1ba 2003-11-23 devnull if(packetconsume(p, buf, 2) < 0)
70 056fe1ba 2003-11-23 devnull goto Err;
71 056fe1ba 2003-11-23 devnull len = (buf[0] << 8) | buf[1];
72 056fe1ba 2003-11-23 devnull size -= 2;
74 056fe1ba 2003-11-23 devnull while(size < len) {
75 056fe1ba 2003-11-23 devnull n = len - size;
76 056fe1ba 2003-11-23 devnull if(n > MaxFragSize)
77 056fe1ba 2003-11-23 devnull n = MaxFragSize;
78 056fe1ba 2003-11-23 devnull b = packettrailer(p, n);
79 056fe1ba 2003-11-23 devnull if(readn(z->infd, b, n) != n)
80 056fe1ba 2003-11-23 devnull goto Err;
81 056fe1ba 2003-11-23 devnull size += n;
83 056fe1ba 2003-11-23 devnull p = packetsplit(p, len);
84 056fe1ba 2003-11-23 devnull return p;
86 056fe1ba 2003-11-23 devnull return nil;
90 056fe1ba 2003-11-23 devnull * If you fork off two procs running vtrecvproc and vtsendproc,
91 056fe1ba 2003-11-23 devnull * then vtrecv/vtsend (and thus vtrpc) will never block except on
92 056fe1ba 2003-11-23 devnull * rendevouses, which is nice when it's running in one thread of many.
95 056fe1ba 2003-11-23 devnull vtrecvproc(void *v)
97 056fe1ba 2003-11-23 devnull Packet *p;
98 056fe1ba 2003-11-23 devnull VtConn *z;
99 056fe1ba 2003-11-23 devnull Queue *q;
102 056fe1ba 2003-11-23 devnull q = _vtqalloc();
104 056fe1ba 2003-11-23 devnull qlock(&z->lk);
105 056fe1ba 2003-11-23 devnull z->readq = q;
106 056fe1ba 2003-11-23 devnull qlock(&z->inlk);
107 056fe1ba 2003-11-23 devnull rwakeup(&z->rpcfork);
108 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
110 056fe1ba 2003-11-23 devnull while((p = _vtrecv(z)) != nil)
111 056fe1ba 2003-11-23 devnull if(_vtqsend(q, p) < 0){
112 056fe1ba 2003-11-23 devnull packetfree(p);
115 056fe1ba 2003-11-23 devnull qunlock(&z->inlk);
116 056fe1ba 2003-11-23 devnull qlock(&z->lk);
117 056fe1ba 2003-11-23 devnull _vtqhangup(q);
118 056fe1ba 2003-11-23 devnull while((p = _vtnbqrecv(q)) != nil)
119 056fe1ba 2003-11-23 devnull packetfree(p);
120 056fe1ba 2003-11-23 devnull vtfree(q);
121 056fe1ba 2003-11-23 devnull z->readq = nil;
122 056fe1ba 2003-11-23 devnull rwakeup(&z->rpcfork);
123 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
124 056fe1ba 2003-11-23 devnull vthangup(z);
128 056fe1ba 2003-11-23 devnull vtsendproc(void *v)
130 056fe1ba 2003-11-23 devnull Queue *q;
131 056fe1ba 2003-11-23 devnull Packet *p;
132 056fe1ba 2003-11-23 devnull VtConn *z;
135 056fe1ba 2003-11-23 devnull q = _vtqalloc();
137 056fe1ba 2003-11-23 devnull qlock(&z->lk);
138 056fe1ba 2003-11-23 devnull z->writeq = q;
139 056fe1ba 2003-11-23 devnull qlock(&z->outlk);
140 056fe1ba 2003-11-23 devnull rwakeup(&z->rpcfork);
141 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
143 056fe1ba 2003-11-23 devnull while((p = _vtqrecv(q)) != nil)
144 056fe1ba 2003-11-23 devnull if(_vtsend(z, p) < 0)
146 056fe1ba 2003-11-23 devnull qunlock(&z->outlk);
147 056fe1ba 2003-11-23 devnull qlock(&z->lk);
148 056fe1ba 2003-11-23 devnull _vtqhangup(q);
149 056fe1ba 2003-11-23 devnull while((p = _vtnbqrecv(q)) != nil)
150 056fe1ba 2003-11-23 devnull packetfree(p);
151 056fe1ba 2003-11-23 devnull vtfree(q);
152 056fe1ba 2003-11-23 devnull z->writeq = nil;
153 056fe1ba 2003-11-23 devnull rwakeup(&z->rpcfork);
154 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
159 056fe1ba 2003-11-23 devnull vtrecv(VtConn *z)
161 056fe1ba 2003-11-23 devnull Packet *p;
163 056fe1ba 2003-11-23 devnull qlock(&z->lk);
164 056fe1ba 2003-11-23 devnull if(z->state != VtStateConnected){
165 056fe1ba 2003-11-23 devnull werrstr("not connected");
166 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
167 056fe1ba 2003-11-23 devnull return nil;
169 056fe1ba 2003-11-23 devnull if(z->readq){
170 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
171 056fe1ba 2003-11-23 devnull return _vtqrecv(z->readq);
174 056fe1ba 2003-11-23 devnull qlock(&z->inlk);
175 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
176 056fe1ba 2003-11-23 devnull p = _vtrecv(z);
177 056fe1ba 2003-11-23 devnull qunlock(&z->inlk);
179 056fe1ba 2003-11-23 devnull vthangup(z);
180 056fe1ba 2003-11-23 devnull return p;
184 056fe1ba 2003-11-23 devnull vtsend(VtConn *z, Packet *p)
186 056fe1ba 2003-11-23 devnull qlock(&z->lk);
187 056fe1ba 2003-11-23 devnull if(z->state != VtStateConnected){
188 056fe1ba 2003-11-23 devnull packetfree(p);
189 056fe1ba 2003-11-23 devnull werrstr("not connected");
190 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
191 056fe1ba 2003-11-23 devnull return -1;
193 056fe1ba 2003-11-23 devnull if(z->writeq){
194 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
195 056fe1ba 2003-11-23 devnull if(_vtqsend(z->writeq, p) < 0){
196 056fe1ba 2003-11-23 devnull packetfree(p);
197 056fe1ba 2003-11-23 devnull return -1;
199 056fe1ba 2003-11-23 devnull return 0;
202 056fe1ba 2003-11-23 devnull qlock(&z->outlk);
203 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
204 056fe1ba 2003-11-23 devnull if(_vtsend(z, p) < 0){
205 056fe1ba 2003-11-23 devnull qunlock(&z->outlk);
206 056fe1ba 2003-11-23 devnull vthangup(z);
207 056fe1ba 2003-11-23 devnull return -1;
209 056fe1ba 2003-11-23 devnull qunlock(&z->outlk);
210 056fe1ba 2003-11-23 devnull return 0;