1 4b33cdd0 2015-11-30 stephen.d package p9p
3 4b33cdd0 2015-11-30 stephen.d import (
10 529e2b2e 2016-11-14 noreply "context"
13 4b33cdd0 2015-11-30 stephen.d // roundTripper manages the request and response from the client-side. A
14 4b33cdd0 2015-11-30 stephen.d // roundTripper must abide by similar rules to the http.RoundTripper.
15 4b33cdd0 2015-11-30 stephen.d // Typically, the roundTripper will manage tag assignment and message
16 4b33cdd0 2015-11-30 stephen.d // serialization.
17 4b33cdd0 2015-11-30 stephen.d type roundTripper interface {
18 4b33cdd0 2015-11-30 stephen.d send(ctx context.Context, msg Message) (Message, error)
21 4b33cdd0 2015-11-30 stephen.d // transport plays the role of being a client channel manager. It multiplexes
22 4b33cdd0 2015-11-30 stephen.d // function calls onto the wire and dispatches responses to blocking calls to
23 4b33cdd0 2015-11-30 stephen.d // send. On the whole, transport is thread-safe for calling send
24 4b33cdd0 2015-11-30 stephen.d type transport struct {
25 4b33cdd0 2015-11-30 stephen.d ctx context.Context
26 4b33cdd0 2015-11-30 stephen.d ch Channel
27 4b33cdd0 2015-11-30 stephen.d requests chan *fcallRequest
29 b9021eb7 2016-09-19 noreply shutdown chan struct{}
30 b9021eb7 2016-09-19 noreply once sync.Once // protect closure of shutdown
31 4b33cdd0 2015-11-30 stephen.d closed chan struct{}
33 4b33cdd0 2015-11-30 stephen.d tags uint16
36 4b33cdd0 2015-11-30 stephen.d var _ roundTripper = &transport{}
38 20882a7b 2016-05-25 stevvooe func newTransport(ctx context.Context, ch Channel) roundTripper {
39 4b33cdd0 2015-11-30 stephen.d t := &transport{
40 4b33cdd0 2015-11-30 stephen.d ctx: ctx,
41 4b33cdd0 2015-11-30 stephen.d ch: ch,
42 4b33cdd0 2015-11-30 stephen.d requests: make(chan *fcallRequest),
43 b9021eb7 2016-09-19 noreply shutdown: make(chan struct{}),
44 4b33cdd0 2015-11-30 stephen.d closed: make(chan struct{}),
47 4b33cdd0 2015-11-30 stephen.d go t.handle()
49 4b33cdd0 2015-11-30 stephen.d return t
52 4b33cdd0 2015-11-30 stephen.d // fcallRequest encompasses the request to send a message via fcall.
53 4b33cdd0 2015-11-30 stephen.d type fcallRequest struct {
54 4b33cdd0 2015-11-30 stephen.d ctx context.Context
55 4b33cdd0 2015-11-30 stephen.d message Message
56 4b33cdd0 2015-11-30 stephen.d response chan *Fcall
57 4b33cdd0 2015-11-30 stephen.d err chan error
60 4b33cdd0 2015-11-30 stephen.d func newFcallRequest(ctx context.Context, msg Message) *fcallRequest {
61 4b33cdd0 2015-11-30 stephen.d return &fcallRequest{
62 4b33cdd0 2015-11-30 stephen.d ctx: ctx,
63 4b33cdd0 2015-11-30 stephen.d message: msg,
64 4b33cdd0 2015-11-30 stephen.d response: make(chan *Fcall, 1),
65 4b33cdd0 2015-11-30 stephen.d err: make(chan error, 1),
69 4b33cdd0 2015-11-30 stephen.d func (t *transport) send(ctx context.Context, msg Message) (Message, error) {
70 4b33cdd0 2015-11-30 stephen.d req := newFcallRequest(ctx, msg)
72 4b33cdd0 2015-11-30 stephen.d // dispatch the request.
73 4b33cdd0 2015-11-30 stephen.d select {
74 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
75 4b33cdd0 2015-11-30 stephen.d return nil, ErrClosed
76 4b33cdd0 2015-11-30 stephen.d case <-ctx.Done():
77 4b33cdd0 2015-11-30 stephen.d return nil, ctx.Err()
78 4b33cdd0 2015-11-30 stephen.d case t.requests <- req:
81 4b33cdd0 2015-11-30 stephen.d // wait for the response.
82 4b33cdd0 2015-11-30 stephen.d select {
83 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
84 4b33cdd0 2015-11-30 stephen.d return nil, ErrClosed
85 4b33cdd0 2015-11-30 stephen.d case <-ctx.Done():
86 4b33cdd0 2015-11-30 stephen.d return nil, ctx.Err()
87 4b33cdd0 2015-11-30 stephen.d case err := <-req.err:
88 4b33cdd0 2015-11-30 stephen.d return nil, err
89 4b33cdd0 2015-11-30 stephen.d case resp := <-req.response:
90 4b33cdd0 2015-11-30 stephen.d if resp.Type == Rerror {
91 4b33cdd0 2015-11-30 stephen.d // pack the error into something useful
92 4b33cdd0 2015-11-30 stephen.d respmesg, ok := resp.Message.(MessageRerror)
93 4b33cdd0 2015-11-30 stephen.d if !ok {
94 4b33cdd0 2015-11-30 stephen.d return nil, fmt.Errorf("invalid error response: %v", resp)
97 4b33cdd0 2015-11-30 stephen.d return nil, respmesg
100 4b33cdd0 2015-11-30 stephen.d return resp.Message, nil
104 11b5e5c7 2016-05-25 stevvooe // allocateTag returns a valid tag given a tag pool map. It receives a hint as
105 11b5e5c7 2016-05-25 stevvooe // to where to start the tag search. It returns an error if the allocation is
106 11b5e5c7 2016-05-25 stevvooe // not possible. The provided map must not contain NOTAG as a key.
107 11b5e5c7 2016-05-25 stevvooe func allocateTag(r *fcallRequest, m map[Tag]*fcallRequest, hint Tag) (Tag, error) {
108 11b5e5c7 2016-05-25 stevvooe // The tag pool is depleted if 65535 (0xFFFF) tags are taken.
109 11b5e5c7 2016-05-25 stevvooe if len(m) >= 0xFFFF {
110 11b5e5c7 2016-05-25 stevvooe return 0, errors.New("tag pool depleted")
113 11b5e5c7 2016-05-25 stevvooe // Look for the first tag that doesn't exist in the map and return it.
114 11b5e5c7 2016-05-25 stevvooe for i := 0; i < 0xFFFF; i++ {
116 11b5e5c7 2016-05-25 stevvooe if hint == NOTAG {
117 11b5e5c7 2016-05-25 stevvooe hint = 0
120 11b5e5c7 2016-05-25 stevvooe if _, exists := m[hint]; !exists {
121 11b5e5c7 2016-05-25 stevvooe return hint, nil
125 11b5e5c7 2016-05-25 stevvooe return 0, errors.New("allocateTag: unexpected error")
128 4b33cdd0 2015-11-30 stephen.d // handle takes messages off the wire and wakes up the waiting tag call.
129 4b33cdd0 2015-11-30 stephen.d func (t *transport) handle() {
130 4b33cdd0 2015-11-30 stephen.d defer func() {
131 4b33cdd0 2015-11-30 stephen.d log.Println("exited handle loop")
132 b9021eb7 2016-09-19 noreply close(t.closed)
135 4b33cdd0 2015-11-30 stephen.d // the following variable block are protected components owned by this thread.
137 4b33cdd0 2015-11-30 stephen.d responses = make(chan *Fcall)
138 4b33cdd0 2015-11-30 stephen.d // outstanding provides a map of tags to outstanding requests.
139 4b33cdd0 2015-11-30 stephen.d outstanding = map[Tag]*fcallRequest{}
140 11b5e5c7 2016-05-25 stevvooe selected Tag
143 4b33cdd0 2015-11-30 stephen.d // loop to read messages off of the connection
144 4b33cdd0 2015-11-30 stephen.d go func() {
145 4b33cdd0 2015-11-30 stephen.d defer func() {
146 4b33cdd0 2015-11-30 stephen.d log.Println("exited read loop")
147 b9021eb7 2016-09-19 noreply t.close() // single main loop
151 4b33cdd0 2015-11-30 stephen.d fcall := new(Fcall)
152 4b33cdd0 2015-11-30 stephen.d if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
153 4b33cdd0 2015-11-30 stephen.d switch err := err.(type) {
154 4b33cdd0 2015-11-30 stephen.d case net.Error:
155 4b33cdd0 2015-11-30 stephen.d if err.Timeout() || err.Temporary() {
156 4b33cdd0 2015-11-30 stephen.d // BUG(stevvooe): There may be partial reads under
157 4b33cdd0 2015-11-30 stephen.d // timeout errors where this is actually fatal.
159 4b33cdd0 2015-11-30 stephen.d // can only retry if we haven't offset the frame.
160 4b33cdd0 2015-11-30 stephen.d continue loop
164 4b33cdd0 2015-11-30 stephen.d log.Println("fatal error reading msg:", err)
165 4b33cdd0 2015-11-30 stephen.d return
168 4b33cdd0 2015-11-30 stephen.d select {
169 4b33cdd0 2015-11-30 stephen.d case <-t.ctx.Done():
170 4b33cdd0 2015-11-30 stephen.d return
171 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
172 4b33cdd0 2015-11-30 stephen.d log.Println("transport closed")
173 4b33cdd0 2015-11-30 stephen.d return
174 4b33cdd0 2015-11-30 stephen.d case responses <- fcall:
180 4b33cdd0 2015-11-30 stephen.d select {
181 4b33cdd0 2015-11-30 stephen.d case req := <-t.requests:
182 11b5e5c7 2016-05-25 stevvooe var err error
184 11b5e5c7 2016-05-25 stevvooe selected, err = allocateTag(req, outstanding, selected)
185 11b5e5c7 2016-05-25 stevvooe if err != nil {
186 11b5e5c7 2016-05-25 stevvooe req.err <- err
187 11b5e5c7 2016-05-25 stevvooe continue
190 11b5e5c7 2016-05-25 stevvooe outstanding[selected] = req
191 11b5e5c7 2016-05-25 stevvooe fcall := newFcall(selected, req.message)
193 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): Consider the case of requests that never
194 4b33cdd0 2015-11-30 stephen.d // receive a response. We need to remove the fcall context from
195 4b33cdd0 2015-11-30 stephen.d // the tag map and dealloc the tag. We may also want to send a
196 4b33cdd0 2015-11-30 stephen.d // flush for the tag.
197 4b33cdd0 2015-11-30 stephen.d if err := t.ch.WriteFcall(req.ctx, fcall); err != nil {
198 4b33cdd0 2015-11-30 stephen.d delete(outstanding, fcall.Tag)
199 4b33cdd0 2015-11-30 stephen.d req.err <- err
201 4b33cdd0 2015-11-30 stephen.d case b := <-responses:
202 4b33cdd0 2015-11-30 stephen.d req, ok := outstanding[b.Tag]
203 4b33cdd0 2015-11-30 stephen.d if !ok {
204 f52b8701 2016-05-19 stevvooe // BUG(stevvooe): The exact handling of an unknown tag is
205 f52b8701 2016-05-19 stevvooe // unclear at this point. These may not necessarily fatal to
206 f52b8701 2016-05-19 stevvooe // the session, since they could be messages that the client no
207 f52b8701 2016-05-19 stevvooe // longer cares for. When we figure this out, replace this
208 f52b8701 2016-05-19 stevvooe // panic with something more sensible.
209 f52b8701 2016-05-19 stevvooe panic(fmt.Sprintf("unknown tag received: %v", b))
212 4b33cdd0 2015-11-30 stephen.d // BUG(stevvooe): Must detect duplicate tag and ensure that we are
213 4b33cdd0 2015-11-30 stephen.d // waking up the right caller. If a duplicate is received, the
214 4b33cdd0 2015-11-30 stephen.d // entry should not be deleted.
215 4b33cdd0 2015-11-30 stephen.d delete(outstanding, b.Tag)
217 4b33cdd0 2015-11-30 stephen.d req.response <- b
219 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): Reclaim tag id.
220 b9021eb7 2016-09-19 noreply case <-t.shutdown:
222 4b33cdd0 2015-11-30 stephen.d case <-t.ctx.Done():
223 4b33cdd0 2015-11-30 stephen.d return
228 4b33cdd0 2015-11-30 stephen.d func (t *transport) flush(ctx context.Context, tag Tag) error {
229 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): We need to fire and forget flush messages when a call
230 4b33cdd0 2015-11-30 stephen.d // context gets cancelled.
231 4b33cdd0 2015-11-30 stephen.d panic("not implemented")
234 4b33cdd0 2015-11-30 stephen.d func (t *transport) Close() error {
235 b9021eb7 2016-09-19 noreply t.close()
237 4b33cdd0 2015-11-30 stephen.d select {
238 4b33cdd0 2015-11-30 stephen.d case <-t.closed:
239 b9021eb7 2016-09-19 noreply return nil
240 4b33cdd0 2015-11-30 stephen.d case <-t.ctx.Done():
241 4b33cdd0 2015-11-30 stephen.d return t.ctx.Err()
245 b9021eb7 2016-09-19 noreply // close starts the shutdown process.
246 b9021eb7 2016-09-19 noreply func (t *transport) close() {
247 b9021eb7 2016-09-19 noreply t.once.Do(func() {
248 b9021eb7 2016-09-19 noreply close(t.shutdown)