Blame


1 e6bcde66 2015-10-29 stephen.d package p9pnew
2 e6bcde66 2015-10-29 stephen.d
3 e6bcde66 2015-10-29 stephen.d import (
4 e6bcde66 2015-10-29 stephen.d "fmt"
5 e6bcde66 2015-10-29 stephen.d "log"
6 e6bcde66 2015-10-29 stephen.d "net"
7 e6bcde66 2015-10-29 stephen.d
8 e6bcde66 2015-10-29 stephen.d "golang.org/x/net/context"
9 e6bcde66 2015-10-29 stephen.d )
10 e6bcde66 2015-10-29 stephen.d
11 e6bcde66 2015-10-29 stephen.d // roundTripper manages the request and response from the client-side. A
12 deb98ab4 2015-11-05 stephen.d // roundTripper must abide by similar rules to the http.RoundTripper.
13 e6bcde66 2015-10-29 stephen.d // Typically, the roundTripper will manage tag assignment and message
14 e6bcde66 2015-10-29 stephen.d // serialization.
15 e6bcde66 2015-10-29 stephen.d type roundTripper interface {
16 deb98ab4 2015-11-05 stephen.d send(ctx context.Context, msg Message) (Message, error)
17 e6bcde66 2015-10-29 stephen.d }
18 e6bcde66 2015-10-29 stephen.d
19 40d4a02d 2015-11-03 stephen.d // transport plays the role of being a client channel manager. It multiplexes
20 40d4a02d 2015-11-03 stephen.d // function calls onto the wire and dispatches responses to blocking calls to
21 40d4a02d 2015-11-03 stephen.d // send. On the whole, transport is thread-safe for calling send
22 e6bcde66 2015-10-29 stephen.d type transport struct {
23 e6bcde66 2015-10-29 stephen.d ctx context.Context
24 3bf22e58 2015-11-04 stephen.d ch Channel
25 e6bcde66 2015-10-29 stephen.d requests chan *fcallRequest
26 e6bcde66 2015-10-29 stephen.d closed chan struct{}
27 e6bcde66 2015-10-29 stephen.d
28 e6bcde66 2015-10-29 stephen.d tags uint16
29 e6bcde66 2015-10-29 stephen.d }
30 e6bcde66 2015-10-29 stephen.d
31 deb98ab4 2015-11-05 stephen.d var _ roundTripper = &transport{}
32 deb98ab4 2015-11-05 stephen.d
33 fb37ce2a 2015-10-30 stephen.d func newTransport(ctx context.Context, ch *channel) roundTripper {
34 e6bcde66 2015-10-29 stephen.d t := &transport{
35 e6bcde66 2015-10-29 stephen.d ctx: ctx,
36 fb37ce2a 2015-10-30 stephen.d ch: ch,
37 e6bcde66 2015-10-29 stephen.d requests: make(chan *fcallRequest),
38 e6bcde66 2015-10-29 stephen.d closed: make(chan struct{}),
39 e6bcde66 2015-10-29 stephen.d }
40 e6bcde66 2015-10-29 stephen.d
41 e6bcde66 2015-10-29 stephen.d go t.handle()
42 e6bcde66 2015-10-29 stephen.d
43 e6bcde66 2015-10-29 stephen.d return t
44 e6bcde66 2015-10-29 stephen.d }
45 e6bcde66 2015-10-29 stephen.d
46 e6bcde66 2015-10-29 stephen.d type fcallRequest struct {
47 e6bcde66 2015-10-29 stephen.d ctx context.Context
48 e6bcde66 2015-10-29 stephen.d fcall *Fcall
49 e6bcde66 2015-10-29 stephen.d response chan *Fcall
50 e6bcde66 2015-10-29 stephen.d err chan error
51 e6bcde66 2015-10-29 stephen.d }
52 e6bcde66 2015-10-29 stephen.d
53 e6bcde66 2015-10-29 stephen.d func newFcallRequest(ctx context.Context, fcall *Fcall) *fcallRequest {
54 e6bcde66 2015-10-29 stephen.d return &fcallRequest{
55 e6bcde66 2015-10-29 stephen.d ctx: ctx,
56 e6bcde66 2015-10-29 stephen.d fcall: fcall,
57 e6bcde66 2015-10-29 stephen.d response: make(chan *Fcall, 1),
58 e6bcde66 2015-10-29 stephen.d err: make(chan error, 1),
59 e6bcde66 2015-10-29 stephen.d }
60 e6bcde66 2015-10-29 stephen.d }
61 e6bcde66 2015-10-29 stephen.d
62 deb98ab4 2015-11-05 stephen.d func (t *transport) send(ctx context.Context, msg Message) (Message, error) {
63 deb98ab4 2015-11-05 stephen.d fcall := newFcall(msg)
64 e6bcde66 2015-10-29 stephen.d req := newFcallRequest(ctx, fcall)
65 e6bcde66 2015-10-29 stephen.d
66 e6bcde66 2015-10-29 stephen.d // dispatch the request.
67 e6bcde66 2015-10-29 stephen.d select {
68 e6bcde66 2015-10-29 stephen.d case <-t.closed:
69 e6bcde66 2015-10-29 stephen.d return nil, ErrClosed
70 e6bcde66 2015-10-29 stephen.d case <-ctx.Done():
71 e6bcde66 2015-10-29 stephen.d return nil, ctx.Err()
72 e6bcde66 2015-10-29 stephen.d case t.requests <- req:
73 e6bcde66 2015-10-29 stephen.d }
74 e6bcde66 2015-10-29 stephen.d
75 e6bcde66 2015-10-29 stephen.d // wait for the response.
76 e6bcde66 2015-10-29 stephen.d select {
77 e6bcde66 2015-10-29 stephen.d case <-t.closed:
78 e6bcde66 2015-10-29 stephen.d return nil, ErrClosed
79 e6bcde66 2015-10-29 stephen.d case <-ctx.Done():
80 e6bcde66 2015-10-29 stephen.d return nil, ctx.Err()
81 a8abc687 2015-10-30 stephen.d case err := <-req.err:
82 a8abc687 2015-10-30 stephen.d return nil, err
83 e6bcde66 2015-10-29 stephen.d case resp := <-req.response:
84 97423e8b 2015-10-29 stephen.d if resp.Type == Rerror {
85 97423e8b 2015-10-29 stephen.d // pack the error into something useful
86 deb98ab4 2015-11-05 stephen.d respmesg, ok := resp.Message.(MessageRerror)
87 97423e8b 2015-10-29 stephen.d if !ok {
88 97423e8b 2015-10-29 stephen.d return nil, fmt.Errorf("invalid error response: %v", resp)
89 97423e8b 2015-10-29 stephen.d }
90 97423e8b 2015-10-29 stephen.d
91 deb98ab4 2015-11-05 stephen.d return nil, respmesg
92 97423e8b 2015-10-29 stephen.d }
93 97423e8b 2015-10-29 stephen.d
94 deb98ab4 2015-11-05 stephen.d return resp.Message, nil
95 e6bcde66 2015-10-29 stephen.d }
96 e6bcde66 2015-10-29 stephen.d }
97 e6bcde66 2015-10-29 stephen.d
98 e6bcde66 2015-10-29 stephen.d // handle takes messages off the wire and wakes up the waiting tag call.
99 e6bcde66 2015-10-29 stephen.d func (t *transport) handle() {
100 a8abc687 2015-10-30 stephen.d defer func() {
101 a8abc687 2015-10-30 stephen.d log.Println("exited handle loop")
102 b714e9e0 2015-11-05 stephen.d t.Close()
103 a8abc687 2015-10-30 stephen.d }()
104 e6bcde66 2015-10-29 stephen.d // the following variable block are protected components owned by this thread.
105 e6bcde66 2015-10-29 stephen.d var (
106 e6bcde66 2015-10-29 stephen.d responses = make(chan *Fcall)
107 e6bcde66 2015-10-29 stephen.d tags Tag
108 e6bcde66 2015-10-29 stephen.d // outstanding provides a map of tags to outstanding requests.
109 e6bcde66 2015-10-29 stephen.d outstanding = map[Tag]*fcallRequest{}
110 e6bcde66 2015-10-29 stephen.d )
111 e6bcde66 2015-10-29 stephen.d
112 e6bcde66 2015-10-29 stephen.d // loop to read messages off of the connection
113 e6bcde66 2015-10-29 stephen.d go func() {
114 fb37ce2a 2015-10-30 stephen.d defer func() {
115 fb37ce2a 2015-10-30 stephen.d log.Println("exited read loop")
116 b714e9e0 2015-11-05 stephen.d t.Close()
117 fb37ce2a 2015-10-30 stephen.d }()
118 e6bcde66 2015-10-29 stephen.d loop:
119 e6bcde66 2015-10-29 stephen.d for {
120 fb37ce2a 2015-10-30 stephen.d fcall := new(Fcall)
121 3bf22e58 2015-11-04 stephen.d if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
122 e6bcde66 2015-10-29 stephen.d switch err := err.(type) {
123 e6bcde66 2015-10-29 stephen.d case net.Error:
124 e6bcde66 2015-10-29 stephen.d if err.Timeout() || err.Temporary() {
125 fb37ce2a 2015-10-30 stephen.d // BUG(stevvooe): There may be partial reads under
126 fb37ce2a 2015-10-30 stephen.d // timeout errors where this is actually fatal.
127 fb37ce2a 2015-10-30 stephen.d
128 fb37ce2a 2015-10-30 stephen.d // can only retry if we haven't offset the frame.
129 fb37ce2a 2015-10-30 stephen.d continue loop
130 e6bcde66 2015-10-29 stephen.d }
131 e6bcde66 2015-10-29 stephen.d }
132 e6bcde66 2015-10-29 stephen.d
133 fb37ce2a 2015-10-30 stephen.d log.Println("fatal error reading msg:", err)
134 fb37ce2a 2015-10-30 stephen.d t.Close()
135 fb37ce2a 2015-10-30 stephen.d return
136 e6bcde66 2015-10-29 stephen.d }
137 e6bcde66 2015-10-29 stephen.d
138 e6bcde66 2015-10-29 stephen.d select {
139 e6bcde66 2015-10-29 stephen.d case <-t.ctx.Done():
140 fb37ce2a 2015-10-30 stephen.d log.Println("ctx done")
141 e6bcde66 2015-10-29 stephen.d return
142 e6bcde66 2015-10-29 stephen.d case <-t.closed:
143 fb37ce2a 2015-10-30 stephen.d log.Println("transport closed")
144 e6bcde66 2015-10-29 stephen.d return
145 fb37ce2a 2015-10-30 stephen.d case responses <- fcall:
146 e6bcde66 2015-10-29 stephen.d }
147 e6bcde66 2015-10-29 stephen.d }
148 e6bcde66 2015-10-29 stephen.d }()
149 e6bcde66 2015-10-29 stephen.d
150 e6bcde66 2015-10-29 stephen.d for {
151 a8abc687 2015-10-30 stephen.d log.Println("wait...")
152 e6bcde66 2015-10-29 stephen.d select {
153 e6bcde66 2015-10-29 stephen.d case req := <-t.requests:
154 74ec7ac9 2015-10-30 stephen.d if req.fcall.Tag == NOTAG {
155 74ec7ac9 2015-10-30 stephen.d // NOTE(stevvooe): We disallow fcalls with NOTAG to come
156 74ec7ac9 2015-10-30 stephen.d // through this path since we can't join the tagged response
157 74ec7ac9 2015-10-30 stephen.d // with the waiting caller. This is typically used for the
158 74ec7ac9 2015-10-30 stephen.d // Tversion/Rversion round trip to setup a session.
159 74ec7ac9 2015-10-30 stephen.d //
160 74ec7ac9 2015-10-30 stephen.d // It may be better to allow these through but block all
161 74ec7ac9 2015-10-30 stephen.d // requests until a notag message has a response.
162 e6bcde66 2015-10-29 stephen.d
163 74ec7ac9 2015-10-30 stephen.d req.err <- fmt.Errorf("disallowed tag through transport")
164 74ec7ac9 2015-10-30 stephen.d continue
165 e6bcde66 2015-10-29 stephen.d }
166 e6bcde66 2015-10-29 stephen.d
167 74ec7ac9 2015-10-30 stephen.d // BUG(stevvooe): This is an awful tag allocation procedure.
168 74ec7ac9 2015-10-30 stephen.d // Replace this with something that let's us allocate tags and
169 74ec7ac9 2015-10-30 stephen.d // associate data with them, returning to them to a pool when
170 74ec7ac9 2015-10-30 stephen.d // complete. Such a system would provide a lot of information
171 74ec7ac9 2015-10-30 stephen.d // about outstanding requests.
172 74ec7ac9 2015-10-30 stephen.d tags++
173 74ec7ac9 2015-10-30 stephen.d req.fcall.Tag = tags
174 74ec7ac9 2015-10-30 stephen.d outstanding[req.fcall.Tag] = req
175 74ec7ac9 2015-10-30 stephen.d
176 e6bcde66 2015-10-29 stephen.d // TODO(stevvooe): Consider the case of requests that never
177 e6bcde66 2015-10-29 stephen.d // receive a response. We need to remove the fcall context from
178 e6bcde66 2015-10-29 stephen.d // the tag map and dealloc the tag. We may also want to send a
179 e6bcde66 2015-10-29 stephen.d // flush for the tag.
180 3bf22e58 2015-11-04 stephen.d if err := t.ch.WriteFcall(req.ctx, req.fcall); err != nil {
181 e6bcde66 2015-10-29 stephen.d delete(outstanding, req.fcall.Tag)
182 e6bcde66 2015-10-29 stephen.d req.err <- err
183 e6bcde66 2015-10-29 stephen.d }
184 e6bcde66 2015-10-29 stephen.d case b := <-responses:
185 e6bcde66 2015-10-29 stephen.d req, ok := outstanding[b.Tag]
186 e6bcde66 2015-10-29 stephen.d if !ok {
187 e6bcde66 2015-10-29 stephen.d panic("unknown tag received")
188 e6bcde66 2015-10-29 stephen.d }
189 e6bcde66 2015-10-29 stephen.d delete(outstanding, req.fcall.Tag)
190 e6bcde66 2015-10-29 stephen.d
191 e6bcde66 2015-10-29 stephen.d req.response <- b
192 e6bcde66 2015-10-29 stephen.d
193 e6bcde66 2015-10-29 stephen.d // TODO(stevvooe): Reclaim tag id.
194 e6bcde66 2015-10-29 stephen.d case <-t.ctx.Done():
195 e6bcde66 2015-10-29 stephen.d return
196 e6bcde66 2015-10-29 stephen.d case <-t.closed:
197 e6bcde66 2015-10-29 stephen.d return
198 e6bcde66 2015-10-29 stephen.d }
199 e6bcde66 2015-10-29 stephen.d }
200 e6bcde66 2015-10-29 stephen.d }
201 e6bcde66 2015-10-29 stephen.d
202 deb98ab4 2015-11-05 stephen.d func (t *transport) flush(ctx context.Context, tag Tag) error {
203 deb98ab4 2015-11-05 stephen.d // TODO(stevvooe): We need to fire and forget flush messages when a call
204 deb98ab4 2015-11-05 stephen.d // context gets cancelled.
205 deb98ab4 2015-11-05 stephen.d panic("not implemented")
206 deb98ab4 2015-11-05 stephen.d }
207 deb98ab4 2015-11-05 stephen.d
208 e6bcde66 2015-10-29 stephen.d func (t *transport) Close() error {
209 e6bcde66 2015-10-29 stephen.d select {
210 e6bcde66 2015-10-29 stephen.d case <-t.closed:
211 e6bcde66 2015-10-29 stephen.d return ErrClosed
212 e6bcde66 2015-10-29 stephen.d case <-t.ctx.Done():
213 e6bcde66 2015-10-29 stephen.d return t.ctx.Err()
214 e6bcde66 2015-10-29 stephen.d default:
215 e6bcde66 2015-10-29 stephen.d close(t.closed)
216 e6bcde66 2015-10-29 stephen.d }
217 e6bcde66 2015-10-29 stephen.d
218 e6bcde66 2015-10-29 stephen.d return nil
219 e6bcde66 2015-10-29 stephen.d }