Blob


1 package p9p
3 import (
4 "errors"
5 "fmt"
6 "log"
7 "net"
8 "sync"
10 "context"
11 )
13 // roundTripper manages the request and response from the client-side. A
14 // roundTripper must abide by similar rules to the http.RoundTripper.
15 // Typically, the roundTripper will manage tag assignment and message
16 // serialization.
17 type roundTripper interface {
18 send(ctx context.Context, msg Message) (Message, error)
19 }
21 // transport plays the role of being a client channel manager. It multiplexes
22 // function calls onto the wire and dispatches responses to blocking calls to
23 // send. On the whole, transport is thread-safe for calling send
24 type transport struct {
25 ctx context.Context
26 ch Channel
27 requests chan *fcallRequest
29 shutdown chan struct{}
30 once sync.Once // protect closure of shutdown
31 closed chan struct{}
33 tags uint16
34 }
36 var _ roundTripper = &transport{}
38 func newTransport(ctx context.Context, ch Channel) roundTripper {
39 t := &transport{
40 ctx: ctx,
41 ch: ch,
42 requests: make(chan *fcallRequest),
43 shutdown: make(chan struct{}),
44 closed: make(chan struct{}),
45 }
47 go t.handle()
49 return t
50 }
52 // fcallRequest encompasses the request to send a message via fcall.
53 type fcallRequest struct {
54 ctx context.Context
55 message Message
56 response chan *Fcall
57 err chan error
58 }
60 func newFcallRequest(ctx context.Context, msg Message) *fcallRequest {
61 return &fcallRequest{
62 ctx: ctx,
63 message: msg,
64 response: make(chan *Fcall, 1),
65 err: make(chan error, 1),
66 }
67 }
69 func (t *transport) send(ctx context.Context, msg Message) (Message, error) {
70 req := newFcallRequest(ctx, msg)
72 // dispatch the request.
73 select {
74 case <-t.closed:
75 return nil, ErrClosed
76 case <-ctx.Done():
77 return nil, ctx.Err()
78 case t.requests <- req:
79 }
81 // wait for the response.
82 select {
83 case <-t.closed:
84 return nil, ErrClosed
85 case <-ctx.Done():
86 return nil, ctx.Err()
87 case err := <-req.err:
88 return nil, err
89 case resp := <-req.response:
90 if resp.Type == Rerror {
91 // pack the error into something useful
92 respmesg, ok := resp.Message.(MessageRerror)
93 if !ok {
94 return nil, fmt.Errorf("invalid error response: %v", resp)
95 }
97 return nil, respmesg
98 }
100 return resp.Message, nil
104 // allocateTag returns a valid tag given a tag pool map. It receives a hint as
105 // to where to start the tag search. It returns an error if the allocation is
106 // not possible. The provided map must not contain NOTAG as a key.
107 func allocateTag(r *fcallRequest, m map[Tag]*fcallRequest, hint Tag) (Tag, error) {
108 // The tag pool is depleted if 65535 (0xFFFF) tags are taken.
109 if len(m) >= 0xFFFF {
110 return 0, errors.New("tag pool depleted")
113 // Look for the first tag that doesn't exist in the map and return it.
114 for i := 0; i < 0xFFFF; i++ {
115 hint++
116 if hint == NOTAG {
117 hint = 0
120 if _, exists := m[hint]; !exists {
121 return hint, nil
125 return 0, errors.New("allocateTag: unexpected error")
128 // handle takes messages off the wire and wakes up the waiting tag call.
129 func (t *transport) handle() {
130 defer func() {
131 close(t.closed)
132 }()
134 // the following variable block are protected components owned by this thread.
135 var (
136 responses = make(chan *Fcall)
137 // outstanding provides a map of tags to outstanding requests.
138 outstanding = map[Tag]*fcallRequest{}
139 selected Tag
142 // loop to read messages off of the connection
143 go func() {
144 defer func() {
145 t.close() // single main loop
146 }()
147 loop:
148 for {
149 fcall := new(Fcall)
150 if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
151 switch err := err.(type) {
152 case net.Error:
153 if err.Timeout() || err.Temporary() {
154 // BUG(stevvooe): There may be partial reads under
155 // timeout errors where this is actually fatal.
157 // can only retry if we haven't offset the frame.
158 continue loop
162 log.Println("p9p: fatal error reading msg:", err)
163 return
166 select {
167 case <-t.ctx.Done():
168 return
169 case <-t.closed:
170 return
171 case responses <- fcall:
174 }()
176 for {
177 select {
178 case req := <-t.requests:
179 var err error
181 selected, err = allocateTag(req, outstanding, selected)
182 if err != nil {
183 req.err <- err
184 continue
187 outstanding[selected] = req
188 fcall := newFcall(selected, req.message)
190 // TODO(stevvooe): Consider the case of requests that never
191 // receive a response. We need to remove the fcall context from
192 // the tag map and dealloc the tag. We may also want to send a
193 // flush for the tag.
194 if err := t.ch.WriteFcall(req.ctx, fcall); err != nil {
195 delete(outstanding, fcall.Tag)
196 req.err <- err
198 case b := <-responses:
199 req, ok := outstanding[b.Tag]
200 if !ok {
201 // BUG(stevvooe): The exact handling of an unknown tag is
202 // unclear at this point. These may not necessarily fatal to
203 // the session, since they could be messages that the client no
204 // longer cares for. When we figure this out, replace this
205 // panic with something more sensible.
206 panic(fmt.Sprintf("unknown tag received: %v", b))
209 // BUG(stevvooe): Must detect duplicate tag and ensure that we are
210 // waking up the right caller. If a duplicate is received, the
211 // entry should not be deleted.
212 delete(outstanding, b.Tag)
214 req.response <- b
216 // TODO(stevvooe): Reclaim tag id.
217 case <-t.shutdown:
218 return
219 case <-t.ctx.Done():
220 return
225 func (t *transport) flush(ctx context.Context, tag Tag) error {
226 // TODO(stevvooe): We need to fire and forget flush messages when a call
227 // context gets cancelled.
228 panic("not implemented")
231 func (t *transport) Close() error {
232 t.close()
234 select {
235 case <-t.closed:
236 return nil
237 case <-t.ctx.Done():
238 return t.ctx.Err()
242 // close starts the shutdown process.
243 func (t *transport) close() {
244 t.once.Do(func() {
245 close(t.shutdown)
246 })