/*
 * Sun RPC client.
 */
4 #include <u.h>
5 #include <libc.h>
6 #include <thread.h>
7 #include <sunrpc.h>
9 typedef struct Out Out;
10 struct Out
11 {
12 char err[ERRMAX]; /* error string */
13 Channel *creply; /* send to finish rpc */
14 uchar *p; /* pending request packet */
15 int n; /* size of request */
16 ulong tag; /* flush tag of pending request */
17 ulong xid; /* xid of pending request */
18 ulong st; /* first send time */
19 ulong t; /* resend time */
20 int nresend; /* number of resends */
21 SunRpc rpc; /* response rpc */
22 };
24 static void
25 udpThread(void *v)
26 {
27 uchar *p, *buf;
28 Ioproc *io;
29 int n;
30 SunClient *cli;
31 enum { BufSize = 65536 };
33 cli = v;
34 buf = emalloc(BufSize);
35 io = ioproc();
36 p = nil;
37 for(;;){
38 n = ioread(io, cli->fd, buf, BufSize);
39 if(n <= 0)
40 break;
41 p = emalloc(4+n);
42 memmove(p+4, buf, n);
43 p[0] = n>>24;
44 p[1] = n>>16;
45 p[2] = n>>8;
46 p[3] = n;
47 if(sendp(cli->readchan, p) == 0)
48 break;
49 p = nil;
50 }
51 free(p);
52 closeioproc(io);
53 while(send(cli->dying, nil) == -1)
54 ;
55 }
57 static void
58 netThread(void *v)
59 {
60 uchar *p, buf[4];
61 Ioproc *io;
62 uint n, tot;
63 int done;
64 SunClient *cli;
66 cli = v;
67 io = ioproc();
68 tot = 0;
69 p = nil;
70 for(;;){
71 n = ioreadn(io, cli->fd, buf, 4);
72 if(n != 4)
73 break;
74 n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
75 if(cli->chatty)
76 fprint(2, "%.8ux...", n);
77 done = n&0x80000000;
78 n &= ~0x80000000;
79 if(tot == 0){
80 p = emalloc(4+n);
81 tot = 4;
82 }else
83 p = erealloc(p, tot+n);
84 if(ioreadn(io, cli->fd, p+tot, n) != n)
85 break;
86 tot += n;
87 if(done){
88 p[0] = tot>>24;
89 p[1] = tot>>16;
90 p[2] = tot>>8;
91 p[3] = tot;
92 if(sendp(cli->readchan, p) == 0)
93 break;
94 p = nil;
95 tot = 0;
96 }
97 }
98 free(p);
99 closeioproc(io);
100 while(send(cli->dying, 0) == -1)
104 static void
105 timerThread(void *v)
107 Ioproc *io;
108 SunClient *cli;
110 cli = v;
111 io = ioproc();
112 for(;;){
113 if(iosleep(io, 200) < 0)
114 break;
115 if(sendul(cli->timerchan, 0) == 0)
116 break;
118 closeioproc(io);
119 while(send(cli->dying, 0) == -1)
123 static ulong
124 msec(void)
126 return nsec()/1000000;
/*
 * Compute how long to wait, in milliseconds, before the next resend.
 *
 *	rtt	base wait (the caller passes the average round trip time)
 *	nresend	number of resends already made
 *
 * The wait is rtt for nresend <= 1, 2*rtt for nresend 2 or 3,
 * rtt<<(nresend-2) for nresend up to 18, and the maximum beyond that.
 * The result never exceeds 60 seconds.
 *
 * rtt is clamped to the maximum before scaling: anything larger would
 * be clamped afterwards anyway, and scaling a huge value first could
 * overflow ulong and wrap around to a very short wait.
 */
static ulong
twait(ulong rtt, int nresend)
{
	enum { MaxWait = 60*1000 };
	ulong t;

	if(rtt > MaxWait)
		return MaxWait;

	t = rtt;
	if(nresend <= 1){
		/* first send and first resend use the plain rtt */
	}else if(nresend <= 3)
		t *= 2;
	else if(nresend <= 18)
		t <<= nresend-2;	/* at most MaxWait<<16, which fits in 32 bits */
	else
		t = MaxWait;
	if(t > MaxWait)
		t = MaxWait;

	return t;
}
149 static void
150 rpcMuxThread(void *v)
152 uchar *buf, *p, *ep;
153 int i, n, nout, mout;
154 ulong t, xidgen, tag;
155 Alt a[5];
156 Out *o, **out;
157 SunRpc rpc;
158 SunClient *cli;
160 cli = v;
161 mout = 16;
162 nout = 0;
163 out = emalloc(mout*sizeof(out[0]));
164 xidgen = truerand();
166 a[0].op = CHANRCV;
167 a[0].c = cli->rpcchan;
168 a[0].v = &o;
169 a[1].op = CHANNOP;
170 a[1].c = cli->timerchan;
171 a[1].v = nil;
172 a[2].op = CHANRCV;
173 a[2].c = cli->flushchan;
174 a[2].v = &tag;
175 a[3].op = CHANRCV;
176 a[3].c = cli->readchan;
177 a[3].v = &buf;
178 a[4].op = CHANEND;
180 for(;;){
181 switch(alt(a)){
182 case 0: /* o = <-rpcchan */
183 if(o == nil)
184 goto Done;
185 cli->nsend++;
186 /* set xid */
187 o->xid = ++xidgen;
188 if(cli->needcount)
189 p = o->p+4;
190 else
191 p = o->p;
192 p[0] = xidgen>>24;
193 p[1] = xidgen>>16;
194 p[2] = xidgen>>8;
195 p[3] = xidgen;
196 if(write(cli->fd, o->p, o->n) != o->n){
197 free(o->p);
198 o->p = nil;
199 snprint(o->err, sizeof o->err, "write: %r");
200 sendp(o->creply, 0);
201 break;
203 if(nout >= mout){
204 mout *= 2;
205 out = erealloc(out, mout*sizeof(out[0]));
207 o->st = msec();
208 o->nresend = 0;
209 o->t = o->st + twait(cli->rtt.avg, 0);
210 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
211 out[nout++] = o;
212 a[1].op = CHANRCV;
213 break;
215 case 1: /* <-timerchan */
216 t = msec();
217 for(i=0; i<nout; i++){
218 o = out[i];
219 if((int)(t - o->t) > 0){
220 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
221 if(cli->maxwait && t - o->st >= cli->maxwait){
222 free(o->p);
223 o->p = nil;
224 strcpy(o->err, "timeout");
225 sendp(o->creply, 0);
226 out[i--] = out[--nout];
227 continue;
229 cli->nresend++;
230 o->nresend++;
231 o->t = t + twait(cli->rtt.avg, o->nresend);
232 if(write(cli->fd, o->p, o->n) != o->n){
233 free(o->p);
234 o->p = nil;
235 snprint(o->err, sizeof o->err, "rewrite: %r");
236 sendp(o->creply, 0);
237 out[i--] = out[--nout];
238 continue;
242 /* stop ticking if no work; rpcchan will turn it back on */
243 if(nout == 0)
244 a[1].op = CHANNOP;
245 break;
247 case 2: /* tag = <-flushchan */
248 for(i=0; i<nout; i++){
249 o = out[i];
250 if(o->tag == tag){
251 out[i--] = out[--nout];
252 strcpy(o->err, "flushed");
253 free(o->p);
254 o->p = nil;
255 sendp(o->creply, 0);
258 break;
260 case 3: /* buf = <-readchan */
261 p = buf;
262 n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
263 p += 4;
264 ep = p+n;
265 if(sunrpcunpack(p, ep, &p, &rpc) != SunSuccess){
266 fprint(2, "%s: in: %.*H unpack failed\n", argv0, n, buf+4);
267 free(buf);
268 break;
270 if(cli->chatty)
271 fprint(2, "in: %B\n", &rpc);
272 if(rpc.iscall){
273 fprint(2, "did not get reply\n");
274 free(buf);
275 break;
277 o = nil;
278 for(i=0; i<nout; i++){
279 o = out[i];
280 if(o->xid == rpc.xid)
281 break;
283 if(i==nout){
284 if(cli->chatty) fprint(2, "did not find waiting request\n");
285 free(buf);
286 break;
288 out[i] = out[--nout];
289 free(o->p);
290 o->p = nil;
291 o->rpc = rpc;
292 if(rpc.status == SunSuccess){
293 o->p = buf;
294 }else{
295 o->p = nil;
296 free(buf);
297 sunerrstr(rpc.status);
298 rerrstr(o->err, sizeof o->err);
300 sendp(o->creply, 0);
301 break;
304 Done:
305 free(out);
306 sendp(cli->dying, 0);
309 SunClient*
310 sundial(char *address)
312 int fd;
313 SunClient *cli;
315 if((fd = dial(address, 0, 0, 0)) < 0)
316 return nil;
318 cli = emalloc(sizeof(SunClient));
319 cli->fd = fd;
320 cli->maxwait = 15000;
321 cli->rtt.avg = 1000;
322 cli->dying = chancreate(sizeof(void*), 0);
323 cli->rpcchan = chancreate(sizeof(Out*), 0);
324 cli->timerchan = chancreate(sizeof(ulong), 0);
325 cli->flushchan = chancreate(sizeof(ulong), 0);
326 cli->readchan = chancreate(sizeof(uchar*), 0);
327 if(strstr(address, "udp!")){
328 cli->needcount = 0;
329 cli->nettid = threadcreate(udpThread, cli, SunStackSize);
330 cli->timertid = threadcreate(timerThread, cli, SunStackSize);
331 }else{
332 cli->needcount = 1;
333 cli->nettid = threadcreate(netThread, cli, SunStackSize);
334 /* assume reliable: don't need timer */
335 /* BUG: netThread should know how to redial */
337 threadcreate(rpcMuxThread, cli, SunStackSize);
339 return cli;
342 void
343 sunclientclose(SunClient *cli)
345 int n;
347 /*
348 * Threadints get you out of any stuck system calls
349 * or thread rendezvouses, but do nothing if the thread
350 * is in the ready state. Keep interrupting until it takes.
351 */
352 n = 0;
353 if(!cli->timertid)
354 n++;
355 while(n < 2){
356 /*
357 threadint(cli->nettid);
358 if(cli->timertid)
359 threadint(cli->timertid);
360 */
362 yield();
363 while(nbrecv(cli->dying, nil) == 1)
364 n++;
367 sendp(cli->rpcchan, 0);
368 recvp(cli->dying);
370 /* everyone's gone: clean up */
371 close(cli->fd);
372 chanfree(cli->flushchan);
373 chanfree(cli->readchan);
374 chanfree(cli->timerchan);
375 free(cli);
378 void
379 sunclientflushrpc(SunClient *cli, ulong tag)
381 sendul(cli->flushchan, tag);
384 void
385 sunclientprog(SunClient *cli, SunProg *p)
387 if(cli->nprog%16 == 0)
388 cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
389 cli->prog[cli->nprog++] = p;
392 int
393 sunclientrpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
395 uchar *bp, *p, *ep;
396 int i, n1, n2, n, nn;
397 Out o;
398 SunProg *prog;
399 SunStatus ok;
401 for(i=0; i<cli->nprog; i++)
402 if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
403 break;
404 if(i==cli->nprog){
405 werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
406 return -1;
408 prog = cli->prog[i];
410 if(cli->chatty){
411 fprint(2, "out: %B\n", &tx->rpc);
412 fprint(2, "\t%C\n", tx);
415 n1 = sunrpcsize(&tx->rpc);
416 n2 = suncallsize(prog, tx);
418 n = n1+n2;
419 if(cli->needcount)
420 n += 4;
422 /*
423 * The dance with 100 is to leave some padding in case
424 * suncallsize is slightly underestimating. If this happens,
425 * the pack will succeed and then we can give a good size
426 * mismatch error below. Otherwise the pack fails with
427 * garbage args, which is less helpful.
428 */
429 bp = emalloc(n+100);
430 ep = bp+n+100;
431 p = bp;
432 if(cli->needcount){
433 nn = n-4;
434 p[0] = (nn>>24)|0x80;
435 p[1] = nn>>16;
436 p[2] = nn>>8;
437 p[3] = nn;
438 p += 4;
440 if((ok = sunrpcpack(p, ep, &p, &tx->rpc)) != SunSuccess
441 || (ok = suncallpack(prog, p, ep, &p, tx)) != SunSuccess){
442 sunerrstr(ok);
443 free(bp);
444 return -1;
446 ep -= 100;
447 if(p != ep){
448 werrstr("rpc: packet size mismatch %d %ld %ld", n, ep-bp, p-bp);
449 free(bp);
450 return -1;
453 memset(&o, 0, sizeof o);
454 o.creply = chancreate(sizeof(void*), 0);
455 o.tag = tag;
456 o.p = bp;
457 o.n = n;
459 sendp(cli->rpcchan, &o);
460 recvp(o.creply);
461 chanfree(o.creply);
463 if(o.p == nil){
464 werrstr("%s", o.err);
465 return -1;
468 p = o.rpc.data;
469 ep = p+o.rpc.ndata;
470 rx->rpc = o.rpc;
471 rx->rpc.proc = tx->rpc.proc;
472 rx->rpc.prog = tx->rpc.prog;
473 rx->rpc.vers = tx->rpc.vers;
474 rx->type = (rx->rpc.proc<<1)|1;
475 if(rx->rpc.status != SunSuccess){
476 sunerrstr(rx->rpc.status);
477 werrstr("unpack: %r");
478 free(o.p);
479 return -1;
482 if((ok = suncallunpack(prog, p, ep, &p, rx)) != SunSuccess){
483 sunerrstr(ok);
484 werrstr("unpack: %r");
485 free(o.p);
486 return -1;
489 if(cli->chatty){
490 fprint(2, "in: %B\n", &rx->rpc);
491 fprint(2, "in:\t%C\n", rx);
494 if(tofree)
495 *tofree = o.p;
496 else
497 free(o.p);
499 return 0;