Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <venti.h>
4 #include "queue.h"
6 long ventisendbytes, ventisendpackets;
7 long ventirecvbytes, ventirecvpackets;
9 static int
10 _vtsend(VtConn *z, Packet *p)
11 {
12 IOchunk ioc;
13 int n;
14 uchar buf[2];
17 if(z->state != VtStateConnected) {
18 werrstr("session not connected");
19 return -1;
20 }
22 /* add framing */
23 n = packetsize(p);
24 if(n >= (1<<16)) {
25 werrstr("packet too large");
26 packetfree(p);
27 return -1;
28 }
29 buf[0] = n>>8;
30 buf[1] = n;
31 packetprefix(p, buf, 2);
32 ventisendbytes += n+2;
33 ventisendpackets++;
35 for(;;){
36 n = packetfragments(p, &ioc, 1, 0);
37 if(n == 0)
38 break;
39 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
40 packetfree(p);
41 return 0;
42 }
43 packetconsume(p, nil, ioc.len);
44 }
45 packetfree(p);
46 return 1;
47 }
49 static int
50 interrupted(void)
51 {
52 char e[ERRMAX];
54 rerrstr(e, sizeof e);
55 return strstr(e, "interrupted") != nil;
56 }
59 static Packet*
60 _vtrecv(VtConn *z)
61 {
62 uchar buf[10], *b;
63 int n;
64 Packet *p;
65 int size, len;
67 if(z->state != VtStateConnected) {
68 werrstr("session not connected");
69 return nil;
70 }
72 p = z->part;
73 /* get enough for head size */
74 size = packetsize(p);
75 while(size < 2) {
76 b = packettrailer(p, MaxFragSize);
77 assert(b != nil);
78 if(0) fprint(2, "%d read hdr\n", getpid());
79 n = read(z->infd, b, MaxFragSize);
80 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
81 if(n==0 || (n<0 && !interrupted()))
82 goto Err;
83 size += n;
84 packettrim(p, 0, size);
85 }
87 if(packetconsume(p, buf, 2) < 0)
88 goto Err;
89 len = (buf[0] << 8) | buf[1];
90 size -= 2;
92 while(size < len) {
93 // n = len - size;
94 // if(n > MaxFragSize)
95 n = MaxFragSize;
96 b = packettrailer(p, n);
97 if(0) fprint(2, "%d read body %d\n", getpid(), n);
98 n = read(z->infd, b, n);
99 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
100 if(n > 0)
101 size += n;
102 packettrim(p, 0, size);
103 if(n==0 || (n<0 && !interrupted()))
104 goto Err;
106 ventirecvbytes += len;
107 ventirecvpackets++;
108 p = packetsplit(p, len);
109 return p;
110 Err:
111 return nil;
114 /*
115 * If you fork off two procs running vtrecvproc and vtsendproc,
116 * then vtrecv/vtsend (and thus vtrpc) will never block except on
117 * rendevouses, which is nice when it's running in one thread of many.
118 */
119 void
120 vtrecvproc(void *v)
122 Packet *p;
123 VtConn *z;
124 Queue *q;
126 z = v;
127 q = _vtqalloc();
129 qlock(&z->lk);
130 z->readq = q;
131 qlock(&z->inlk);
132 rwakeup(&z->rpcfork);
133 qunlock(&z->lk);
135 while((p = _vtrecv(z)) != nil)
136 if(_vtqsend(q, p) < 0){
137 packetfree(p);
138 break;
140 qunlock(&z->inlk);
141 qlock(&z->lk);
142 _vtqhangup(q);
143 while((p = _vtnbqrecv(q)) != nil)
144 packetfree(p);
145 vtfree(q);
146 z->readq = nil;
147 rwakeup(&z->rpcfork);
148 qunlock(&z->lk);
149 vthangup(z);
152 void
153 vtsendproc(void *v)
155 Queue *q;
156 Packet *p;
157 VtConn *z;
159 z = v;
160 q = _vtqalloc();
162 qlock(&z->lk);
163 z->writeq = q;
164 qlock(&z->outlk);
165 rwakeup(&z->rpcfork);
166 qunlock(&z->lk);
168 while((p = _vtqrecv(q)) != nil)
169 if(_vtsend(z, p) < 0)
170 break;
171 qunlock(&z->outlk);
172 qlock(&z->lk);
173 _vtqhangup(q);
174 while((p = _vtnbqrecv(q)) != nil)
175 packetfree(p);
176 vtfree(q);
177 z->writeq = nil;
178 rwakeup(&z->rpcfork);
179 qunlock(&z->lk);
180 return;
183 Packet*
184 vtrecv(VtConn *z)
186 Packet *p;
188 qlock(&z->lk);
189 if(z->state != VtStateConnected){
190 werrstr("not connected");
191 qunlock(&z->lk);
192 return nil;
194 if(z->readq){
195 qunlock(&z->lk);
196 return _vtqrecv(z->readq);
199 qlock(&z->inlk);
200 qunlock(&z->lk);
201 p = _vtrecv(z);
202 qunlock(&z->inlk);
203 if(!p)
204 vthangup(z);
205 return p;
208 int
209 vtsend(VtConn *z, Packet *p)
211 qlock(&z->lk);
212 if(z->state != VtStateConnected){
213 packetfree(p);
214 werrstr("not connected");
215 qunlock(&z->lk);
216 return -1;
218 if(z->writeq){
219 qunlock(&z->lk);
220 if(_vtqsend(z->writeq, p) < 0){
221 packetfree(p);
222 return -1;
224 return 0;
227 qlock(&z->outlk);
228 qunlock(&z->lk);
229 if(_vtsend(z, p) < 0){
230 qunlock(&z->outlk);
231 vthangup(z);
232 return -1;
234 qunlock(&z->outlk);
235 return 0;