Blame


1 e342de7d 2015-11-11 adrien package p9p
2 e342de7d 2015-11-11 adrien
3 e342de7d 2015-11-11 adrien import (
4 e342de7d 2015-11-11 adrien "fmt"
5 e342de7d 2015-11-11 adrien "log"
6 e342de7d 2015-11-11 adrien "net"
7 e342de7d 2015-11-11 adrien
8 e342de7d 2015-11-11 adrien "golang.org/x/net/context"
9 e342de7d 2015-11-11 adrien )
10 e342de7d 2015-11-11 adrien
11 e342de7d 2015-11-11 adrien // roundTripper manages the request and response from the client-side. A
12 e342de7d 2015-11-11 adrien // roundTripper must abide by similar rules to the http.RoundTripper.
13 e342de7d 2015-11-11 adrien // Typically, the roundTripper will manage tag assignment and message
14 e342de7d 2015-11-11 adrien // serialization.
15 e342de7d 2015-11-11 adrien type roundTripper interface {
16 e342de7d 2015-11-11 adrien send(ctx context.Context, msg Message) (Message, error)
17 e342de7d 2015-11-11 adrien }
18 e342de7d 2015-11-11 adrien
19 e342de7d 2015-11-11 adrien // transport plays the role of being a client channel manager. It multiplexes
20 e342de7d 2015-11-11 adrien // function calls onto the wire and dispatches responses to blocking calls to
21 e342de7d 2015-11-11 adrien // send. On the whole, transport is thread-safe for calling send
22 e342de7d 2015-11-11 adrien type transport struct {
23 e342de7d 2015-11-11 adrien ctx context.Context
24 e342de7d 2015-11-11 adrien ch Channel
25 e342de7d 2015-11-11 adrien requests chan *fcallRequest
26 e342de7d 2015-11-11 adrien closed chan struct{}
27 e342de7d 2015-11-11 adrien
28 e342de7d 2015-11-11 adrien tags uint16
29 e342de7d 2015-11-11 adrien }
30 e342de7d 2015-11-11 adrien
31 e342de7d 2015-11-11 adrien var _ roundTripper = &transport{}
32 e342de7d 2015-11-11 adrien
33 e342de7d 2015-11-11 adrien func newTransport(ctx context.Context, ch *channel) roundTripper {
34 e342de7d 2015-11-11 adrien t := &transport{
35 e342de7d 2015-11-11 adrien ctx: ctx,
36 e342de7d 2015-11-11 adrien ch: ch,
37 e342de7d 2015-11-11 adrien requests: make(chan *fcallRequest),
38 e342de7d 2015-11-11 adrien closed: make(chan struct{}),
39 e342de7d 2015-11-11 adrien }
40 e342de7d 2015-11-11 adrien
41 e342de7d 2015-11-11 adrien go t.handle()
42 e342de7d 2015-11-11 adrien
43 e342de7d 2015-11-11 adrien return t
44 e342de7d 2015-11-11 adrien }
45 e342de7d 2015-11-11 adrien
46 e342de7d 2015-11-11 adrien type fcallRequest struct {
47 e342de7d 2015-11-11 adrien ctx context.Context
48 e342de7d 2015-11-11 adrien fcall *Fcall
49 e342de7d 2015-11-11 adrien response chan *Fcall
50 e342de7d 2015-11-11 adrien err chan error
51 e342de7d 2015-11-11 adrien }
52 e342de7d 2015-11-11 adrien
53 e342de7d 2015-11-11 adrien func newFcallRequest(ctx context.Context, fcall *Fcall) *fcallRequest {
54 e342de7d 2015-11-11 adrien return &fcallRequest{
55 e342de7d 2015-11-11 adrien ctx: ctx,
56 e342de7d 2015-11-11 adrien fcall: fcall,
57 e342de7d 2015-11-11 adrien response: make(chan *Fcall, 1),
58 e342de7d 2015-11-11 adrien err: make(chan error, 1),
59 e342de7d 2015-11-11 adrien }
60 e342de7d 2015-11-11 adrien }
61 e342de7d 2015-11-11 adrien
62 e342de7d 2015-11-11 adrien func (t *transport) send(ctx context.Context, msg Message) (Message, error) {
63 e342de7d 2015-11-11 adrien fcall := newFcall(msg)
64 e342de7d 2015-11-11 adrien req := newFcallRequest(ctx, fcall)
65 e342de7d 2015-11-11 adrien
66 e342de7d 2015-11-11 adrien // dispatch the request.
67 e342de7d 2015-11-11 adrien select {
68 e342de7d 2015-11-11 adrien case <-t.closed:
69 e342de7d 2015-11-11 adrien return nil, ErrClosed
70 e342de7d 2015-11-11 adrien case <-ctx.Done():
71 e342de7d 2015-11-11 adrien return nil, ctx.Err()
72 e342de7d 2015-11-11 adrien case t.requests <- req:
73 e342de7d 2015-11-11 adrien }
74 e342de7d 2015-11-11 adrien
75 e342de7d 2015-11-11 adrien // wait for the response.
76 e342de7d 2015-11-11 adrien select {
77 e342de7d 2015-11-11 adrien case <-t.closed:
78 e342de7d 2015-11-11 adrien return nil, ErrClosed
79 e342de7d 2015-11-11 adrien case <-ctx.Done():
80 e342de7d 2015-11-11 adrien return nil, ctx.Err()
81 e342de7d 2015-11-11 adrien case err := <-req.err:
82 e342de7d 2015-11-11 adrien return nil, err
83 e342de7d 2015-11-11 adrien case resp := <-req.response:
84 e342de7d 2015-11-11 adrien if resp.Type == Rerror {
85 e342de7d 2015-11-11 adrien // pack the error into something useful
86 e342de7d 2015-11-11 adrien respmesg, ok := resp.Message.(MessageRerror)
87 e342de7d 2015-11-11 adrien if !ok {
88 e342de7d 2015-11-11 adrien return nil, fmt.Errorf("invalid error response: %v", resp)
89 e342de7d 2015-11-11 adrien }
90 e342de7d 2015-11-11 adrien
91 e342de7d 2015-11-11 adrien return nil, respmesg
92 e342de7d 2015-11-11 adrien }
93 e342de7d 2015-11-11 adrien
94 e342de7d 2015-11-11 adrien return resp.Message, nil
95 e342de7d 2015-11-11 adrien }
96 e342de7d 2015-11-11 adrien }
97 e342de7d 2015-11-11 adrien
98 e342de7d 2015-11-11 adrien // handle takes messages off the wire and wakes up the waiting tag call.
99 e342de7d 2015-11-11 adrien func (t *transport) handle() {
100 e342de7d 2015-11-11 adrien defer func() {
101 e342de7d 2015-11-11 adrien log.Println("exited handle loop")
102 e342de7d 2015-11-11 adrien t.Close()
103 e342de7d 2015-11-11 adrien }()
104 e342de7d 2015-11-11 adrien // the following variable block are protected components owned by this thread.
105 e342de7d 2015-11-11 adrien var (
106 e342de7d 2015-11-11 adrien responses = make(chan *Fcall)
107 e342de7d 2015-11-11 adrien tags Tag
108 e342de7d 2015-11-11 adrien // outstanding provides a map of tags to outstanding requests.
109 e342de7d 2015-11-11 adrien outstanding = map[Tag]*fcallRequest{}
110 e342de7d 2015-11-11 adrien )
111 e342de7d 2015-11-11 adrien
112 e342de7d 2015-11-11 adrien // loop to read messages off of the connection
113 e342de7d 2015-11-11 adrien go func() {
114 e342de7d 2015-11-11 adrien defer func() {
115 e342de7d 2015-11-11 adrien log.Println("exited read loop")
116 e342de7d 2015-11-11 adrien t.Close()
117 e342de7d 2015-11-11 adrien }()
118 e342de7d 2015-11-11 adrien loop:
119 e342de7d 2015-11-11 adrien for {
120 e342de7d 2015-11-11 adrien fcall := new(Fcall)
121 e342de7d 2015-11-11 adrien if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
122 e342de7d 2015-11-11 adrien switch err := err.(type) {
123 e342de7d 2015-11-11 adrien case net.Error:
124 e342de7d 2015-11-11 adrien if err.Timeout() || err.Temporary() {
125 e342de7d 2015-11-11 adrien // BUG(stevvooe): There may be partial reads under
126 e342de7d 2015-11-11 adrien // timeout errors where this is actually fatal.
127 e342de7d 2015-11-11 adrien
128 e342de7d 2015-11-11 adrien // can only retry if we haven't offset the frame.
129 e342de7d 2015-11-11 adrien continue loop
130 e342de7d 2015-11-11 adrien }
131 e342de7d 2015-11-11 adrien }
132 e342de7d 2015-11-11 adrien
133 e342de7d 2015-11-11 adrien log.Println("fatal error reading msg:", err)
134 e342de7d 2015-11-11 adrien t.Close()
135 e342de7d 2015-11-11 adrien return
136 e342de7d 2015-11-11 adrien }
137 e342de7d 2015-11-11 adrien
138 e342de7d 2015-11-11 adrien select {
139 e342de7d 2015-11-11 adrien case <-t.ctx.Done():
140 e342de7d 2015-11-11 adrien log.Println("ctx done")
141 e342de7d 2015-11-11 adrien return
142 e342de7d 2015-11-11 adrien case <-t.closed:
143 e342de7d 2015-11-11 adrien log.Println("transport closed")
144 e342de7d 2015-11-11 adrien return
145 e342de7d 2015-11-11 adrien case responses <- fcall:
146 e342de7d 2015-11-11 adrien }
147 e342de7d 2015-11-11 adrien }
148 e342de7d 2015-11-11 adrien }()
149 e342de7d 2015-11-11 adrien
150 e342de7d 2015-11-11 adrien for {
151 e342de7d 2015-11-11 adrien log.Println("wait...")
152 e342de7d 2015-11-11 adrien select {
153 e342de7d 2015-11-11 adrien case req := <-t.requests:
154 e342de7d 2015-11-11 adrien if req.fcall.Tag == NOTAG {
155 e342de7d 2015-11-11 adrien // NOTE(stevvooe): We disallow fcalls with NOTAG to come
156 e342de7d 2015-11-11 adrien // through this path since we can't join the tagged response
157 e342de7d 2015-11-11 adrien // with the waiting caller. This is typically used for the
158 e342de7d 2015-11-11 adrien // Tversion/Rversion round trip to setup a session.
159 e342de7d 2015-11-11 adrien //
160 e342de7d 2015-11-11 adrien // It may be better to allow these through but block all
161 e342de7d 2015-11-11 adrien // requests until a notag message has a response.
162 e342de7d 2015-11-11 adrien
163 e342de7d 2015-11-11 adrien req.err <- fmt.Errorf("disallowed tag through transport")
164 e342de7d 2015-11-11 adrien continue
165 e342de7d 2015-11-11 adrien }
166 e342de7d 2015-11-11 adrien
167 e342de7d 2015-11-11 adrien // BUG(stevvooe): This is an awful tag allocation procedure.
168 e342de7d 2015-11-11 adrien // Replace this with something that let's us allocate tags and
169 e342de7d 2015-11-11 adrien // associate data with them, returning to them to a pool when
170 e342de7d 2015-11-11 adrien // complete. Such a system would provide a lot of information
171 e342de7d 2015-11-11 adrien // about outstanding requests.
172 e342de7d 2015-11-11 adrien tags++
173 e342de7d 2015-11-11 adrien req.fcall.Tag = tags
174 e342de7d 2015-11-11 adrien outstanding[req.fcall.Tag] = req
175 e342de7d 2015-11-11 adrien
176 e342de7d 2015-11-11 adrien // TODO(stevvooe): Consider the case of requests that never
177 e342de7d 2015-11-11 adrien // receive a response. We need to remove the fcall context from
178 e342de7d 2015-11-11 adrien // the tag map and dealloc the tag. We may also want to send a
179 e342de7d 2015-11-11 adrien // flush for the tag.
180 e342de7d 2015-11-11 adrien if err := t.ch.WriteFcall(req.ctx, req.fcall); err != nil {
181 e342de7d 2015-11-11 adrien delete(outstanding, req.fcall.Tag)
182 e342de7d 2015-11-11 adrien req.err <- err
183 e342de7d 2015-11-11 adrien }
184 e342de7d 2015-11-11 adrien case b := <-responses:
185 e342de7d 2015-11-11 adrien req, ok := outstanding[b.Tag]
186 e342de7d 2015-11-11 adrien if !ok {
187 e342de7d 2015-11-11 adrien panic("unknown tag received")
188 e342de7d 2015-11-11 adrien }
189 e342de7d 2015-11-11 adrien delete(outstanding, req.fcall.Tag)
190 e342de7d 2015-11-11 adrien
191 e342de7d 2015-11-11 adrien req.response <- b
192 e342de7d 2015-11-11 adrien
193 e342de7d 2015-11-11 adrien // TODO(stevvooe): Reclaim tag id.
194 e342de7d 2015-11-11 adrien case <-t.ctx.Done():
195 e342de7d 2015-11-11 adrien return
196 e342de7d 2015-11-11 adrien case <-t.closed:
197 e342de7d 2015-11-11 adrien return
198 e342de7d 2015-11-11 adrien }
199 e342de7d 2015-11-11 adrien }
200 e342de7d 2015-11-11 adrien }
201 e342de7d 2015-11-11 adrien
202 e342de7d 2015-11-11 adrien func (t *transport) flush(ctx context.Context, tag Tag) error {
203 e342de7d 2015-11-11 adrien // TODO(stevvooe): We need to fire and forget flush messages when a call
204 e342de7d 2015-11-11 adrien // context gets cancelled.
205 e342de7d 2015-11-11 adrien panic("not implemented")
206 e342de7d 2015-11-11 adrien }
207 e342de7d 2015-11-11 adrien
208 e342de7d 2015-11-11 adrien func (t *transport) Close() error {
209 e342de7d 2015-11-11 adrien select {
210 e342de7d 2015-11-11 adrien case <-t.closed:
211 e342de7d 2015-11-11 adrien return ErrClosed
212 e342de7d 2015-11-11 adrien case <-t.ctx.Done():
213 e342de7d 2015-11-11 adrien return t.ctx.Err()
214 e342de7d 2015-11-11 adrien default:
215 e342de7d 2015-11-11 adrien close(t.closed)
216 e342de7d 2015-11-11 adrien }
217 e342de7d 2015-11-11 adrien
218 e342de7d 2015-11-11 adrien return nil
219 e342de7d 2015-11-11 adrien }