8 "golang.org/x/net/context"
11 // roundTripper manages the request and response from the client-side. A
12 // roundTripper must abide by similar rules to the http.RoundTripper.
13 // Typically, the roundTripper will manage tag assignment and message
15 type roundTripper interface {
16 send(ctx context.Context, msg Message) (Message, error)
19 // transport plays the role of being a client channel manager. It multiplexes
20 // function calls onto the wire and dispatches responses to blocking calls to
21 // send. On the whole, transport is thread-safe for calling send
22 type transport struct {
25 requests chan *fcallRequest
31 var _ roundTripper = &transport{}
33 func newTransport(ctx context.Context, ch *channel) roundTripper {
37 requests: make(chan *fcallRequest),
38 closed: make(chan struct{}),
46 // fcallRequest encompasses the request to send a message via fcall.
47 type fcallRequest struct {
54 func newFcallRequest(ctx context.Context, msg Message) *fcallRequest {
58 response: make(chan *Fcall, 1),
59 err: make(chan error, 1),
63 func (t *transport) send(ctx context.Context, msg Message) (Message, error) {
64 req := newFcallRequest(ctx, msg)
66 // dispatch the request.
72 case t.requests <- req:
75 // wait for the response.
81 case err := <-req.err:
83 case resp := <-req.response:
84 if resp.Type == Rerror {
85 // pack the error into something useful
86 respmesg, ok := resp.Message.(MessageRerror)
88 return nil, fmt.Errorf("invalid error response: %v", resp)
94 return resp.Message, nil
98 // handle takes messages off the wire and wakes up the waiting tag call.
99 func (t *transport) handle() {
101 log.Println("exited handle loop")
104 // the following variable block are protected components owned by this thread.
106 responses = make(chan *Fcall)
108 // outstanding provides a map of tags to outstanding requests.
109 outstanding = map[Tag]*fcallRequest{}
112 // loop to read messages off of the connection
115 log.Println("exited read loop")
121 if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
122 switch err := err.(type) {
124 if err.Timeout() || err.Temporary() {
125 // BUG(stevvooe): There may be partial reads under
126 // timeout errors where this is actually fatal.
128 // can only retry if we haven't offset the frame.
133 log.Println("fatal error reading msg:", err)
140 log.Println("ctx done")
143 log.Println("transport closed")
145 case responses <- fcall:
151 log.Println("wait...")
153 case req := <-t.requests:
154 // BUG(stevvooe): This is an awful tag allocation procedure.
155 // Replace this with something that let's us allocate tags and
156 // associate data with them, returning to them to a pool when
157 // complete. Such a system would provide a lot of information
158 // about outstanding requests.
160 fcall := newFcall(tags, req.message)
161 outstanding[fcall.Tag] = req
163 // TODO(stevvooe): Consider the case of requests that never
164 // receive a response. We need to remove the fcall context from
165 // the tag map and dealloc the tag. We may also want to send a
166 // flush for the tag.
167 if err := t.ch.WriteFcall(req.ctx, fcall); err != nil {
168 delete(outstanding, fcall.Tag)
171 case b := <-responses:
172 req, ok := outstanding[b.Tag]
174 panic("unknown tag received")
177 // BUG(stevvooe): Must detect duplicate tag and ensure that we are
178 // waking up the right caller. If a duplicate is received, the
179 // entry should not be deleted.
180 delete(outstanding, b.Tag)
184 // TODO(stevvooe): Reclaim tag id.
193 func (t *transport) flush(ctx context.Context, tag Tag) error {
194 // TODO(stevvooe): We need to fire and forget flush messages when a call
195 // context gets cancelled.
196 panic("not implemented")
199 func (t *transport) Close() error {