Blob
1 #include <u.h>2 #include <errno.h>3 #include <libc.h>4 #include <venti.h>5 #include "queue.h"7 long ventisendbytes, ventisendpackets;8 long ventirecvbytes, ventirecvpackets;10 static int11 _vtsend(VtConn *z, Packet *p)12 {13 IOchunk ioc;14 int n;15 uchar buf[2];18 if(z->state != VtStateConnected) {19 werrstr("session not connected");20 return -1;21 }23 /* add framing */24 n = packetsize(p);25 if(n >= (1<<16)) {26 werrstr("packet too large");27 packetfree(p);28 return -1;29 }30 buf[0] = n>>8;31 buf[1] = n;32 packetprefix(p, buf, 2);33 ventisendbytes += n+2;34 ventisendpackets++;36 for(;;){37 n = packetfragments(p, &ioc, 1, 0);38 if(n == 0)39 break;40 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){41 packetfree(p);42 return 0;43 }44 packetconsume(p, nil, ioc.len);45 }46 packetfree(p);47 return 1;48 }50 static Packet*51 _vtrecv(VtConn *z)52 {53 uchar buf[10], *b;54 int n;55 Packet *p;56 int size, len;58 if(z->state != VtStateConnected) {59 werrstr("session not connected");60 return nil;61 }63 p = z->part;64 /* get enough for head size */65 size = packetsize(p);66 while(size < 2) {67 b = packettrailer(p, MaxFragSize);68 assert(b != nil);69 if(0) fprint(2, "%d read hdr\n", getpid());70 n = read(z->infd, b, MaxFragSize);71 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);72 if(n==0 || (n<0 && errno!=EINTR))73 goto Err;74 size += n;75 packettrim(p, 0, size);76 }78 if(packetconsume(p, buf, 2) < 0)79 goto Err;80 len = (buf[0] << 8) | buf[1];81 size -= 2;83 while(size < len) {84 // n = len - size;85 // if(n > MaxFragSize)86 n = MaxFragSize;87 b = packettrailer(p, n);88 if(0) fprint(2, "%d read body %d\n", getpid(), n);89 n = read(z->infd, b, n);90 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);91 if(n > 0)92 size += n;93 packettrim(p, 0, size);94 if(n==0 || (n<0 && errno!=EINTR))95 goto Err;96 }97 ventirecvbytes += len;98 ventirecvpackets++;99 p = packetsplit(p, len);100 return p;101 Err:102 return nil;103 }105 /*106 * If you fork off two procs running vtrecvproc and vtsendproc,107 * then vtrecv/vtsend (and thus vtrpc) will never block except on108 * rendevouses, which is nice when it's running in one thread of many.109 */110 void111 vtrecvproc(void *v)112 {113 Packet *p;114 VtConn *z;115 Queue *q;117 z = v;118 q = _vtqalloc();120 qlock(&z->lk);121 z->readq = q;122 qlock(&z->inlk);123 rwakeup(&z->rpcfork);124 qunlock(&z->lk);126 while((p = _vtrecv(z)) != nil)127 if(_vtqsend(q, p) < 0){128 packetfree(p);129 break;130 }131 qunlock(&z->inlk);132 qlock(&z->lk);133 _vtqhangup(q);134 while((p = _vtnbqrecv(q)) != nil)135 packetfree(p);136 vtfree(q);137 z->readq = nil;138 rwakeup(&z->rpcfork);139 qunlock(&z->lk);140 vthangup(z);141 }143 void144 vtsendproc(void *v)145 {146 Queue *q;147 Packet *p;148 VtConn *z;150 z = v;151 q = _vtqalloc();153 qlock(&z->lk);154 z->writeq = q;155 qlock(&z->outlk);156 rwakeup(&z->rpcfork);157 qunlock(&z->lk);159 while((p = _vtqrecv(q)) != nil)160 if(_vtsend(z, p) < 0)161 break;162 qunlock(&z->outlk);163 qlock(&z->lk);164 _vtqhangup(q);165 while((p = _vtnbqrecv(q)) != nil)166 packetfree(p);167 vtfree(q);168 z->writeq = nil;169 rwakeup(&z->rpcfork);170 qunlock(&z->lk);171 return;172 }174 Packet*175 vtrecv(VtConn *z)176 {177 Packet *p;179 qlock(&z->lk);180 if(z->state != VtStateConnected){181 werrstr("not connected");182 qunlock(&z->lk);183 return nil;184 }185 if(z->readq){186 qunlock(&z->lk);187 return _vtqrecv(z->readq);188 }190 qlock(&z->inlk);191 qunlock(&z->lk);192 p = _vtrecv(z);193 qunlock(&z->inlk);194 if(!p)195 vthangup(z);196 return p;197 }199 int200 vtsend(VtConn *z, Packet *p)201 {202 qlock(&z->lk);203 if(z->state != VtStateConnected){204 packetfree(p);205 werrstr("not connected");206 qunlock(&z->lk);207 return -1;208 }209 if(z->writeq){210 qunlock(&z->lk);211 if(_vtqsend(z->writeq, p) < 0){212 packetfree(p);213 return -1;214 }215 return 0;216 }218 qlock(&z->outlk);219 qunlock(&z->lk);220 if(_vtsend(z, p) < 0){221 qunlock(&z->outlk);222 vthangup(z);223 return -1;224 }225 qunlock(&z->outlk);226 return 0;227 }