Blob


1 #include <u.h>
2 #include <libc.h>
3 #include <venti.h>
4 #include "queue.h"
6 static int
7 _vtsend(VtConn *z, Packet *p)
8 {
9 IOchunk ioc;
10 int n;
11 uchar buf[2];
13 if(z->state != VtStateConnected) {
14 werrstr("session not connected");
15 return -1;
16 }
18 /* add framing */
19 n = packetsize(p);
20 if(n >= (1<<16)) {
21 werrstr("packet too large");
22 packetfree(p);
23 return -1;
24 }
25 buf[0] = n>>8;
26 buf[1] = n;
27 packetprefix(p, buf, 2);
29 for(;;){
30 n = packetfragments(p, &ioc, 1, 0);
31 if(n == 0)
32 break;
33 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
34 packetfree(p);
35 return 0;
36 }
37 packetconsume(p, nil, ioc.len);
38 }
39 packetfree(p);
40 return 1;
41 }
43 static Packet*
44 _vtrecv(VtConn *z)
45 {
46 uchar buf[10], *b;
47 int n;
48 Packet *p;
49 int size, len;
51 if(z->state != VtStateConnected) {
52 werrstr("session not connected");
53 return nil;
54 }
56 p = z->part;
57 /* get enough for head size */
58 size = packetsize(p);
59 while(size < 2) {
60 b = packettrailer(p, MaxFragSize);
61 assert(b != nil);
62 n = read(z->infd, b, MaxFragSize);
63 if(n <= 0)
64 goto Err;
65 size += n;
66 packettrim(p, 0, size);
67 }
69 if(packetconsume(p, buf, 2) < 0)
70 goto Err;
71 len = (buf[0] << 8) | buf[1];
72 size -= 2;
74 while(size < len) {
75 n = len - size;
76 if(n > MaxFragSize)
77 n = MaxFragSize;
78 b = packettrailer(p, n);
79 if(readn(z->infd, b, n) != n)
80 goto Err;
81 size += n;
82 }
83 p = packetsplit(p, len);
84 return p;
85 Err:
86 return nil;
87 }
/*
 * If you fork off two procs running vtrecvproc and vtsendproc,
 * then vtrecv/vtsend (and thus vtrpc) will never block except on
 * a rendezvous, which is nice when running as one thread among many.
 */
94 void
95 vtrecvproc(void *v)
96 {
97 Packet *p;
98 VtConn *z;
99 Queue *q;
101 z = v;
102 q = _vtqalloc();
104 qlock(&z->lk);
105 z->readq = q;
106 qlock(&z->inlk);
107 rwakeup(&z->rpcfork);
108 qunlock(&z->lk);
110 while((p = _vtrecv(z)) != nil)
111 if(_vtqsend(q, p) < 0){
112 packetfree(p);
113 break;
115 qunlock(&z->inlk);
116 qlock(&z->lk);
117 _vtqhangup(q);
118 while((p = _vtnbqrecv(q)) != nil)
119 packetfree(p);
120 vtfree(q);
121 z->readq = nil;
122 rwakeup(&z->rpcfork);
123 qunlock(&z->lk);
124 vthangup(z);
127 void
128 vtsendproc(void *v)
130 Queue *q;
131 Packet *p;
132 VtConn *z;
134 z = v;
135 q = _vtqalloc();
137 qlock(&z->lk);
138 z->writeq = q;
139 qlock(&z->outlk);
140 rwakeup(&z->rpcfork);
141 qunlock(&z->lk);
143 while((p = _vtqrecv(q)) != nil)
144 if(_vtsend(z, p) < 0)
145 break;
146 qunlock(&z->outlk);
147 qlock(&z->lk);
148 _vtqhangup(q);
149 while((p = _vtnbqrecv(q)) != nil)
150 packetfree(p);
151 vtfree(q);
152 z->writeq = nil;
153 rwakeup(&z->rpcfork);
154 qunlock(&z->lk);
155 return;
158 Packet*
159 vtrecv(VtConn *z)
161 Packet *p;
163 qlock(&z->lk);
164 if(z->state != VtStateConnected){
165 werrstr("not connected");
166 qunlock(&z->lk);
167 return nil;
169 if(z->readq){
170 qunlock(&z->lk);
171 return _vtqrecv(z->readq);
174 qlock(&z->inlk);
175 qunlock(&z->lk);
176 p = _vtrecv(z);
177 qunlock(&z->inlk);
178 if(!p)
179 vthangup(z);
180 return p;
183 int
184 vtsend(VtConn *z, Packet *p)
186 qlock(&z->lk);
187 if(z->state != VtStateConnected){
188 packetfree(p);
189 werrstr("not connected");
190 qunlock(&z->lk);
191 return -1;
193 if(z->writeq){
194 qunlock(&z->lk);
195 if(_vtqsend(z->writeq, p) < 0){
196 packetfree(p);
197 return -1;
199 return 0;
202 qlock(&z->outlk);
203 qunlock(&z->lk);
204 if(_vtsend(z, p) < 0){
205 qunlock(&z->outlk);
206 vthangup(z);
207 return -1;
209 qunlock(&z->outlk);
210 return 0;