Blame


1 4b33cdd0 2015-11-30 stephen.d package p9p
2 4b33cdd0 2015-11-30 stephen.d
3 4b33cdd0 2015-11-30 stephen.d import (
4 bd8041e2 2016-05-25 w "errors"
5 4b33cdd0 2015-11-30 stephen.d "fmt"
6 4b33cdd0 2015-11-30 stephen.d "log"
7 4b33cdd0 2015-11-30 stephen.d "net"
8 4b33cdd0 2015-11-30 stephen.d
9 4b33cdd0 2015-11-30 stephen.d "golang.org/x/net/context"
10 4b33cdd0 2015-11-30 stephen.d )
11 4b33cdd0 2015-11-30 stephen.d
12 4b33cdd0 2015-11-30 stephen.d // roundTripper manages the request and response from the client-side. A
13 4b33cdd0 2015-11-30 stephen.d // roundTripper must abide by similar rules to the http.RoundTripper.
14 4b33cdd0 2015-11-30 stephen.d // Typically, the roundTripper will manage tag assignment and message
15 4b33cdd0 2015-11-30 stephen.d // serialization.
16 4b33cdd0 2015-11-30 stephen.d type roundTripper interface {
17 4b33cdd0 2015-11-30 stephen.d send(ctx context.Context, msg Message) (Message, error)
18 4b33cdd0 2015-11-30 stephen.d }
19 4b33cdd0 2015-11-30 stephen.d
20 4b33cdd0 2015-11-30 stephen.d // transport plays the role of being a client channel manager. It multiplexes
21 4b33cdd0 2015-11-30 stephen.d // function calls onto the wire and dispatches responses to blocking calls to
22 4b33cdd0 2015-11-30 stephen.d // send. On the whole, transport is thread-safe for calling send
23 4b33cdd0 2015-11-30 stephen.d type transport struct {
24 4b33cdd0 2015-11-30 stephen.d ctx context.Context
25 4b33cdd0 2015-11-30 stephen.d ch Channel
26 4b33cdd0 2015-11-30 stephen.d requests chan *fcallRequest
27 4b33cdd0 2015-11-30 stephen.d closed chan struct{}
28 4b33cdd0 2015-11-30 stephen.d
29 4b33cdd0 2015-11-30 stephen.d tags uint16
30 4b33cdd0 2015-11-30 stephen.d }
31 4b33cdd0 2015-11-30 stephen.d
32 4b33cdd0 2015-11-30 stephen.d var _ roundTripper = &transport{}
33 4b33cdd0 2015-11-30 stephen.d
34 4b33cdd0 2015-11-30 stephen.d func newTransport(ctx context.Context, ch *channel) roundTripper {
35 4b33cdd0 2015-11-30 stephen.d t := &transport{
36 4b33cdd0 2015-11-30 stephen.d ctx: ctx,
37 4b33cdd0 2015-11-30 stephen.d ch: ch,
38 4b33cdd0 2015-11-30 stephen.d requests: make(chan *fcallRequest),
39 4b33cdd0 2015-11-30 stephen.d closed: make(chan struct{}),
40 4b33cdd0 2015-11-30 stephen.d }
41 4b33cdd0 2015-11-30 stephen.d
42 4b33cdd0 2015-11-30 stephen.d go t.handle()
43 4b33cdd0 2015-11-30 stephen.d
44 4b33cdd0 2015-11-30 stephen.d return t
45 4b33cdd0 2015-11-30 stephen.d }
46 4b33cdd0 2015-11-30 stephen.d
47 4b33cdd0 2015-11-30 stephen.d // fcallRequest encompasses the request to send a message via fcall.
48 4b33cdd0 2015-11-30 stephen.d type fcallRequest struct {
49 4b33cdd0 2015-11-30 stephen.d ctx context.Context
50 4b33cdd0 2015-11-30 stephen.d message Message
51 4b33cdd0 2015-11-30 stephen.d response chan *Fcall
52 4b33cdd0 2015-11-30 stephen.d err chan error
53 4b33cdd0 2015-11-30 stephen.d }
54 4b33cdd0 2015-11-30 stephen.d
55 4b33cdd0 2015-11-30 stephen.d func newFcallRequest(ctx context.Context, msg Message) *fcallRequest {
56 4b33cdd0 2015-11-30 stephen.d return &fcallRequest{
57 4b33cdd0 2015-11-30 stephen.d ctx: ctx,
58 4b33cdd0 2015-11-30 stephen.d message: msg,
59 4b33cdd0 2015-11-30 stephen.d response: make(chan *Fcall, 1),
60 4b33cdd0 2015-11-30 stephen.d err: make(chan error, 1),
61 4b33cdd0 2015-11-30 stephen.d }
62 4b33cdd0 2015-11-30 stephen.d }
63 4b33cdd0 2015-11-30 stephen.d
64 4b33cdd0 2015-11-30 stephen.d func (t *transport) send(ctx context.Context, msg Message) (Message, error) {
65 4b33cdd0 2015-11-30 stephen.d req := newFcallRequest(ctx, msg)
66 4b33cdd0 2015-11-30 stephen.d
67 4b33cdd0 2015-11-30 stephen.d // dispatch the request.
68 4b33cdd0 2015-11-30 stephen.d select {
69 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
70 4b33cdd0 2015-11-30 stephen.d return nil, ErrClosed
71 4b33cdd0 2015-11-30 stephen.d case <-ctx.Done():
72 4b33cdd0 2015-11-30 stephen.d return nil, ctx.Err()
73 4b33cdd0 2015-11-30 stephen.d case t.requests <- req:
74 4b33cdd0 2015-11-30 stephen.d }
75 4b33cdd0 2015-11-30 stephen.d
76 4b33cdd0 2015-11-30 stephen.d // wait for the response.
77 4b33cdd0 2015-11-30 stephen.d select {
78 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
79 4b33cdd0 2015-11-30 stephen.d return nil, ErrClosed
80 4b33cdd0 2015-11-30 stephen.d case <-ctx.Done():
81 4b33cdd0 2015-11-30 stephen.d return nil, ctx.Err()
82 4b33cdd0 2015-11-30 stephen.d case err := <-req.err:
83 4b33cdd0 2015-11-30 stephen.d return nil, err
84 4b33cdd0 2015-11-30 stephen.d case resp := <-req.response:
85 4b33cdd0 2015-11-30 stephen.d if resp.Type == Rerror {
86 4b33cdd0 2015-11-30 stephen.d // pack the error into something useful
87 4b33cdd0 2015-11-30 stephen.d respmesg, ok := resp.Message.(MessageRerror)
88 4b33cdd0 2015-11-30 stephen.d if !ok {
89 4b33cdd0 2015-11-30 stephen.d return nil, fmt.Errorf("invalid error response: %v", resp)
90 4b33cdd0 2015-11-30 stephen.d }
91 4b33cdd0 2015-11-30 stephen.d
92 4b33cdd0 2015-11-30 stephen.d return nil, respmesg
93 4b33cdd0 2015-11-30 stephen.d }
94 4b33cdd0 2015-11-30 stephen.d
95 4b33cdd0 2015-11-30 stephen.d return resp.Message, nil
96 4b33cdd0 2015-11-30 stephen.d }
97 4b33cdd0 2015-11-30 stephen.d }
98 4b33cdd0 2015-11-30 stephen.d
99 bd8041e2 2016-05-25 w func allocateTag(r *fcallRequest, m map[Tag]*fcallRequest) (Tag, error) {
100 bd8041e2 2016-05-25 w // Tversion can only use NOTAG, so check if we're sending a Tversion.
101 bd8041e2 2016-05-25 w if r.message.Type() == Tversion {
102 bd8041e2 2016-05-25 w if _, exists := m[NOTAG]; exists {
103 bd8041e2 2016-05-25 w return 0, errors.New("NOTAG already in use")
104 bd8041e2 2016-05-25 w }
105 bd8041e2 2016-05-25 w return NOTAG, nil
106 bd8041e2 2016-05-25 w }
107 bd8041e2 2016-05-25 w
108 bd8041e2 2016-05-25 w // The tag pool is depleted if all 65536 tags are taken, or if 65535 tags
109 bd8041e2 2016-05-25 w // are taken and NOTAG is available.
110 bd8041e2 2016-05-25 w if len(m) > 0xFFFF {
111 bd8041e2 2016-05-25 w return 0, errors.New("tag pool depleted")
112 bd8041e2 2016-05-25 w } else if len(m) == 0xFFFF {
113 bd8041e2 2016-05-25 w if _, exists := m[NOTAG]; !exists {
114 bd8041e2 2016-05-25 w return 0, errors.New("tag pool depleted")
115 bd8041e2 2016-05-25 w }
116 bd8041e2 2016-05-25 w }
117 bd8041e2 2016-05-25 w
118 bd8041e2 2016-05-25 w // Look for the first tag that doesn't exist in the map and return it.
119 bd8041e2 2016-05-25 w for selected := Tag(0); selected < NOTAG; selected++ {
120 bd8041e2 2016-05-25 w if _, exists := m[selected]; !exists {
121 bd8041e2 2016-05-25 w return selected, nil
122 bd8041e2 2016-05-25 w }
123 bd8041e2 2016-05-25 w }
124 bd8041e2 2016-05-25 w
125 bd8041e2 2016-05-25 w return 0, errors.New("allocateTag: unexpected error")
126 bd8041e2 2016-05-25 w }
127 bd8041e2 2016-05-25 w
128 4b33cdd0 2015-11-30 stephen.d // handle takes messages off the wire and wakes up the waiting tag call.
129 4b33cdd0 2015-11-30 stephen.d func (t *transport) handle() {
130 4b33cdd0 2015-11-30 stephen.d defer func() {
131 4b33cdd0 2015-11-30 stephen.d log.Println("exited handle loop")
132 4b33cdd0 2015-11-30 stephen.d t.Close()
133 4b33cdd0 2015-11-30 stephen.d }()
134 4b33cdd0 2015-11-30 stephen.d // the following variable block are protected components owned by this thread.
135 4b33cdd0 2015-11-30 stephen.d var (
136 4b33cdd0 2015-11-30 stephen.d responses = make(chan *Fcall)
137 4b33cdd0 2015-11-30 stephen.d // outstanding provides a map of tags to outstanding requests.
138 4b33cdd0 2015-11-30 stephen.d outstanding = map[Tag]*fcallRequest{}
139 4b33cdd0 2015-11-30 stephen.d )
140 4b33cdd0 2015-11-30 stephen.d
141 4b33cdd0 2015-11-30 stephen.d // loop to read messages off of the connection
142 4b33cdd0 2015-11-30 stephen.d go func() {
143 4b33cdd0 2015-11-30 stephen.d defer func() {
144 4b33cdd0 2015-11-30 stephen.d log.Println("exited read loop")
145 4b33cdd0 2015-11-30 stephen.d t.Close()
146 4b33cdd0 2015-11-30 stephen.d }()
147 4b33cdd0 2015-11-30 stephen.d loop:
148 4b33cdd0 2015-11-30 stephen.d for {
149 4b33cdd0 2015-11-30 stephen.d fcall := new(Fcall)
150 4b33cdd0 2015-11-30 stephen.d if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
151 4b33cdd0 2015-11-30 stephen.d switch err := err.(type) {
152 4b33cdd0 2015-11-30 stephen.d case net.Error:
153 4b33cdd0 2015-11-30 stephen.d if err.Timeout() || err.Temporary() {
154 4b33cdd0 2015-11-30 stephen.d // BUG(stevvooe): There may be partial reads under
155 4b33cdd0 2015-11-30 stephen.d // timeout errors where this is actually fatal.
156 4b33cdd0 2015-11-30 stephen.d
157 4b33cdd0 2015-11-30 stephen.d // can only retry if we haven't offset the frame.
158 4b33cdd0 2015-11-30 stephen.d continue loop
159 4b33cdd0 2015-11-30 stephen.d }
160 4b33cdd0 2015-11-30 stephen.d }
161 4b33cdd0 2015-11-30 stephen.d
162 4b33cdd0 2015-11-30 stephen.d log.Println("fatal error reading msg:", err)
163 4b33cdd0 2015-11-30 stephen.d t.Close()
164 4b33cdd0 2015-11-30 stephen.d return
165 4b33cdd0 2015-11-30 stephen.d }
166 4b33cdd0 2015-11-30 stephen.d
167 4b33cdd0 2015-11-30 stephen.d select {
168 4b33cdd0 2015-11-30 stephen.d case <-t.ctx.Done():
169 4b33cdd0 2015-11-30 stephen.d log.Println("ctx done")
170 4b33cdd0 2015-11-30 stephen.d return
171 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
172 4b33cdd0 2015-11-30 stephen.d log.Println("transport closed")
173 4b33cdd0 2015-11-30 stephen.d return
174 4b33cdd0 2015-11-30 stephen.d case responses <- fcall:
175 4b33cdd0 2015-11-30 stephen.d }
176 4b33cdd0 2015-11-30 stephen.d }
177 4b33cdd0 2015-11-30 stephen.d }()
178 4b33cdd0 2015-11-30 stephen.d
179 4b33cdd0 2015-11-30 stephen.d for {
180 4b33cdd0 2015-11-30 stephen.d select {
181 4b33cdd0 2015-11-30 stephen.d case req := <-t.requests:
182 bd8041e2 2016-05-25 w selected, err := allocateTag(req, outstanding)
183 bd8041e2 2016-05-25 w if err != nil {
184 bd8041e2 2016-05-25 w req.err <- err
185 bd8041e2 2016-05-25 w continue
186 bd8041e2 2016-05-25 w }
187 4b33cdd0 2015-11-30 stephen.d
188 bd8041e2 2016-05-25 w outstanding[selected] = req
189 bd8041e2 2016-05-25 w fcall := newFcall(selected, req.message)
190 bd8041e2 2016-05-25 w
191 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): Consider the case of requests that never
192 4b33cdd0 2015-11-30 stephen.d // receive a response. We need to remove the fcall context from
193 4b33cdd0 2015-11-30 stephen.d // the tag map and dealloc the tag. We may also want to send a
194 4b33cdd0 2015-11-30 stephen.d // flush for the tag.
195 4b33cdd0 2015-11-30 stephen.d if err := t.ch.WriteFcall(req.ctx, fcall); err != nil {
196 4b33cdd0 2015-11-30 stephen.d delete(outstanding, fcall.Tag)
197 4b33cdd0 2015-11-30 stephen.d req.err <- err
198 4b33cdd0 2015-11-30 stephen.d }
199 4b33cdd0 2015-11-30 stephen.d case b := <-responses:
200 4b33cdd0 2015-11-30 stephen.d req, ok := outstanding[b.Tag]
201 4b33cdd0 2015-11-30 stephen.d if !ok {
202 f52b8701 2016-05-19 stevvooe // BUG(stevvooe): The exact handling of an unknown tag is
203 f52b8701 2016-05-19 stevvooe // unclear at this point. These may not necessarily fatal to
204 f52b8701 2016-05-19 stevvooe // the session, since they could be messages that the client no
205 f52b8701 2016-05-19 stevvooe // longer cares for. When we figure this out, replace this
206 f52b8701 2016-05-19 stevvooe // panic with something more sensible.
207 f52b8701 2016-05-19 stevvooe panic(fmt.Sprintf("unknown tag received: %v", b))
208 4b33cdd0 2015-11-30 stephen.d }
209 4b33cdd0 2015-11-30 stephen.d
210 4b33cdd0 2015-11-30 stephen.d // BUG(stevvooe): Must detect duplicate tag and ensure that we are
211 4b33cdd0 2015-11-30 stephen.d // waking up the right caller. If a duplicate is received, the
212 4b33cdd0 2015-11-30 stephen.d // entry should not be deleted.
213 4b33cdd0 2015-11-30 stephen.d delete(outstanding, b.Tag)
214 4b33cdd0 2015-11-30 stephen.d
215 4b33cdd0 2015-11-30 stephen.d req.response <- b
216 4b33cdd0 2015-11-30 stephen.d
217 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): Reclaim tag id.
218 4b33cdd0 2015-11-30 stephen.d case <-t.ctx.Done():
219 4b33cdd0 2015-11-30 stephen.d return
220 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
221 4b33cdd0 2015-11-30 stephen.d return
222 4b33cdd0 2015-11-30 stephen.d }
223 4b33cdd0 2015-11-30 stephen.d }
224 4b33cdd0 2015-11-30 stephen.d }
225 4b33cdd0 2015-11-30 stephen.d
226 4b33cdd0 2015-11-30 stephen.d func (t *transport) flush(ctx context.Context, tag Tag) error {
227 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): We need to fire and forget flush messages when a call
228 4b33cdd0 2015-11-30 stephen.d // context gets cancelled.
229 4b33cdd0 2015-11-30 stephen.d panic("not implemented")
230 4b33cdd0 2015-11-30 stephen.d }
231 4b33cdd0 2015-11-30 stephen.d
232 4b33cdd0 2015-11-30 stephen.d func (t *transport) Close() error {
233 4b33cdd0 2015-11-30 stephen.d select {
234 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
235 4b33cdd0 2015-11-30 stephen.d return ErrClosed
236 4b33cdd0 2015-11-30 stephen.d case <-t.ctx.Done():
237 4b33cdd0 2015-11-30 stephen.d return t.ctx.Err()
238 4b33cdd0 2015-11-30 stephen.d default:
239 4b33cdd0 2015-11-30 stephen.d close(t.closed)
240 4b33cdd0 2015-11-30 stephen.d }
241 4b33cdd0 2015-11-30 stephen.d
242 4b33cdd0 2015-11-30 stephen.d return nil
243 4b33cdd0 2015-11-30 stephen.d }