Blob


1 package p9pnew
3 import (
4 "fmt"
5 "log"
6 "net"
8 "golang.org/x/net/context"
9 )
11 // roundTripper manages the request and response from the client-side. A
12 // roundTripper must abide by many of the rules of http.RoundTripper.
13 // Typically, the roundTripper will manage tag assignment and message
14 // serialization.
15 type roundTripper interface {
16 send(ctx context.Context, fc *Fcall) (*Fcall, error)
17 }
19 type transport struct {
20 ctx context.Context
21 ch *channel
22 requests chan *fcallRequest
23 closed chan struct{}
25 tags uint16
26 }
28 func newTransport(ctx context.Context, ch *channel) roundTripper {
29 t := &transport{
30 ctx: ctx,
31 ch: ch,
32 requests: make(chan *fcallRequest),
33 closed: make(chan struct{}),
34 }
36 go t.handle()
38 return t
39 }
41 type fcallRequest struct {
42 ctx context.Context
43 fcall *Fcall
44 response chan *Fcall
45 err chan error
46 }
48 func newFcallRequest(ctx context.Context, fcall *Fcall) *fcallRequest {
49 return &fcallRequest{
50 ctx: ctx,
51 fcall: fcall,
52 response: make(chan *Fcall, 1),
53 err: make(chan error, 1),
54 }
55 }
57 func (t *transport) send(ctx context.Context, fcall *Fcall) (*Fcall, error) {
58 req := newFcallRequest(ctx, fcall)
60 log.Println("dispatch", fcall)
61 // dispatch the request.
62 select {
63 case <-t.closed:
64 return nil, ErrClosed
65 case <-ctx.Done():
66 return nil, ctx.Err()
67 case t.requests <- req:
68 }
70 log.Println("wait", fcall)
71 // wait for the response.
72 select {
73 case <-t.closed:
74 return nil, ErrClosed
75 case <-ctx.Done():
76 return nil, ctx.Err()
77 case err := <-req.err:
78 return nil, err
79 case resp := <-req.response:
80 log.Println("resp", resp)
81 if resp.Type == Rerror {
82 // pack the error into something useful
83 respmesg, ok := resp.Message.(*MessageRerror)
84 if !ok {
85 return nil, fmt.Errorf("invalid error response: %v", resp)
86 }
88 return nil, new9pError(respmesg.Ename)
89 }
91 return resp, nil
92 }
93 }
95 // handle takes messages off the wire and wakes up the waiting tag call.
96 func (t *transport) handle() {
97 defer func() {
98 log.Println("exited handle loop")
99 close(t.closed)
100 }()
101 // the following variable block are protected components owned by this thread.
102 var (
103 responses = make(chan *Fcall)
104 tags Tag
105 // outstanding provides a map of tags to outstanding requests.
106 outstanding = map[Tag]*fcallRequest{}
109 // loop to read messages off of the connection
110 go func() {
111 defer func() {
112 log.Println("exited read loop")
113 close(t.closed)
114 }()
115 loop:
116 for {
117 fcall := new(Fcall)
118 if err := t.ch.readFcall(t.ctx, fcall); err != nil {
119 switch err := err.(type) {
120 case net.Error:
121 if err.Timeout() || err.Temporary() {
122 // BUG(stevvooe): There may be partial reads under
123 // timeout errors where this is actually fatal.
125 // can only retry if we haven't offset the frame.
126 continue loop
130 log.Println("fatal error reading msg:", err)
131 t.Close()
132 return
135 select {
136 case <-t.ctx.Done():
137 log.Println("ctx done")
138 return
139 case <-t.closed:
140 log.Println("transport closed")
141 return
142 case responses <- fcall:
145 }()
147 for {
148 log.Println("wait...")
149 select {
150 case req := <-t.requests:
151 if req.fcall.Tag == NOTAG {
152 // NOTE(stevvooe): We disallow fcalls with NOTAG to come
153 // through this path since we can't join the tagged response
154 // with the waiting caller. This is typically used for the
155 // Tversion/Rversion round trip to setup a session.
156 //
157 // It may be better to allow these through but block all
158 // requests until a notag message has a response.
160 req.err <- fmt.Errorf("disallowed tag through transport")
161 continue
164 // BUG(stevvooe): This is an awful tag allocation procedure.
165 // Replace this with something that let's us allocate tags and
166 // associate data with them, returning to them to a pool when
167 // complete. Such a system would provide a lot of information
168 // about outstanding requests.
169 tags++
170 req.fcall.Tag = tags
171 outstanding[req.fcall.Tag] = req
173 // TODO(stevvooe): Consider the case of requests that never
174 // receive a response. We need to remove the fcall context from
175 // the tag map and dealloc the tag. We may also want to send a
176 // flush for the tag.
177 if err := t.ch.writeFcall(req.ctx, req.fcall); err != nil {
178 log.Println("error writing fcall", err, req.fcall)
179 delete(outstanding, req.fcall.Tag)
180 req.err <- err
183 log.Println("sent", req.fcall)
184 case b := <-responses:
185 log.Println("recv", b)
186 req, ok := outstanding[b.Tag]
187 if !ok {
188 panic("unknown tag received")
190 delete(outstanding, req.fcall.Tag)
192 req.response <- b
194 // TODO(stevvooe): Reclaim tag id.
195 case <-t.ctx.Done():
196 return
197 case <-t.closed:
198 return
203 func (t *transport) Close() error {
204 select {
205 case <-t.closed:
206 return ErrClosed
207 case <-t.ctx.Done():
208 return t.ctx.Err()
209 default:
210 close(t.closed)
213 return nil