Blob


1 package p9pnew
3 import (
4 "fmt"
5 "log"
6 "net"
8 "golang.org/x/net/context"
9 )
11 // roundTripper manages the request and response from the client-side. A
12 // roundTripper must abide by many of the rules of http.RoundTripper.
13 // Typically, the roundTripper will manage tag assignment and message
14 // serialization.
15 type roundTripper interface {
16 send(ctx context.Context, fc *Fcall) (*Fcall, error)
17 }
19 // transport plays the role of being a client channel manager. It multiplexes
20 // function calls onto the wire and dispatches responses to blocking calls to
21 // send. On the whole, transport is thread-safe for calling send
22 type transport struct {
23 ctx context.Context
24 ch Channel
25 requests chan *fcallRequest
26 closed chan struct{}
28 tags uint16
29 }
31 func newTransport(ctx context.Context, ch *channel) roundTripper {
32 t := &transport{
33 ctx: ctx,
34 ch: ch,
35 requests: make(chan *fcallRequest),
36 closed: make(chan struct{}),
37 }
39 go t.handle()
41 return t
42 }
44 type fcallRequest struct {
45 ctx context.Context
46 fcall *Fcall
47 response chan *Fcall
48 err chan error
49 }
51 func newFcallRequest(ctx context.Context, fcall *Fcall) *fcallRequest {
52 return &fcallRequest{
53 ctx: ctx,
54 fcall: fcall,
55 response: make(chan *Fcall, 1),
56 err: make(chan error, 1),
57 }
58 }
60 func (t *transport) send(ctx context.Context, fcall *Fcall) (*Fcall, error) {
61 req := newFcallRequest(ctx, fcall)
63 log.Println("dispatch", fcall)
64 // dispatch the request.
65 select {
66 case <-t.closed:
67 return nil, ErrClosed
68 case <-ctx.Done():
69 return nil, ctx.Err()
70 case t.requests <- req:
71 }
73 log.Println("wait", fcall)
74 // wait for the response.
75 select {
76 case <-t.closed:
77 return nil, ErrClosed
78 case <-ctx.Done():
79 return nil, ctx.Err()
80 case err := <-req.err:
81 return nil, err
82 case resp := <-req.response:
83 log.Println("resp", resp)
84 if resp.Type == Rerror {
85 // pack the error into something useful
86 respmesg, ok := resp.Message.(*MessageRerror)
87 if !ok {
88 return nil, fmt.Errorf("invalid error response: %v", resp)
89 }
91 return nil, new9pError(respmesg.Ename)
92 }
94 return resp, nil
95 }
96 }
98 // handle takes messages off the wire and wakes up the waiting tag call.
99 func (t *transport) handle() {
100 defer func() {
101 log.Println("exited handle loop")
102 close(t.closed)
103 }()
104 // the following variable block are protected components owned by this thread.
105 var (
106 responses = make(chan *Fcall)
107 tags Tag
108 // outstanding provides a map of tags to outstanding requests.
109 outstanding = map[Tag]*fcallRequest{}
112 // loop to read messages off of the connection
113 go func() {
114 defer func() {
115 log.Println("exited read loop")
116 close(t.closed)
117 }()
118 loop:
119 for {
120 fcall := new(Fcall)
121 if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
122 switch err := err.(type) {
123 case net.Error:
124 if err.Timeout() || err.Temporary() {
125 // BUG(stevvooe): There may be partial reads under
126 // timeout errors where this is actually fatal.
128 // can only retry if we haven't offset the frame.
129 continue loop
133 log.Println("fatal error reading msg:", err)
134 t.Close()
135 return
138 select {
139 case <-t.ctx.Done():
140 log.Println("ctx done")
141 return
142 case <-t.closed:
143 log.Println("transport closed")
144 return
145 case responses <- fcall:
148 }()
150 for {
151 log.Println("wait...")
152 select {
153 case req := <-t.requests:
154 if req.fcall.Tag == NOTAG {
155 // NOTE(stevvooe): We disallow fcalls with NOTAG to come
156 // through this path since we can't join the tagged response
157 // with the waiting caller. This is typically used for the
158 // Tversion/Rversion round trip to setup a session.
159 //
160 // It may be better to allow these through but block all
161 // requests until a notag message has a response.
163 req.err <- fmt.Errorf("disallowed tag through transport")
164 continue
167 // BUG(stevvooe): This is an awful tag allocation procedure.
168 // Replace this with something that let's us allocate tags and
169 // associate data with them, returning to them to a pool when
170 // complete. Such a system would provide a lot of information
171 // about outstanding requests.
172 tags++
173 req.fcall.Tag = tags
174 outstanding[req.fcall.Tag] = req
176 // TODO(stevvooe): Consider the case of requests that never
177 // receive a response. We need to remove the fcall context from
178 // the tag map and dealloc the tag. We may also want to send a
179 // flush for the tag.
180 if err := t.ch.WriteFcall(req.ctx, req.fcall); err != nil {
181 log.Println("error writing fcall", err, req.fcall)
182 delete(outstanding, req.fcall.Tag)
183 req.err <- err
186 log.Println("sent", req.fcall)
187 case b := <-responses:
188 log.Println("recv", b)
189 req, ok := outstanding[b.Tag]
190 if !ok {
191 panic("unknown tag received")
193 delete(outstanding, req.fcall.Tag)
195 req.response <- b
197 // TODO(stevvooe): Reclaim tag id.
198 case <-t.ctx.Done():
199 return
200 case <-t.closed:
201 return
206 func (t *transport) Close() error {
207 select {
208 case <-t.closed:
209 return ErrClosed
210 case <-t.ctx.Done():
211 return t.ctx.Err()
212 default:
213 close(t.closed)
216 return nil