Blob
1 #include <u.h>2 #include <libc.h>3 #include <venti.h>4 #include "queue.h"6 long ventisendbytes, ventisendpackets;7 long ventirecvbytes, ventirecvpackets;9 static int10 _vtsend(VtConn *z, Packet *p)11 {12 IOchunk ioc;13 int n;14 uchar buf[2];17 if(z->state != VtStateConnected) {18 werrstr("session not connected");19 return -1;20 }22 /* add framing */23 n = packetsize(p);24 if(n >= (1<<16)) {25 werrstr("packet too large");26 packetfree(p);27 return -1;28 }29 buf[0] = n>>8;30 buf[1] = n;31 packetprefix(p, buf, 2);32 ventisendbytes += n+2;33 ventisendpackets++;35 for(;;){36 n = packetfragments(p, &ioc, 1, 0);37 if(n == 0)38 break;39 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){40 packetfree(p);41 return 0;42 }43 packetconsume(p, nil, ioc.len);44 }45 packetfree(p);46 return 1;47 }49 static int50 interrupted(void)51 {52 char e[ERRMAX];54 rerrstr(e, sizeof e);55 return strstr(e, "interrupted") != nil;56 }59 static Packet*60 _vtrecv(VtConn *z)61 {62 uchar buf[10], *b;63 int n;64 Packet *p;65 int size, len;67 if(z->state != VtStateConnected) {68 werrstr("session not connected");69 return nil;70 }72 p = z->part;73 /* get enough for head size */74 size = packetsize(p);75 while(size < 2) {76 b = packettrailer(p, MaxFragSize);77 assert(b != nil);78 if(0) fprint(2, "%d read hdr\n", getpid());79 n = read(z->infd, b, MaxFragSize);80 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);81 if(n==0 || (n<0 && !interrupted()))82 goto Err;83 size += n;84 packettrim(p, 0, size);85 }87 if(packetconsume(p, buf, 2) < 0)88 goto Err;89 len = (buf[0] << 8) | buf[1];90 size -= 2;92 while(size < len) {93 // n = len - size;94 // if(n > MaxFragSize)95 n = MaxFragSize;96 b = packettrailer(p, n);97 if(0) fprint(2, "%d read body %d\n", getpid(), n);98 n = read(z->infd, b, n);99 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);100 if(n > 0)101 size += n;102 packettrim(p, 0, size);103 if(n==0 || (n<0 && !interrupted()))104 goto Err;105 }106 ventirecvbytes += len;107 ventirecvpackets++;108 p = packetsplit(p, len);109 return p;110 Err:111 return nil;112 }114 /*115 * If you fork off two procs running vtrecvproc and vtsendproc,116 * then vtrecv/vtsend (and thus vtrpc) will never block except on117 * rendevouses, which is nice when it's running in one thread of many.118 */119 void120 vtrecvproc(void *v)121 {122 Packet *p;123 VtConn *z;124 Queue *q;126 z = v;127 q = _vtqalloc();129 qlock(&z->lk);130 z->readq = q;131 qlock(&z->inlk);132 rwakeup(&z->rpcfork);133 qunlock(&z->lk);135 while((p = _vtrecv(z)) != nil)136 if(_vtqsend(q, p) < 0){137 packetfree(p);138 break;139 }140 qunlock(&z->inlk);141 qlock(&z->lk);142 _vtqhangup(q);143 while((p = _vtnbqrecv(q)) != nil)144 packetfree(p);145 vtfree(q);146 z->readq = nil;147 rwakeup(&z->rpcfork);148 qunlock(&z->lk);149 vthangup(z);150 }152 void153 vtsendproc(void *v)154 {155 Queue *q;156 Packet *p;157 VtConn *z;159 z = v;160 q = _vtqalloc();162 qlock(&z->lk);163 z->writeq = q;164 qlock(&z->outlk);165 rwakeup(&z->rpcfork);166 qunlock(&z->lk);168 while((p = _vtqrecv(q)) != nil)169 if(_vtsend(z, p) < 0)170 break;171 qunlock(&z->outlk);172 qlock(&z->lk);173 _vtqhangup(q);174 while((p = _vtnbqrecv(q)) != nil)175 packetfree(p);176 vtfree(q);177 z->writeq = nil;178 rwakeup(&z->rpcfork);179 qunlock(&z->lk);180 return;181 }183 Packet*184 vtrecv(VtConn *z)185 {186 Packet *p;188 qlock(&z->lk);189 if(z->state != VtStateConnected){190 werrstr("not connected");191 qunlock(&z->lk);192 return nil;193 }194 if(z->readq){195 qunlock(&z->lk);196 return _vtqrecv(z->readq);197 }199 qlock(&z->inlk);200 qunlock(&z->lk);201 p = _vtrecv(z);202 qunlock(&z->inlk);203 if(!p)204 vthangup(z);205 return p;206 }208 int209 vtsend(VtConn *z, Packet *p)210 {211 qlock(&z->lk);212 if(z->state != VtStateConnected){213 packetfree(p);214 werrstr("not connected");215 qunlock(&z->lk);216 return -1;217 }218 if(z->writeq){219 qunlock(&z->lk);220 if(_vtqsend(z->writeq, p) < 0){221 packetfree(p);222 return -1;223 }224 return 0;225 }227 qlock(&z->outlk);228 qunlock(&z->lk);229 if(_vtsend(z, p) < 0){230 qunlock(&z->outlk);231 vthangup(z);232 return -1;233 }234 qunlock(&z->outlk);235 return 0;236 }