Blob


1 package p9p
3 import (
4 "errors"
5 "fmt"
6 "log"
7 "net"
9 "golang.org/x/net/context"
10 )
12 // roundTripper manages the request and response from the client-side. A
13 // roundTripper must abide by similar rules to the http.RoundTripper.
14 // Typically, the roundTripper will manage tag assignment and message
15 // serialization.
16 type roundTripper interface {
17 send(ctx context.Context, msg Message) (Message, error)
18 }
20 // transport plays the role of being a client channel manager. It multiplexes
21 // function calls onto the wire and dispatches responses to blocking calls to
22 // send. On the whole, transport is thread-safe for calling send
23 type transport struct {
24 ctx context.Context
25 ch Channel
26 requests chan *fcallRequest
27 closed chan struct{}
29 tags uint16
30 }
32 var _ roundTripper = &transport{}
34 func newTransport(ctx context.Context, ch *channel) roundTripper {
35 t := &transport{
36 ctx: ctx,
37 ch: ch,
38 requests: make(chan *fcallRequest),
39 closed: make(chan struct{}),
40 }
42 go t.handle()
44 return t
45 }
47 // fcallRequest encompasses the request to send a message via fcall.
48 type fcallRequest struct {
49 ctx context.Context
50 message Message
51 response chan *Fcall
52 err chan error
53 }
55 func newFcallRequest(ctx context.Context, msg Message) *fcallRequest {
56 return &fcallRequest{
57 ctx: ctx,
58 message: msg,
59 response: make(chan *Fcall, 1),
60 err: make(chan error, 1),
61 }
62 }
64 func (t *transport) send(ctx context.Context, msg Message) (Message, error) {
65 req := newFcallRequest(ctx, msg)
67 // dispatch the request.
68 select {
69 case <-t.closed:
70 return nil, ErrClosed
71 case <-ctx.Done():
72 return nil, ctx.Err()
73 case t.requests <- req:
74 }
76 // wait for the response.
77 select {
78 case <-t.closed:
79 return nil, ErrClosed
80 case <-ctx.Done():
81 return nil, ctx.Err()
82 case err := <-req.err:
83 return nil, err
84 case resp := <-req.response:
85 if resp.Type == Rerror {
86 // pack the error into something useful
87 respmesg, ok := resp.Message.(MessageRerror)
88 if !ok {
89 return nil, fmt.Errorf("invalid error response: %v", resp)
90 }
92 return nil, respmesg
93 }
95 return resp.Message, nil
96 }
97 }
99 // allocateTag returns a valid tag given a tag pool map. It receives a hint as
100 // to where to start the tag search. It returns an error if the allocation is
101 // not possible.
102 func allocateTag(r *fcallRequest, m map[Tag]*fcallRequest, hint Tag) (Tag, error) {
103 // Tversion can only use NOTAG, so check if we're sending a Tversion.
104 if r.message.Type() == Tversion {
105 if _, exists := m[NOTAG]; exists {
106 return 0, errors.New("NOTAG already in use")
108 return NOTAG, nil
111 // The tag pool is depleted if all 65536 tags are taken, or if 65535 tags
112 // are taken and NOTAG is available.
113 if len(m) > 0xFFFF {
114 return 0, errors.New("tag pool depleted")
115 } else if len(m) == 0xFFFF {
116 if _, exists := m[NOTAG]; !exists {
117 return 0, errors.New("tag pool depleted")
121 // Look for the first tag that doesn't exist in the map and return it.
122 for i := 0; i < 0xFFFF; i++ {
123 hint++
124 if hint == NOTAG {
125 hint = 0
128 if _, exists := m[hint]; !exists {
129 return hint, nil
133 return 0, errors.New("allocateTag: unexpected error")
136 // handle takes messages off the wire and wakes up the waiting tag call.
137 func (t *transport) handle() {
138 defer func() {
139 log.Println("exited handle loop")
140 t.Close()
141 }()
142 // the following variable block are protected components owned by this thread.
143 var (
144 responses = make(chan *Fcall)
145 // outstanding provides a map of tags to outstanding requests.
146 outstanding = map[Tag]*fcallRequest{}
147 selected Tag
150 // loop to read messages off of the connection
151 go func() {
152 defer func() {
153 log.Println("exited read loop")
154 t.Close()
155 }()
156 loop:
157 for {
158 fcall := new(Fcall)
159 if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
160 switch err := err.(type) {
161 case net.Error:
162 if err.Timeout() || err.Temporary() {
163 // BUG(stevvooe): There may be partial reads under
164 // timeout errors where this is actually fatal.
166 // can only retry if we haven't offset the frame.
167 continue loop
171 log.Println("fatal error reading msg:", err)
172 t.Close()
173 return
176 select {
177 case <-t.ctx.Done():
178 log.Println("ctx done")
179 return
180 case <-t.closed:
181 log.Println("transport closed")
182 return
183 case responses <- fcall:
186 }()
188 for {
189 select {
190 case req := <-t.requests:
191 var err error
193 selected, err = allocateTag(req, outstanding, selected)
194 if err != nil {
195 req.err <- err
196 continue
199 outstanding[selected] = req
200 fcall := newFcall(selected, req.message)
202 // TODO(stevvooe): Consider the case of requests that never
203 // receive a response. We need to remove the fcall context from
204 // the tag map and dealloc the tag. We may also want to send a
205 // flush for the tag.
206 if err := t.ch.WriteFcall(req.ctx, fcall); err != nil {
207 delete(outstanding, fcall.Tag)
208 req.err <- err
210 case b := <-responses:
211 req, ok := outstanding[b.Tag]
212 if !ok {
213 // BUG(stevvooe): The exact handling of an unknown tag is
214 // unclear at this point. These may not necessarily fatal to
215 // the session, since they could be messages that the client no
216 // longer cares for. When we figure this out, replace this
217 // panic with something more sensible.
218 panic(fmt.Sprintf("unknown tag received: %v", b))
221 // BUG(stevvooe): Must detect duplicate tag and ensure that we are
222 // waking up the right caller. If a duplicate is received, the
223 // entry should not be deleted.
224 delete(outstanding, b.Tag)
226 req.response <- b
228 // TODO(stevvooe): Reclaim tag id.
229 case <-t.ctx.Done():
230 return
231 case <-t.closed:
232 return
237 func (t *transport) flush(ctx context.Context, tag Tag) error {
238 // TODO(stevvooe): We need to fire and forget flush messages when a call
239 // context gets cancelled.
240 panic("not implemented")
243 func (t *transport) Close() error {
244 select {
245 case <-t.closed:
246 return ErrClosed
247 case <-t.ctx.Done():
248 return t.ctx.Err()
249 default:
250 close(t.closed)
253 return nil