9 typedef struct Out Out;
12 char err[ERRMAX]; /* error string */
13 Channel *creply; /* send to finish rpc */
14 uchar *p; /* pending request packet */
15 int n; /* size of request */
16 ulong tag; /* flush tag of pending request */
17 ulong xid; /* xid of pending request */
18 ulong st; /* first send time */
19 ulong t; /* resend time */
20 int nresend; /* number of resends */
21 SunRpc rpc; /* response rpc */
31 enum { BufSize = 65536 };
34 buf = emalloc(BufSize);
38 n = ioread(io, cli->fd, buf, BufSize);
47 if(sendp(cli->readchan, p) == 0)
53 while(send(cli->dying, nil) == -1)
71 n = ioreadn(io, cli->fd, buf, 4);
74 n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
76 fprint(2, "%.8ux...", n);
83 p = erealloc(p, tot+n);
84 if(ioreadn(io, cli->fd, p+tot, n) != n)
92 if(sendp(cli->readchan, p) == 0)
100 while(send(cli->dying, 0) == -1)
113 if(iosleep(io, 200) < 0)
115 if(sendul(cli->timerchan, 0) == 0)
119 while(send(cli->dying, 0) == -1)
126 return nsec()/1000000;
130 twait(ulong rtt, int nresend)
137 else if(nresend <= 3)
139 else if(nresend <= 18)
150 rpcMuxThread(void *v)
153 int i, n, nout, mout;
154 ulong t, xidgen, tag;
164 out = emalloc(mout*sizeof(out[0]));
168 a[0].c = cli->rpcchan;
171 a[1].c = cli->timerchan;
174 a[2].c = cli->flushchan;
177 a[3].c = cli->readchan;
183 case 0: /* o = <-rpcchan */
197 if(write(cli->fd, o->p, o->n) != o->n){
200 snprint(o->err, sizeof o->err, "write: %r");
206 out = erealloc(out, mout*sizeof(out[0]));
210 o->t = o->st + twait(cli->rtt.avg, 0);
211 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
216 case 1: /* <-timerchan */
218 for(i=0; i<nout; i++){
220 if((int)(t - o->t) > 0){
221 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
222 if(cli->maxwait && t - o->st >= cli->maxwait){
225 strcpy(o->err, "timeout");
227 out[i--] = out[--nout];
232 o->t = t + twait(cli->rtt.avg, o->nresend);
233 if(write(cli->fd, o->p, o->n) != o->n){
236 snprint(o->err, sizeof o->err, "rewrite: %r");
238 out[i--] = out[--nout];
243 /* stop ticking if no work; rpcchan will turn it back on */
248 case 2: /* tag = <-flushchan */
249 for(i=0; i<nout; i++){
252 out[i--] = out[--nout];
253 strcpy(o->err, "flushed");
261 case 3: /* buf = <-readchan */
263 n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
266 if((ok = sunrpcunpack(p, ep, &p, &rpc)) != SunSuccess){
267 fprint(2, "%s: in: %.*H unpack failed\n", argv0, n, buf+4);
272 fprint(2, "in: %B\n", &rpc);
274 fprint(2, "did not get reply\n");
279 for(i=0; i<nout; i++){
281 if(o->xid == rpc.xid)
285 if(cli->chatty) fprint(2, "did not find waiting request\n");
289 out[i] = out[--nout];
293 if(rpc.status == SunSuccess){
298 sunerrstr(rpc.status);
299 rerrstr(o->err, sizeof o->err);
307 sendp(cli->dying, 0);
311 sundial(char *address)
316 if((fd = dial(address, 0, 0, 0)) < 0)
319 cli = emalloc(sizeof(SunClient));
321 cli->maxwait = 15000;
323 cli->dying = chancreate(sizeof(void*), 0);
324 cli->rpcchan = chancreate(sizeof(Out*), 0);
325 cli->timerchan = chancreate(sizeof(ulong), 0);
326 cli->flushchan = chancreate(sizeof(ulong), 0);
327 cli->readchan = chancreate(sizeof(uchar*), 0);
328 if(strstr(address, "udp!")){
330 cli->nettid = threadcreate(udpThread, cli, SunStackSize);
331 cli->timertid = threadcreate(timerThread, cli, SunStackSize);
334 cli->nettid = threadcreate(netThread, cli, SunStackSize);
335 /* assume reliable: don't need timer */
336 /* BUG: netThread should know how to redial */
338 threadcreate(rpcMuxThread, cli, SunStackSize);
344 sunclientclose(SunClient *cli)
349 * Threadints get you out of any stuck system calls
350 * or thread rendezvouses, but do nothing if the thread
351 * is in the ready state. Keep interrupting until it takes.
358 threadint(cli->nettid);
360 threadint(cli->timertid);
364 while(nbrecv(cli->dying, nil) == 1)
368 sendp(cli->rpcchan, 0);
371 /* everyone's gone: clean up */
373 chanfree(cli->flushchan);
374 chanfree(cli->readchan);
375 chanfree(cli->timerchan);
380 sunclientflushrpc(SunClient *cli, ulong tag)
382 sendul(cli->flushchan, tag);
386 sunclientprog(SunClient *cli, SunProg *p)
388 if(cli->nprog%16 == 0)
389 cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
390 cli->prog[cli->nprog++] = p;
394 sunclientrpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
397 int i, n1, n2, n, nn;
402 for(i=0; i<cli->nprog; i++)
403 if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
406 werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
412 fprint(2, "out: %B\n", &tx->rpc);
413 fprint(2, "\t%C\n", tx);
416 n1 = sunrpcsize(&tx->rpc);
417 n2 = suncallsize(prog, tx);
424 * The dance with 100 is to leave some padding in case
425 * suncallsize is slightly underestimating. If this happens,
426 * the pack will succeed and then we can give a good size
427 * mismatch error below. Otherwise the pack fails with
428 * garbage args, which is less helpful.
435 p[0] = (nn>>24)|0x80;
441 if((ok = sunrpcpack(p, ep, &p, &tx->rpc)) != SunSuccess
442 || (ok = suncallpack(prog, p, ep, &p, tx)) != SunSuccess){
449 werrstr("rpc: packet size mismatch %d %ld %ld", n, ep-bp, p-bp);
454 memset(&o, 0, sizeof o);
455 o.creply = chancreate(sizeof(void*), 0);
460 sendp(cli->rpcchan, &o);
465 werrstr("%s", o.err);
472 rx->rpc.proc = tx->rpc.proc;
473 rx->rpc.prog = tx->rpc.prog;
474 rx->rpc.vers = tx->rpc.vers;
475 rx->type = (rx->rpc.proc<<1)|1;
476 if(rx->rpc.status != SunSuccess){
477 sunerrstr(rx->rpc.status);
478 werrstr("unpack: %r");
483 if((ok = suncallunpack(prog, p, ep, &p, rx)) != SunSuccess){
485 werrstr("unpack: %r");
491 fprint(2, "in: %B\n", &rx->rpc);
492 fprint(2, "in:\t%C\n", rx);