Blob


1 #include <u.h>
2 #include <errno.h>
3 #include <libc.h>
4 #include <venti.h>
5 #include "queue.h"
7 long ventisendbytes, ventisendpackets;
8 long ventirecvbytes, ventirecvpackets;
10 static int
11 _vtsend(VtConn *z, Packet *p)
12 {
13 IOchunk ioc;
14 int n;
15 uchar buf[2];
18 if(z->state != VtStateConnected) {
19 werrstr("session not connected");
20 return -1;
21 }
23 /* add framing */
24 n = packetsize(p);
25 if(n >= (1<<16)) {
26 werrstr("packet too large");
27 packetfree(p);
28 return -1;
29 }
30 buf[0] = n>>8;
31 buf[1] = n;
32 packetprefix(p, buf, 2);
33 ventisendbytes += n+2;
34 ventisendpackets++;
36 for(;;){
37 n = packetfragments(p, &ioc, 1, 0);
38 if(n == 0)
39 break;
40 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
41 packetfree(p);
42 return 0;
43 }
44 packetconsume(p, nil, ioc.len);
45 }
46 packetfree(p);
47 return 1;
48 }
50 static Packet*
51 _vtrecv(VtConn *z)
52 {
53 uchar buf[10], *b;
54 int n;
55 Packet *p;
56 int size, len;
58 if(z->state != VtStateConnected) {
59 werrstr("session not connected");
60 return nil;
61 }
63 p = z->part;
64 /* get enough for head size */
65 size = packetsize(p);
66 while(size < 2) {
67 b = packettrailer(p, MaxFragSize);
68 assert(b != nil);
69 if(0) fprint(2, "%d read hdr\n", getpid());
70 n = read(z->infd, b, MaxFragSize);
71 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
72 if(n==0 || (n<0 && errno!=EINTR))
73 goto Err;
74 size += n;
75 packettrim(p, 0, size);
76 }
78 if(packetconsume(p, buf, 2) < 0)
79 goto Err;
80 len = (buf[0] << 8) | buf[1];
81 size -= 2;
83 while(size < len) {
84 // n = len - size;
85 // if(n > MaxFragSize)
86 n = MaxFragSize;
87 b = packettrailer(p, n);
88 if(0) fprint(2, "%d read body %d\n", getpid(), n);
89 n = read(z->infd, b, n);
90 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
91 if(n > 0)
92 size += n;
93 packettrim(p, 0, size);
94 if(n==0 || (n<0 && errno!=EINTR))
95 goto Err;
96 }
97 ventirecvbytes += len;
98 ventirecvpackets++;
99 p = packetsplit(p, len);
100 return p;
101 Err:
102 return nil;
105 /*
106 * If you fork off two procs running vtrecvproc and vtsendproc,
107 * then vtrecv/vtsend (and thus vtrpc) will never block except on
108 * rendevouses, which is nice when it's running in one thread of many.
109 */
110 void
111 vtrecvproc(void *v)
113 Packet *p;
114 VtConn *z;
115 Queue *q;
117 z = v;
118 q = _vtqalloc();
120 qlock(&z->lk);
121 z->readq = q;
122 qlock(&z->inlk);
123 rwakeup(&z->rpcfork);
124 qunlock(&z->lk);
126 while((p = _vtrecv(z)) != nil)
127 if(_vtqsend(q, p) < 0){
128 packetfree(p);
129 break;
131 qunlock(&z->inlk);
132 qlock(&z->lk);
133 _vtqhangup(q);
134 while((p = _vtnbqrecv(q)) != nil)
135 packetfree(p);
136 vtfree(q);
137 z->readq = nil;
138 rwakeup(&z->rpcfork);
139 qunlock(&z->lk);
140 vthangup(z);
143 void
144 vtsendproc(void *v)
146 Queue *q;
147 Packet *p;
148 VtConn *z;
150 z = v;
151 q = _vtqalloc();
153 qlock(&z->lk);
154 z->writeq = q;
155 qlock(&z->outlk);
156 rwakeup(&z->rpcfork);
157 qunlock(&z->lk);
159 while((p = _vtqrecv(q)) != nil)
160 if(_vtsend(z, p) < 0)
161 break;
162 qunlock(&z->outlk);
163 qlock(&z->lk);
164 _vtqhangup(q);
165 while((p = _vtnbqrecv(q)) != nil)
166 packetfree(p);
167 vtfree(q);
168 z->writeq = nil;
169 rwakeup(&z->rpcfork);
170 qunlock(&z->lk);
171 return;
174 Packet*
175 vtrecv(VtConn *z)
177 Packet *p;
179 qlock(&z->lk);
180 if(z->state != VtStateConnected){
181 werrstr("not connected");
182 qunlock(&z->lk);
183 return nil;
185 if(z->readq){
186 qunlock(&z->lk);
187 return _vtqrecv(z->readq);
190 qlock(&z->inlk);
191 qunlock(&z->lk);
192 p = _vtrecv(z);
193 qunlock(&z->inlk);
194 if(!p)
195 vthangup(z);
196 return p;
199 int
200 vtsend(VtConn *z, Packet *p)
202 qlock(&z->lk);
203 if(z->state != VtStateConnected){
204 packetfree(p);
205 werrstr("not connected");
206 qunlock(&z->lk);
207 return -1;
209 if(z->writeq){
210 qunlock(&z->lk);
211 if(_vtqsend(z->writeq, p) < 0){
212 packetfree(p);
213 return -1;
215 return 0;
218 qlock(&z->outlk);
219 qunlock(&z->lk);
220 if(_vtsend(z, p) < 0){
221 qunlock(&z->outlk);
222 vthangup(z);
223 return -1;
225 qunlock(&z->outlk);
226 return 0;