Blame


1 4b33cdd0 2015-11-30 stephen.d package p9p
2 4b33cdd0 2015-11-30 stephen.d
3 4b33cdd0 2015-11-30 stephen.d import (
4 4b33cdd0 2015-11-30 stephen.d "fmt"
5 4b33cdd0 2015-11-30 stephen.d "log"
6 4b33cdd0 2015-11-30 stephen.d "net"
7 4b33cdd0 2015-11-30 stephen.d
8 4b33cdd0 2015-11-30 stephen.d "golang.org/x/net/context"
9 4b33cdd0 2015-11-30 stephen.d )
10 4b33cdd0 2015-11-30 stephen.d
11 4b33cdd0 2015-11-30 stephen.d // roundTripper manages the request and response from the client-side. A
12 4b33cdd0 2015-11-30 stephen.d // roundTripper must abide by similar rules to the http.RoundTripper.
13 4b33cdd0 2015-11-30 stephen.d // Typically, the roundTripper will manage tag assignment and message
14 4b33cdd0 2015-11-30 stephen.d // serialization.
15 4b33cdd0 2015-11-30 stephen.d type roundTripper interface {
16 4b33cdd0 2015-11-30 stephen.d send(ctx context.Context, msg Message) (Message, error)
17 4b33cdd0 2015-11-30 stephen.d }
18 4b33cdd0 2015-11-30 stephen.d
19 4b33cdd0 2015-11-30 stephen.d // transport plays the role of being a client channel manager. It multiplexes
20 4b33cdd0 2015-11-30 stephen.d // function calls onto the wire and dispatches responses to blocking calls to
21 4b33cdd0 2015-11-30 stephen.d // send. On the whole, transport is thread-safe for calling send
22 4b33cdd0 2015-11-30 stephen.d type transport struct {
23 4b33cdd0 2015-11-30 stephen.d ctx context.Context
24 4b33cdd0 2015-11-30 stephen.d ch Channel
25 4b33cdd0 2015-11-30 stephen.d requests chan *fcallRequest
26 4b33cdd0 2015-11-30 stephen.d closed chan struct{}
27 4b33cdd0 2015-11-30 stephen.d
28 4b33cdd0 2015-11-30 stephen.d tags uint16
29 4b33cdd0 2015-11-30 stephen.d }
30 4b33cdd0 2015-11-30 stephen.d
31 4b33cdd0 2015-11-30 stephen.d var _ roundTripper = &transport{}
32 4b33cdd0 2015-11-30 stephen.d
33 4b33cdd0 2015-11-30 stephen.d func newTransport(ctx context.Context, ch *channel) roundTripper {
34 4b33cdd0 2015-11-30 stephen.d t := &transport{
35 4b33cdd0 2015-11-30 stephen.d ctx: ctx,
36 4b33cdd0 2015-11-30 stephen.d ch: ch,
37 4b33cdd0 2015-11-30 stephen.d requests: make(chan *fcallRequest),
38 4b33cdd0 2015-11-30 stephen.d closed: make(chan struct{}),
39 4b33cdd0 2015-11-30 stephen.d }
40 4b33cdd0 2015-11-30 stephen.d
41 4b33cdd0 2015-11-30 stephen.d go t.handle()
42 4b33cdd0 2015-11-30 stephen.d
43 4b33cdd0 2015-11-30 stephen.d return t
44 4b33cdd0 2015-11-30 stephen.d }
45 4b33cdd0 2015-11-30 stephen.d
46 4b33cdd0 2015-11-30 stephen.d // fcallRequest encompasses the request to send a message via fcall.
47 4b33cdd0 2015-11-30 stephen.d type fcallRequest struct {
48 4b33cdd0 2015-11-30 stephen.d ctx context.Context
49 4b33cdd0 2015-11-30 stephen.d message Message
50 4b33cdd0 2015-11-30 stephen.d response chan *Fcall
51 4b33cdd0 2015-11-30 stephen.d err chan error
52 4b33cdd0 2015-11-30 stephen.d }
53 4b33cdd0 2015-11-30 stephen.d
54 4b33cdd0 2015-11-30 stephen.d func newFcallRequest(ctx context.Context, msg Message) *fcallRequest {
55 4b33cdd0 2015-11-30 stephen.d return &fcallRequest{
56 4b33cdd0 2015-11-30 stephen.d ctx: ctx,
57 4b33cdd0 2015-11-30 stephen.d message: msg,
58 4b33cdd0 2015-11-30 stephen.d response: make(chan *Fcall, 1),
59 4b33cdd0 2015-11-30 stephen.d err: make(chan error, 1),
60 4b33cdd0 2015-11-30 stephen.d }
61 4b33cdd0 2015-11-30 stephen.d }
62 4b33cdd0 2015-11-30 stephen.d
63 4b33cdd0 2015-11-30 stephen.d func (t *transport) send(ctx context.Context, msg Message) (Message, error) {
64 4b33cdd0 2015-11-30 stephen.d req := newFcallRequest(ctx, msg)
65 4b33cdd0 2015-11-30 stephen.d
66 4b33cdd0 2015-11-30 stephen.d // dispatch the request.
67 4b33cdd0 2015-11-30 stephen.d select {
68 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
69 4b33cdd0 2015-11-30 stephen.d return nil, ErrClosed
70 4b33cdd0 2015-11-30 stephen.d case <-ctx.Done():
71 4b33cdd0 2015-11-30 stephen.d return nil, ctx.Err()
72 4b33cdd0 2015-11-30 stephen.d case t.requests <- req:
73 4b33cdd0 2015-11-30 stephen.d }
74 4b33cdd0 2015-11-30 stephen.d
75 4b33cdd0 2015-11-30 stephen.d // wait for the response.
76 4b33cdd0 2015-11-30 stephen.d select {
77 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
78 4b33cdd0 2015-11-30 stephen.d return nil, ErrClosed
79 4b33cdd0 2015-11-30 stephen.d case <-ctx.Done():
80 4b33cdd0 2015-11-30 stephen.d return nil, ctx.Err()
81 4b33cdd0 2015-11-30 stephen.d case err := <-req.err:
82 4b33cdd0 2015-11-30 stephen.d return nil, err
83 4b33cdd0 2015-11-30 stephen.d case resp := <-req.response:
84 4b33cdd0 2015-11-30 stephen.d if resp.Type == Rerror {
85 4b33cdd0 2015-11-30 stephen.d // pack the error into something useful
86 4b33cdd0 2015-11-30 stephen.d respmesg, ok := resp.Message.(MessageRerror)
87 4b33cdd0 2015-11-30 stephen.d if !ok {
88 4b33cdd0 2015-11-30 stephen.d return nil, fmt.Errorf("invalid error response: %v", resp)
89 4b33cdd0 2015-11-30 stephen.d }
90 4b33cdd0 2015-11-30 stephen.d
91 4b33cdd0 2015-11-30 stephen.d return nil, respmesg
92 4b33cdd0 2015-11-30 stephen.d }
93 4b33cdd0 2015-11-30 stephen.d
94 4b33cdd0 2015-11-30 stephen.d return resp.Message, nil
95 4b33cdd0 2015-11-30 stephen.d }
96 4b33cdd0 2015-11-30 stephen.d }
97 4b33cdd0 2015-11-30 stephen.d
98 4b33cdd0 2015-11-30 stephen.d // handle takes messages off the wire and wakes up the waiting tag call.
99 4b33cdd0 2015-11-30 stephen.d func (t *transport) handle() {
100 4b33cdd0 2015-11-30 stephen.d defer func() {
101 4b33cdd0 2015-11-30 stephen.d log.Println("exited handle loop")
102 4b33cdd0 2015-11-30 stephen.d t.Close()
103 4b33cdd0 2015-11-30 stephen.d }()
104 4b33cdd0 2015-11-30 stephen.d // the following variable block are protected components owned by this thread.
105 4b33cdd0 2015-11-30 stephen.d var (
106 4b33cdd0 2015-11-30 stephen.d responses = make(chan *Fcall)
107 4b33cdd0 2015-11-30 stephen.d tags Tag
108 4b33cdd0 2015-11-30 stephen.d // outstanding provides a map of tags to outstanding requests.
109 4b33cdd0 2015-11-30 stephen.d outstanding = map[Tag]*fcallRequest{}
110 4b33cdd0 2015-11-30 stephen.d )
111 4b33cdd0 2015-11-30 stephen.d
112 4b33cdd0 2015-11-30 stephen.d // loop to read messages off of the connection
113 4b33cdd0 2015-11-30 stephen.d go func() {
114 4b33cdd0 2015-11-30 stephen.d defer func() {
115 4b33cdd0 2015-11-30 stephen.d log.Println("exited read loop")
116 4b33cdd0 2015-11-30 stephen.d t.Close()
117 4b33cdd0 2015-11-30 stephen.d }()
118 4b33cdd0 2015-11-30 stephen.d loop:
119 4b33cdd0 2015-11-30 stephen.d for {
120 4b33cdd0 2015-11-30 stephen.d fcall := new(Fcall)
121 4b33cdd0 2015-11-30 stephen.d if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
122 4b33cdd0 2015-11-30 stephen.d switch err := err.(type) {
123 4b33cdd0 2015-11-30 stephen.d case net.Error:
124 4b33cdd0 2015-11-30 stephen.d if err.Timeout() || err.Temporary() {
125 4b33cdd0 2015-11-30 stephen.d // BUG(stevvooe): There may be partial reads under
126 4b33cdd0 2015-11-30 stephen.d // timeout errors where this is actually fatal.
127 4b33cdd0 2015-11-30 stephen.d
128 4b33cdd0 2015-11-30 stephen.d // can only retry if we haven't offset the frame.
129 4b33cdd0 2015-11-30 stephen.d continue loop
130 4b33cdd0 2015-11-30 stephen.d }
131 4b33cdd0 2015-11-30 stephen.d }
132 4b33cdd0 2015-11-30 stephen.d
133 4b33cdd0 2015-11-30 stephen.d log.Println("fatal error reading msg:", err)
134 4b33cdd0 2015-11-30 stephen.d t.Close()
135 4b33cdd0 2015-11-30 stephen.d return
136 4b33cdd0 2015-11-30 stephen.d }
137 4b33cdd0 2015-11-30 stephen.d
138 4b33cdd0 2015-11-30 stephen.d select {
139 4b33cdd0 2015-11-30 stephen.d case <-t.ctx.Done():
140 4b33cdd0 2015-11-30 stephen.d log.Println("ctx done")
141 4b33cdd0 2015-11-30 stephen.d return
142 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
143 4b33cdd0 2015-11-30 stephen.d log.Println("transport closed")
144 4b33cdd0 2015-11-30 stephen.d return
145 4b33cdd0 2015-11-30 stephen.d case responses <- fcall:
146 4b33cdd0 2015-11-30 stephen.d }
147 4b33cdd0 2015-11-30 stephen.d }
148 4b33cdd0 2015-11-30 stephen.d }()
149 4b33cdd0 2015-11-30 stephen.d
150 4b33cdd0 2015-11-30 stephen.d for {
151 4b33cdd0 2015-11-30 stephen.d select {
152 4b33cdd0 2015-11-30 stephen.d case req := <-t.requests:
153 4b33cdd0 2015-11-30 stephen.d // BUG(stevvooe): This is an awful tag allocation procedure.
154 4b33cdd0 2015-11-30 stephen.d // Replace this with something that let's us allocate tags and
155 4b33cdd0 2015-11-30 stephen.d // associate data with them, returning to them to a pool when
156 4b33cdd0 2015-11-30 stephen.d // complete. Such a system would provide a lot of information
157 4b33cdd0 2015-11-30 stephen.d // about outstanding requests.
158 4b33cdd0 2015-11-30 stephen.d tags++
159 4b33cdd0 2015-11-30 stephen.d fcall := newFcall(tags, req.message)
160 4b33cdd0 2015-11-30 stephen.d outstanding[fcall.Tag] = req
161 4b33cdd0 2015-11-30 stephen.d
162 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): Consider the case of requests that never
163 4b33cdd0 2015-11-30 stephen.d // receive a response. We need to remove the fcall context from
164 4b33cdd0 2015-11-30 stephen.d // the tag map and dealloc the tag. We may also want to send a
165 4b33cdd0 2015-11-30 stephen.d // flush for the tag.
166 4b33cdd0 2015-11-30 stephen.d if err := t.ch.WriteFcall(req.ctx, fcall); err != nil {
167 4b33cdd0 2015-11-30 stephen.d delete(outstanding, fcall.Tag)
168 4b33cdd0 2015-11-30 stephen.d req.err <- err
169 4b33cdd0 2015-11-30 stephen.d }
170 4b33cdd0 2015-11-30 stephen.d case b := <-responses:
171 4b33cdd0 2015-11-30 stephen.d req, ok := outstanding[b.Tag]
172 4b33cdd0 2015-11-30 stephen.d if !ok {
173 4b33cdd0 2015-11-30 stephen.d panic("unknown tag received")
174 4b33cdd0 2015-11-30 stephen.d }
175 4b33cdd0 2015-11-30 stephen.d
176 4b33cdd0 2015-11-30 stephen.d // BUG(stevvooe): Must detect duplicate tag and ensure that we are
177 4b33cdd0 2015-11-30 stephen.d // waking up the right caller. If a duplicate is received, the
178 4b33cdd0 2015-11-30 stephen.d // entry should not be deleted.
179 4b33cdd0 2015-11-30 stephen.d delete(outstanding, b.Tag)
180 4b33cdd0 2015-11-30 stephen.d
181 4b33cdd0 2015-11-30 stephen.d req.response <- b
182 4b33cdd0 2015-11-30 stephen.d
183 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): Reclaim tag id.
184 4b33cdd0 2015-11-30 stephen.d case <-t.ctx.Done():
185 4b33cdd0 2015-11-30 stephen.d return
186 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
187 4b33cdd0 2015-11-30 stephen.d return
188 4b33cdd0 2015-11-30 stephen.d }
189 4b33cdd0 2015-11-30 stephen.d }
190 4b33cdd0 2015-11-30 stephen.d }
191 4b33cdd0 2015-11-30 stephen.d
192 4b33cdd0 2015-11-30 stephen.d func (t *transport) flush(ctx context.Context, tag Tag) error {
193 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): We need to fire and forget flush messages when a call
194 4b33cdd0 2015-11-30 stephen.d // context gets cancelled.
195 4b33cdd0 2015-11-30 stephen.d panic("not implemented")
196 4b33cdd0 2015-11-30 stephen.d }
197 4b33cdd0 2015-11-30 stephen.d
198 4b33cdd0 2015-11-30 stephen.d func (t *transport) Close() error {
199 4b33cdd0 2015-11-30 stephen.d select {
200 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
201 4b33cdd0 2015-11-30 stephen.d return ErrClosed
202 4b33cdd0 2015-11-30 stephen.d case <-t.ctx.Done():
203 4b33cdd0 2015-11-30 stephen.d return t.ctx.Err()
204 4b33cdd0 2015-11-30 stephen.d default:
205 4b33cdd0 2015-11-30 stephen.d close(t.closed)
206 4b33cdd0 2015-11-30 stephen.d }
207 4b33cdd0 2015-11-30 stephen.d
208 4b33cdd0 2015-11-30 stephen.d return nil
209 4b33cdd0 2015-11-30 stephen.d }