Blame


1 056fe1ba 2003-11-23 devnull #include <u.h>
2 056fe1ba 2003-11-23 devnull #include <libc.h>
3 056fe1ba 2003-11-23 devnull #include <venti.h>
4 056fe1ba 2003-11-23 devnull #include "queue.h"
5 056fe1ba 2003-11-23 devnull
/*
 * Traffic counters.
 * _vtsend adds each packet's size plus its 2-byte length header to
 * ventisendbytes and bumps ventisendpackets; _vtrecv adds each packet's
 * payload length to ventirecvbytes and bumps ventirecvpackets.
 */
long ventisendbytes;
long ventisendpackets;
long ventirecvbytes;
long ventirecvpackets;
8 8baa0cbd 2004-06-09 devnull
9 056fe1ba 2003-11-23 devnull static int
10 056fe1ba 2003-11-23 devnull _vtsend(VtConn *z, Packet *p)
11 056fe1ba 2003-11-23 devnull {
12 056fe1ba 2003-11-23 devnull IOchunk ioc;
13 056fe1ba 2003-11-23 devnull int n;
14 056fe1ba 2003-11-23 devnull uchar buf[2];
15 8baa0cbd 2004-06-09 devnull
16 8baa0cbd 2004-06-09 devnull
17 056fe1ba 2003-11-23 devnull if(z->state != VtStateConnected) {
18 056fe1ba 2003-11-23 devnull werrstr("session not connected");
19 056fe1ba 2003-11-23 devnull return -1;
20 056fe1ba 2003-11-23 devnull }
21 056fe1ba 2003-11-23 devnull
22 056fe1ba 2003-11-23 devnull /* add framing */
23 056fe1ba 2003-11-23 devnull n = packetsize(p);
24 056fe1ba 2003-11-23 devnull if(n >= (1<<16)) {
25 056fe1ba 2003-11-23 devnull werrstr("packet too large");
26 056fe1ba 2003-11-23 devnull packetfree(p);
27 056fe1ba 2003-11-23 devnull return -1;
28 056fe1ba 2003-11-23 devnull }
29 056fe1ba 2003-11-23 devnull buf[0] = n>>8;
30 056fe1ba 2003-11-23 devnull buf[1] = n;
31 056fe1ba 2003-11-23 devnull packetprefix(p, buf, 2);
32 8baa0cbd 2004-06-09 devnull ventisendbytes += n+2;
33 8baa0cbd 2004-06-09 devnull ventisendpackets++;
34 056fe1ba 2003-11-23 devnull
35 056fe1ba 2003-11-23 devnull for(;;){
36 056fe1ba 2003-11-23 devnull n = packetfragments(p, &ioc, 1, 0);
37 056fe1ba 2003-11-23 devnull if(n == 0)
38 056fe1ba 2003-11-23 devnull break;
39 056fe1ba 2003-11-23 devnull if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
40 056fe1ba 2003-11-23 devnull packetfree(p);
41 056fe1ba 2003-11-23 devnull return 0;
42 056fe1ba 2003-11-23 devnull }
43 056fe1ba 2003-11-23 devnull packetconsume(p, nil, ioc.len);
44 056fe1ba 2003-11-23 devnull }
45 056fe1ba 2003-11-23 devnull packetfree(p);
46 056fe1ba 2003-11-23 devnull return 1;
47 056fe1ba 2003-11-23 devnull }
48 056fe1ba 2003-11-23 devnull
49 0c148046 2004-06-16 devnull static int
50 0c148046 2004-06-16 devnull interrupted(void)
51 0c148046 2004-06-16 devnull {
52 0c148046 2004-06-16 devnull char e[ERRMAX];
53 0c148046 2004-06-16 devnull
54 0c148046 2004-06-16 devnull rerrstr(e, sizeof e);
55 0c148046 2004-06-16 devnull return strstr(e, "interrupted") != nil;
56 0c148046 2004-06-16 devnull }
57 0c148046 2004-06-16 devnull
58 0c148046 2004-06-16 devnull
59 056fe1ba 2003-11-23 devnull static Packet*
60 056fe1ba 2003-11-23 devnull _vtrecv(VtConn *z)
61 056fe1ba 2003-11-23 devnull {
62 056fe1ba 2003-11-23 devnull uchar buf[10], *b;
63 056fe1ba 2003-11-23 devnull int n;
64 056fe1ba 2003-11-23 devnull Packet *p;
65 056fe1ba 2003-11-23 devnull int size, len;
66 056fe1ba 2003-11-23 devnull
67 056fe1ba 2003-11-23 devnull if(z->state != VtStateConnected) {
68 056fe1ba 2003-11-23 devnull werrstr("session not connected");
69 056fe1ba 2003-11-23 devnull return nil;
70 056fe1ba 2003-11-23 devnull }
71 056fe1ba 2003-11-23 devnull
72 056fe1ba 2003-11-23 devnull p = z->part;
73 056fe1ba 2003-11-23 devnull /* get enough for head size */
74 056fe1ba 2003-11-23 devnull size = packetsize(p);
75 056fe1ba 2003-11-23 devnull while(size < 2) {
76 056fe1ba 2003-11-23 devnull b = packettrailer(p, MaxFragSize);
77 056fe1ba 2003-11-23 devnull assert(b != nil);
78 a09e80f9 2004-05-23 devnull if(0) fprint(2, "%d read hdr\n", getpid());
79 056fe1ba 2003-11-23 devnull n = read(z->infd, b, MaxFragSize);
80 a09e80f9 2004-05-23 devnull if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
81 0c148046 2004-06-16 devnull if(n==0 || (n<0 && !interrupted()))
82 056fe1ba 2003-11-23 devnull goto Err;
83 056fe1ba 2003-11-23 devnull size += n;
84 056fe1ba 2003-11-23 devnull packettrim(p, 0, size);
85 056fe1ba 2003-11-23 devnull }
86 056fe1ba 2003-11-23 devnull
87 056fe1ba 2003-11-23 devnull if(packetconsume(p, buf, 2) < 0)
88 056fe1ba 2003-11-23 devnull goto Err;
89 056fe1ba 2003-11-23 devnull len = (buf[0] << 8) | buf[1];
90 056fe1ba 2003-11-23 devnull size -= 2;
91 056fe1ba 2003-11-23 devnull
92 056fe1ba 2003-11-23 devnull while(size < len) {
93 a09e80f9 2004-05-23 devnull // n = len - size;
94 a09e80f9 2004-05-23 devnull // if(n > MaxFragSize)
95 056fe1ba 2003-11-23 devnull n = MaxFragSize;
96 056fe1ba 2003-11-23 devnull b = packettrailer(p, n);
97 a09e80f9 2004-05-23 devnull if(0) fprint(2, "%d read body %d\n", getpid(), n);
98 a09e80f9 2004-05-23 devnull n = read(z->infd, b, n);
99 a09e80f9 2004-05-23 devnull if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
100 a09e80f9 2004-05-23 devnull if(n > 0)
101 a09e80f9 2004-05-23 devnull size += n;
102 a09e80f9 2004-05-23 devnull packettrim(p, 0, size);
103 0c148046 2004-06-16 devnull if(n==0 || (n<0 && !interrupted()))
104 056fe1ba 2003-11-23 devnull goto Err;
105 056fe1ba 2003-11-23 devnull }
106 8baa0cbd 2004-06-09 devnull ventirecvbytes += len;
107 8baa0cbd 2004-06-09 devnull ventirecvpackets++;
108 056fe1ba 2003-11-23 devnull p = packetsplit(p, len);
109 056fe1ba 2003-11-23 devnull return p;
110 056fe1ba 2003-11-23 devnull Err:
111 056fe1ba 2003-11-23 devnull return nil;
112 056fe1ba 2003-11-23 devnull }
113 056fe1ba 2003-11-23 devnull
/*
 * If you fork off two procs running vtrecvproc and vtsendproc,
 * then vtrecv/vtsend (and thus vtrpc) will never block except on
 * rendezvous, which is nice when it's running in one thread of many.
 */
119 056fe1ba 2003-11-23 devnull void
120 056fe1ba 2003-11-23 devnull vtrecvproc(void *v)
121 056fe1ba 2003-11-23 devnull {
122 056fe1ba 2003-11-23 devnull Packet *p;
123 056fe1ba 2003-11-23 devnull VtConn *z;
124 056fe1ba 2003-11-23 devnull Queue *q;
125 056fe1ba 2003-11-23 devnull
126 056fe1ba 2003-11-23 devnull z = v;
127 056fe1ba 2003-11-23 devnull q = _vtqalloc();
128 056fe1ba 2003-11-23 devnull
129 056fe1ba 2003-11-23 devnull qlock(&z->lk);
130 056fe1ba 2003-11-23 devnull z->readq = q;
131 056fe1ba 2003-11-23 devnull qlock(&z->inlk);
132 056fe1ba 2003-11-23 devnull rwakeup(&z->rpcfork);
133 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
134 056fe1ba 2003-11-23 devnull
135 056fe1ba 2003-11-23 devnull while((p = _vtrecv(z)) != nil)
136 056fe1ba 2003-11-23 devnull if(_vtqsend(q, p) < 0){
137 056fe1ba 2003-11-23 devnull packetfree(p);
138 056fe1ba 2003-11-23 devnull break;
139 056fe1ba 2003-11-23 devnull }
140 056fe1ba 2003-11-23 devnull qunlock(&z->inlk);
141 056fe1ba 2003-11-23 devnull qlock(&z->lk);
142 056fe1ba 2003-11-23 devnull _vtqhangup(q);
143 056fe1ba 2003-11-23 devnull while((p = _vtnbqrecv(q)) != nil)
144 056fe1ba 2003-11-23 devnull packetfree(p);
145 056fe1ba 2003-11-23 devnull vtfree(q);
146 056fe1ba 2003-11-23 devnull z->readq = nil;
147 056fe1ba 2003-11-23 devnull rwakeup(&z->rpcfork);
148 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
149 056fe1ba 2003-11-23 devnull vthangup(z);
150 056fe1ba 2003-11-23 devnull }
151 056fe1ba 2003-11-23 devnull
152 056fe1ba 2003-11-23 devnull void
153 056fe1ba 2003-11-23 devnull vtsendproc(void *v)
154 056fe1ba 2003-11-23 devnull {
155 056fe1ba 2003-11-23 devnull Queue *q;
156 056fe1ba 2003-11-23 devnull Packet *p;
157 056fe1ba 2003-11-23 devnull VtConn *z;
158 056fe1ba 2003-11-23 devnull
159 056fe1ba 2003-11-23 devnull z = v;
160 056fe1ba 2003-11-23 devnull q = _vtqalloc();
161 056fe1ba 2003-11-23 devnull
162 056fe1ba 2003-11-23 devnull qlock(&z->lk);
163 056fe1ba 2003-11-23 devnull z->writeq = q;
164 056fe1ba 2003-11-23 devnull qlock(&z->outlk);
165 056fe1ba 2003-11-23 devnull rwakeup(&z->rpcfork);
166 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
167 056fe1ba 2003-11-23 devnull
168 056fe1ba 2003-11-23 devnull while((p = _vtqrecv(q)) != nil)
169 056fe1ba 2003-11-23 devnull if(_vtsend(z, p) < 0)
170 056fe1ba 2003-11-23 devnull break;
171 056fe1ba 2003-11-23 devnull qunlock(&z->outlk);
172 056fe1ba 2003-11-23 devnull qlock(&z->lk);
173 056fe1ba 2003-11-23 devnull _vtqhangup(q);
174 056fe1ba 2003-11-23 devnull while((p = _vtnbqrecv(q)) != nil)
175 056fe1ba 2003-11-23 devnull packetfree(p);
176 056fe1ba 2003-11-23 devnull vtfree(q);
177 056fe1ba 2003-11-23 devnull z->writeq = nil;
178 056fe1ba 2003-11-23 devnull rwakeup(&z->rpcfork);
179 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
180 056fe1ba 2003-11-23 devnull return;
181 056fe1ba 2003-11-23 devnull }
182 056fe1ba 2003-11-23 devnull
183 056fe1ba 2003-11-23 devnull Packet*
184 056fe1ba 2003-11-23 devnull vtrecv(VtConn *z)
185 056fe1ba 2003-11-23 devnull {
186 056fe1ba 2003-11-23 devnull Packet *p;
187 056fe1ba 2003-11-23 devnull
188 056fe1ba 2003-11-23 devnull qlock(&z->lk);
189 056fe1ba 2003-11-23 devnull if(z->state != VtStateConnected){
190 056fe1ba 2003-11-23 devnull werrstr("not connected");
191 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
192 056fe1ba 2003-11-23 devnull return nil;
193 056fe1ba 2003-11-23 devnull }
194 056fe1ba 2003-11-23 devnull if(z->readq){
195 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
196 056fe1ba 2003-11-23 devnull return _vtqrecv(z->readq);
197 056fe1ba 2003-11-23 devnull }
198 056fe1ba 2003-11-23 devnull
199 056fe1ba 2003-11-23 devnull qlock(&z->inlk);
200 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
201 056fe1ba 2003-11-23 devnull p = _vtrecv(z);
202 056fe1ba 2003-11-23 devnull qunlock(&z->inlk);
203 056fe1ba 2003-11-23 devnull if(!p)
204 056fe1ba 2003-11-23 devnull vthangup(z);
205 056fe1ba 2003-11-23 devnull return p;
206 056fe1ba 2003-11-23 devnull }
207 056fe1ba 2003-11-23 devnull
208 056fe1ba 2003-11-23 devnull int
209 056fe1ba 2003-11-23 devnull vtsend(VtConn *z, Packet *p)
210 056fe1ba 2003-11-23 devnull {
211 056fe1ba 2003-11-23 devnull qlock(&z->lk);
212 056fe1ba 2003-11-23 devnull if(z->state != VtStateConnected){
213 056fe1ba 2003-11-23 devnull packetfree(p);
214 056fe1ba 2003-11-23 devnull werrstr("not connected");
215 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
216 056fe1ba 2003-11-23 devnull return -1;
217 056fe1ba 2003-11-23 devnull }
218 056fe1ba 2003-11-23 devnull if(z->writeq){
219 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
220 056fe1ba 2003-11-23 devnull if(_vtqsend(z->writeq, p) < 0){
221 056fe1ba 2003-11-23 devnull packetfree(p);
222 056fe1ba 2003-11-23 devnull return -1;
223 056fe1ba 2003-11-23 devnull }
224 056fe1ba 2003-11-23 devnull return 0;
225 056fe1ba 2003-11-23 devnull }
226 056fe1ba 2003-11-23 devnull
227 056fe1ba 2003-11-23 devnull qlock(&z->outlk);
228 056fe1ba 2003-11-23 devnull qunlock(&z->lk);
229 056fe1ba 2003-11-23 devnull if(_vtsend(z, p) < 0){
230 056fe1ba 2003-11-23 devnull qunlock(&z->outlk);
231 056fe1ba 2003-11-23 devnull vthangup(z);
232 056fe1ba 2003-11-23 devnull return -1;
233 056fe1ba 2003-11-23 devnull }
234 056fe1ba 2003-11-23 devnull qunlock(&z->outlk);
235 056fe1ba 2003-11-23 devnull return 0;
236 056fe1ba 2003-11-23 devnull }
237 056fe1ba 2003-11-23 devnull