9 "golang.org/x/net/context"
12 // roundTripper manages the request and response from the client-side. A
13 // roundTripper must abide by similar rules to the http.RoundTripper.
14 // Typically, the roundTripper will manage tag assignment and message
16 type roundTripper interface {
// send delivers msg and returns the resulting response message, or an
// error. ctx is the context under which the call is made.
17 send(ctx context.Context, msg Message) (Message, error)
20 // transport plays the role of being a client channel manager. It multiplexes
21 // function calls onto the wire and dispatches responses to blocking calls to
22 // send. On the whole, transport is thread-safe for calling send.
23 type transport struct {
// requests hands calls over from send, which sends on this channel, to the
// handle loop, which receives from it.
26 requests chan *fcallRequest
// Compile-time check that *transport satisfies the roundTripper interface.
32 var _ roundTripper = &transport{}
// newTransport constructs the value that is returned to callers as a
// roundTripper. NOTE(review): ch is presumably stored as the transport's
// channel (handle reads and writes through t.ch), but that assignment and any
// further setup are not visible here — confirm.
34 func newTransport(ctx context.Context, ch *channel) roundTripper {
// requests is unbuffered, so a send on it blocks until a receiver takes the
// request.
38 requests: make(chan *fcallRequest),
// closed carries no data (chan struct{}); presumably it is closed to signal
// shutdown — confirm against Close.
39 closed: make(chan struct{}),
47 // fcallRequest encompasses the request to send a message via fcall.
// Elsewhere in this file its fields are used as follows: ctx is passed to
// WriteFcall when the request is written, message supplies the payload (and
// the message type checked during tag allocation), and the response and err
// channels are what send waits on for the outcome.
48 type fcallRequest struct {
// newFcallRequest builds the fcallRequest that send uses for a single call.
55 func newFcallRequest(ctx context.Context, msg Message) *fcallRequest {
// Both result channels have capacity 1, so one value can be sent on each
// without a receiver being ready at that moment.
59 response: make(chan *Fcall, 1),
60 err: make(chan error, 1),
// send creates a new fcallRequest for msg, hands it to the handle loop via
// t.requests and then waits on the request's err and response channels.
//
// A response of type Rerror whose Message is not a MessageRerror is reported
// as an "invalid error response" error. For any response that is not an
// Rerror, resp.Message is returned with a nil error.
//
// NOTE(review): the remaining select cases (presumably context cancellation
// and transport shutdown), the handling of a value received on req.err and
// the conversion of a well-formed Rerror are not visible here — confirm.
64 func (t *transport) send(ctx context.Context, msg Message) (Message, error) {
65 req := newFcallRequest(ctx, msg)
67 // dispatch the request.
73 case t.requests <- req:
76 // wait for the response.
82 case err := <-req.err:
84 case resp := <-req.response:
85 if resp.Type == Rerror {
86 // pack the error into something useful
// the type assertion guards against an Rerror fcall whose payload is not
// actually a MessageRerror.
87 respmesg, ok := resp.Message.(MessageRerror)
89 return nil, fmt.Errorf("invalid error response: %v", resp)
95 return resp.Message, nil
// allocateTag chooses a tag for request r that is not currently a key of m,
// the map of tags already in use. The visible code only reads m; handle is
// the one that stores the request under the returned tag.
//
// A Tversion request is checked against NOTAG first and fails with "NOTAG
// already in use" when m already holds NOTAG (presumably NOTAG is returned
// otherwise — that line is not visible here). For other requests the pool
// is checked for depletion, yielding "tag pool depleted", and then tags are
// probed in ascending order from 0 up to, but excluding, NOTAG.
//
// On every visible error path the returned Tag is 0 and must be ignored.
99 func allocateTag(r *fcallRequest, m map[Tag]*fcallRequest) (Tag, error) {
100 // Tversion can only use NOTAG, so check if we're sending a Tversion.
101 if r.message.Type() == Tversion {
102 if _, exists := m[NOTAG]; exists {
103 return 0, errors.New("NOTAG already in use")
108 // The tag pool is depleted if all 65536 tags are taken, or if 65535 tags
109 // are taken and NOTAG is available.
111 return 0, errors.New("tag pool depleted")
112 } else if len(m) == 0xFFFF {
// 0xFFFF entries without NOTAG among them means every regular tag is taken.
113 if _, exists := m[NOTAG]; !exists {
114 return 0, errors.New("tag pool depleted")
118 // Look for the first tag that doesn't exist in the map and return it.
119 for selected := Tag(0); selected < NOTAG; selected++ {
120 if _, exists := m[selected]; !exists {
// Reached only if the scan found no free tag, which the depletion checks
// above are meant to rule out.
125 return 0, errors.New("allocateTag: unexpected error")
128 // handle takes messages off the wire and wakes up the waiting tag call.
// It also receives requests from t.requests, assigns each one a tag via
// allocateTag, records it in outstanding and writes the resulting Fcall to
// t.ch. Incoming fcalls are matched to their request by looking up the tag
// in outstanding.
129 func (t *transport) handle() {
131 log.Println("exited handle loop")
134 // the variables in the following block are protected components owned by this thread.
// responses is unbuffered: the read loop sends each fcall it reads on it,
// and the select below receives from it.
136 responses = make(chan *Fcall)
137 // outstanding provides a map of tags to outstanding requests.
138 outstanding = map[Tag]*fcallRequest{}
141 // loop to read messages off of the connection
144 log.Println("exited read loop")
150 if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
151 switch err := err.(type) {
153 if err.Timeout() || err.Temporary() {
154 // BUG(stevvooe): There may be partial reads under
155 // timeout errors where this is actually fatal.
157 // can only retry if we haven't offset the frame.
162 log.Println("fatal error reading msg:", err)
169 log.Println("ctx done")
172 log.Println("transport closed")
174 case responses <- fcall:
181 case req := <-t.requests:
182 selected, err := allocateTag(req, outstanding)
// register the request under its tag before writing, so that the tag counts
// as in use for subsequent allocations.
188 outstanding[selected] = req
189 fcall := newFcall(selected, req.message)
191 // TODO(stevvooe): Consider the case of requests that never
192 // receive a response. We need to remove the fcall context from
193 // the tag map and dealloc the tag. We may also want to send a
194 // flush for the tag.
195 if err := t.ch.WriteFcall(req.ctx, fcall); err != nil {
// the write failed, so drop the entry; this frees the tag for reuse.
196 delete(outstanding, fcall.Tag)
199 case b := <-responses:
// ok is false when no outstanding request is registered under b.Tag.
200 req, ok := outstanding[b.Tag]
202 // BUG(stevvooe): The exact handling of an unknown tag is
203 // unclear at this point. These may not necessarily be fatal to
204 // the session, since they could be messages that the client no
205 // longer cares for. When we figure this out, replace this
206 // panic with something more sensible.
207 panic(fmt.Sprintf("unknown tag received: %v", b))
210 // BUG(stevvooe): Must detect duplicate tag and ensure that we are
211 // waking up the right caller. If a duplicate is received, the
212 // entry should not be deleted.
213 delete(outstanding, b.Tag)
217 // TODO(stevvooe): Reclaim tag id.
// flush is meant to send a flush message for tag, but it is not implemented
// yet: every call panics with "not implemented" and no error is ever
// returned.
226 func (t *transport) flush(ctx context.Context, tag Tag) error {
227 // TODO(stevvooe): We need to fire and forget flush messages when a call
228 // context gets cancelled.
229 panic("not implemented")
232 func (t *transport) Close() error {