Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <venti.h>
4 #include "queue.h"
/*
 * Traffic counters.
 * The send counters are bumped in _vtsend and include the two-byte
 * length prefix; the receive counters are bumped in _vtrecv and count
 * only the packet length taken from the prefix.
 */
long ventisendbytes, ventisendpackets;
long ventirecvbytes, ventirecvpackets;
9 static int
10 _vtsend(VtConn *z, Packet *p)
11 {
12 IOchunk ioc;
13 int n, tot;
14 uchar buf[2];
16 if(z->state != VtStateConnected) {
17 werrstr("session not connected");
18 return -1;
19 }
21 /* add framing */
22 n = packetsize(p);
23 if(n >= (1<<16)) {
24 werrstr("packet too large");
25 packetfree(p);
26 return -1;
27 }
28 buf[0] = n>>8;
29 buf[1] = n;
30 packetprefix(p, buf, 2);
31 ventisendbytes += n+2;
32 ventisendpackets++;
34 tot = 0;
35 for(;;){
36 n = packetfragments(p, &ioc, 1, 0);
37 if(n == 0)
38 break;
39 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
40 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
41 packetfree(p);
42 return 0;
43 }
44 packetconsume(p, nil, ioc.len);
45 tot += ioc.len;
46 }
47 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
48 packetfree(p);
49 return 1;
50 }
52 static int
53 interrupted(void)
54 {
55 char e[ERRMAX];
57 rerrstr(e, sizeof e);
58 return strstr(e, "interrupted") != nil;
59 }
62 static Packet*
63 _vtrecv(VtConn *z)
64 {
65 uchar buf[10], *b;
66 int n;
67 Packet *p;
68 int size, len;
70 if(z->state != VtStateConnected) {
71 werrstr("session not connected");
72 return nil;
73 }
75 p = z->part;
76 /* get enough for head size */
77 size = packetsize(p);
78 while(size < 2) {
79 b = packettrailer(p, 2);
80 assert(b != nil);
81 if(0) fprint(2, "%d read hdr\n", getpid());
82 n = read(z->infd, b, 2);
83 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
84 if(n==0 || (n<0 && !interrupted()))
85 goto Err;
86 size += n;
87 packettrim(p, 0, size);
88 }
90 if(packetconsume(p, buf, 2) < 0)
91 goto Err;
92 len = (buf[0] << 8) | buf[1];
93 size -= 2;
95 while(size < len) {
96 n = len - size;
97 if(n > MaxFragSize)
98 n = MaxFragSize;
99 b = packettrailer(p, n);
100 if(0) fprint(2, "%d read body %d\n", getpid(), n);
101 n = read(z->infd, b, n);
102 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
103 if(n > 0)
104 size += n;
105 packettrim(p, 0, size);
106 if(n==0 || (n<0 && !interrupted()))
107 goto Err;
109 ventirecvbytes += len;
110 ventirecvpackets++;
111 p = packetsplit(p, len);
112 vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
113 return p;
114 Err:
115 vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
116 return nil;
/*
 * If you fork off two procs running vtrecvproc and vtsendproc,
 * then vtrecv/vtsend (and thus vtrpc) will never block except on
 * rendezvous, which is nice when it's running in one thread of many.
 */
124 void
125 vtrecvproc(void *v)
127 Packet *p;
128 VtConn *z;
129 Queue *q;
131 z = v;
132 q = _vtqalloc();
134 qlock(&z->lk);
135 z->readq = q;
136 qlock(&z->inlk);
137 rwakeup(&z->rpcfork);
138 qunlock(&z->lk);
140 while((p = _vtrecv(z)) != nil)
141 if(_vtqsend(q, p) < 0){
142 packetfree(p);
143 break;
145 qunlock(&z->inlk);
146 qlock(&z->lk);
147 _vtqhangup(q);
148 while((p = _vtnbqrecv(q)) != nil)
149 packetfree(p);
150 vtfree(q);
151 z->readq = nil;
152 rwakeup(&z->rpcfork);
153 qunlock(&z->lk);
154 vthangup(z);
157 void
158 vtsendproc(void *v)
160 Queue *q;
161 Packet *p;
162 VtConn *z;
164 z = v;
165 q = _vtqalloc();
167 qlock(&z->lk);
168 z->writeq = q;
169 qlock(&z->outlk);
170 rwakeup(&z->rpcfork);
171 qunlock(&z->lk);
173 while((p = _vtqrecv(q)) != nil)
174 if(_vtsend(z, p) < 0)
175 break;
176 qunlock(&z->outlk);
177 qlock(&z->lk);
178 _vtqhangup(q);
179 while((p = _vtnbqrecv(q)) != nil)
180 packetfree(p);
181 vtfree(q);
182 z->writeq = nil;
183 rwakeup(&z->rpcfork);
184 qunlock(&z->lk);
185 return;
188 Packet*
189 vtrecv(VtConn *z)
191 Packet *p;
193 qlock(&z->lk);
194 if(z->state != VtStateConnected){
195 werrstr("not connected");
196 qunlock(&z->lk);
197 return nil;
199 if(z->readq){
200 qunlock(&z->lk);
201 return _vtqrecv(z->readq);
204 qlock(&z->inlk);
205 qunlock(&z->lk);
206 p = _vtrecv(z);
207 qunlock(&z->inlk);
208 if(!p)
209 vthangup(z);
210 return p;
213 int
214 vtsend(VtConn *z, Packet *p)
216 qlock(&z->lk);
217 if(z->state != VtStateConnected){
218 packetfree(p);
219 werrstr("not connected");
220 qunlock(&z->lk);
221 return -1;
223 if(z->writeq){
224 qunlock(&z->lk);
225 if(_vtqsend(z->writeq, p) < 0){
226 packetfree(p);
227 return -1;
229 return 0;
232 qlock(&z->outlk);
233 qunlock(&z->lk);
234 if(_vtsend(z, p) < 0){
235 qunlock(&z->outlk);
236 vthangup(z);
237 return -1;
239 qunlock(&z->outlk);
240 return 0;