Blame


1 4b33cdd0 2015-11-30 stephen.d package p9p
2 4b33cdd0 2015-11-30 stephen.d
3 4b33cdd0 2015-11-30 stephen.d import (
4 4b33cdd0 2015-11-30 stephen.d "fmt"
5 4b33cdd0 2015-11-30 stephen.d "log"
6 4b33cdd0 2015-11-30 stephen.d "net"
7 4f41ff66 2016-08-12 noreply "sync"
8 4b33cdd0 2015-11-30 stephen.d "time"
9 4b33cdd0 2015-11-30 stephen.d
10 529e2b2e 2016-11-14 noreply "context"
11 4b33cdd0 2015-11-30 stephen.d )
12 4b33cdd0 2015-11-30 stephen.d
13 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): Add net/http.Server-like type here to manage connections.
14 4b33cdd0 2015-11-30 stephen.d // Coupled with Handler mux, we can get a very http-like experience for 9p
15 4b33cdd0 2015-11-30 stephen.d // servers.
16 4b33cdd0 2015-11-30 stephen.d
17 4b33cdd0 2015-11-30 stephen.d // ServeConn the 9p handler over the provided network connection.
18 4b33cdd0 2015-11-30 stephen.d func ServeConn(ctx context.Context, cn net.Conn, handler Handler) error {
19 4b33cdd0 2015-11-30 stephen.d
20 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): It would be nice if the handler could declare the
21 4b33cdd0 2015-11-30 stephen.d // supported version. Before we had handler, we used the session to get
22 4b33cdd0 2015-11-30 stephen.d // the version (msize, version := session.Version()). We must decided if
23 4b33cdd0 2015-11-30 stephen.d // we want to proxy version and message size decisions all the back to the
24 4b33cdd0 2015-11-30 stephen.d // origin server or make those decisions at each link of a proxy chain.
25 4b33cdd0 2015-11-30 stephen.d
26 4b33cdd0 2015-11-30 stephen.d ch := newChannel(cn, codec9p{}, DefaultMSize)
27 4b33cdd0 2015-11-30 stephen.d negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
28 4b33cdd0 2015-11-30 stephen.d defer cancel()
29 4b33cdd0 2015-11-30 stephen.d
30 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): For now, we negotiate here. It probably makes sense to
31 4b33cdd0 2015-11-30 stephen.d // do this outside of this function and then pass in a ready made channel.
32 4b33cdd0 2015-11-30 stephen.d // We are not really ready to export the channel type yet.
33 4b33cdd0 2015-11-30 stephen.d
34 4b33cdd0 2015-11-30 stephen.d if err := servernegotiate(negctx, ch, DefaultVersion); err != nil {
35 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): Need better error handling and retry support here.
36 63c84aa9 2016-02-05 stevvooe return fmt.Errorf("error negotiating version: %s", err)
37 4b33cdd0 2015-11-30 stephen.d }
38 4b33cdd0 2015-11-30 stephen.d
39 4b33cdd0 2015-11-30 stephen.d ctx = withVersion(ctx, DefaultVersion)
40 4b33cdd0 2015-11-30 stephen.d
41 4b33cdd0 2015-11-30 stephen.d c := &conn{
42 4b33cdd0 2015-11-30 stephen.d ctx: ctx,
43 4b33cdd0 2015-11-30 stephen.d ch: ch,
44 4b33cdd0 2015-11-30 stephen.d handler: handler,
45 4b33cdd0 2015-11-30 stephen.d closed: make(chan struct{}),
46 4b33cdd0 2015-11-30 stephen.d }
47 4b33cdd0 2015-11-30 stephen.d
48 4b33cdd0 2015-11-30 stephen.d return c.serve()
49 4b33cdd0 2015-11-30 stephen.d }
50 4b33cdd0 2015-11-30 stephen.d
51 4b33cdd0 2015-11-30 stephen.d // conn plays role of session dispatch for handler in a server.
52 4b33cdd0 2015-11-30 stephen.d type conn struct {
53 4b33cdd0 2015-11-30 stephen.d ctx context.Context
54 4b33cdd0 2015-11-30 stephen.d session Session
55 4b33cdd0 2015-11-30 stephen.d ch Channel
56 4b33cdd0 2015-11-30 stephen.d handler Handler
57 4f41ff66 2016-08-12 noreply
58 4f41ff66 2016-08-12 noreply once sync.Once
59 4f41ff66 2016-08-12 noreply closed chan struct{}
60 4f41ff66 2016-08-12 noreply err error // terminal error for the conn
61 4b33cdd0 2015-11-30 stephen.d }
62 4b33cdd0 2015-11-30 stephen.d
63 4b33cdd0 2015-11-30 stephen.d // activeRequest includes information about the active request.
64 4b33cdd0 2015-11-30 stephen.d type activeRequest struct {
65 4b33cdd0 2015-11-30 stephen.d ctx context.Context
66 4b33cdd0 2015-11-30 stephen.d request *Fcall
67 4b33cdd0 2015-11-30 stephen.d cancel context.CancelFunc
68 4b33cdd0 2015-11-30 stephen.d }
69 4b33cdd0 2015-11-30 stephen.d
70 4b33cdd0 2015-11-30 stephen.d // serve messages on the connection until an error is encountered.
71 4b33cdd0 2015-11-30 stephen.d func (c *conn) serve() error {
72 4b33cdd0 2015-11-30 stephen.d tags := map[Tag]*activeRequest{} // active requests
73 4b33cdd0 2015-11-30 stephen.d
74 6b62df47 2015-12-04 stevvooe requests := make(chan *Fcall) // sync, read-limited
75 6b62df47 2015-12-04 stevvooe responses := make(chan *Fcall) // sync, goroutine consumed
76 6b62df47 2015-12-04 stevvooe completed := make(chan *Fcall) // sync, send in goroutine per request
77 4b33cdd0 2015-11-30 stephen.d
78 4b33cdd0 2015-11-30 stephen.d // read loop
79 4b33cdd0 2015-11-30 stephen.d go c.read(requests)
80 4b33cdd0 2015-11-30 stephen.d go c.write(responses)
81 4b33cdd0 2015-11-30 stephen.d
82 4b33cdd0 2015-11-30 stephen.d log.Println("server.run()")
83 4b33cdd0 2015-11-30 stephen.d for {
84 4b33cdd0 2015-11-30 stephen.d select {
85 4b33cdd0 2015-11-30 stephen.d case req := <-requests:
86 4b33cdd0 2015-11-30 stephen.d if _, ok := tags[req.Tag]; ok {
87 4b33cdd0 2015-11-30 stephen.d select {
88 4b33cdd0 2015-11-30 stephen.d case responses <- newErrorFcall(req.Tag, ErrDuptag):
89 4b33cdd0 2015-11-30 stephen.d // Send to responses, bypass tag management.
90 4b33cdd0 2015-11-30 stephen.d case <-c.ctx.Done():
91 4b33cdd0 2015-11-30 stephen.d return c.ctx.Err()
92 4b33cdd0 2015-11-30 stephen.d case <-c.closed:
93 4b33cdd0 2015-11-30 stephen.d return c.err
94 4b33cdd0 2015-11-30 stephen.d }
95 4b33cdd0 2015-11-30 stephen.d continue
96 4b33cdd0 2015-11-30 stephen.d }
97 4b33cdd0 2015-11-30 stephen.d
98 4b33cdd0 2015-11-30 stephen.d switch msg := req.Message.(type) {
99 4b33cdd0 2015-11-30 stephen.d case MessageTflush:
100 4b33cdd0 2015-11-30 stephen.d log.Println("server: flushing message", msg.Oldtag)
101 4b33cdd0 2015-11-30 stephen.d
102 4b33cdd0 2015-11-30 stephen.d var resp *Fcall
103 4b33cdd0 2015-11-30 stephen.d // check if we have actually know about the requested flush
104 4b33cdd0 2015-11-30 stephen.d active, ok := tags[msg.Oldtag]
105 4b33cdd0 2015-11-30 stephen.d if ok {
106 6b62df47 2015-12-04 stevvooe active.cancel() // propagate cancellation to callees
107 6b62df47 2015-12-04 stevvooe delete(tags, msg.Oldtag)
108 4b33cdd0 2015-11-30 stephen.d resp = newFcall(req.Tag, MessageRflush{})
109 4b33cdd0 2015-11-30 stephen.d } else {
110 4b33cdd0 2015-11-30 stephen.d resp = newErrorFcall(req.Tag, ErrUnknownTag)
111 4b33cdd0 2015-11-30 stephen.d }
112 4b33cdd0 2015-11-30 stephen.d
113 4b33cdd0 2015-11-30 stephen.d select {
114 4b33cdd0 2015-11-30 stephen.d case responses <- resp:
115 4b33cdd0 2015-11-30 stephen.d // bypass tag management in completed.
116 4b33cdd0 2015-11-30 stephen.d case <-c.ctx.Done():
117 4b33cdd0 2015-11-30 stephen.d return c.ctx.Err()
118 4b33cdd0 2015-11-30 stephen.d case <-c.closed:
119 4b33cdd0 2015-11-30 stephen.d return c.err
120 4b33cdd0 2015-11-30 stephen.d }
121 4b33cdd0 2015-11-30 stephen.d default:
122 4b33cdd0 2015-11-30 stephen.d // Allows us to session handlers to cancel processing of the fcall
123 4b33cdd0 2015-11-30 stephen.d // through context.
124 4b33cdd0 2015-11-30 stephen.d ctx, cancel := context.WithCancel(c.ctx)
125 4b33cdd0 2015-11-30 stephen.d
126 4b33cdd0 2015-11-30 stephen.d // The contents of these instances are only writable in the main
127 4b33cdd0 2015-11-30 stephen.d // server loop. The value of tag will not change.
128 4b33cdd0 2015-11-30 stephen.d tags[req.Tag] = &activeRequest{
129 4b33cdd0 2015-11-30 stephen.d ctx: ctx,
130 4b33cdd0 2015-11-30 stephen.d request: req,
131 4b33cdd0 2015-11-30 stephen.d cancel: cancel,
132 4b33cdd0 2015-11-30 stephen.d }
133 4b33cdd0 2015-11-30 stephen.d
134 4b33cdd0 2015-11-30 stephen.d go func(ctx context.Context, req *Fcall) {
135 c74282f8 2016-11-16 noreply // TODO(stevvooe): Re-write incoming Treads so that handler
136 c74282f8 2016-11-16 noreply // can always respond with a message of the correct msize.
137 c74282f8 2016-11-16 noreply
138 4b33cdd0 2015-11-30 stephen.d var resp *Fcall
139 4b33cdd0 2015-11-30 stephen.d msg, err := c.handler.Handle(ctx, req.Message)
140 4b33cdd0 2015-11-30 stephen.d if err != nil {
141 4b33cdd0 2015-11-30 stephen.d // all handler errors are forwarded as protocol errors.
142 4b33cdd0 2015-11-30 stephen.d resp = newErrorFcall(req.Tag, err)
143 4b33cdd0 2015-11-30 stephen.d } else {
144 4b33cdd0 2015-11-30 stephen.d resp = newFcall(req.Tag, msg)
145 4b33cdd0 2015-11-30 stephen.d }
146 4b33cdd0 2015-11-30 stephen.d
147 4b33cdd0 2015-11-30 stephen.d select {
148 4b33cdd0 2015-11-30 stephen.d case completed <- resp:
149 4b33cdd0 2015-11-30 stephen.d case <-ctx.Done():
150 4b33cdd0 2015-11-30 stephen.d return
151 4b33cdd0 2015-11-30 stephen.d case <-c.closed:
152 4b33cdd0 2015-11-30 stephen.d return
153 4b33cdd0 2015-11-30 stephen.d }
154 4b33cdd0 2015-11-30 stephen.d }(ctx, req)
155 4b33cdd0 2015-11-30 stephen.d }
156 4b33cdd0 2015-11-30 stephen.d case resp := <-completed:
157 4b33cdd0 2015-11-30 stephen.d // only responses that flip the tag state traverse this section.
158 4b33cdd0 2015-11-30 stephen.d active, ok := tags[resp.Tag]
159 4b33cdd0 2015-11-30 stephen.d if !ok {
160 6b62df47 2015-12-04 stevvooe // The tag is no longer active. Likely a flushed message.
161 6b62df47 2015-12-04 stevvooe continue
162 4b33cdd0 2015-11-30 stephen.d }
163 4b33cdd0 2015-11-30 stephen.d
164 4b33cdd0 2015-11-30 stephen.d select {
165 4b33cdd0 2015-11-30 stephen.d case responses <- resp:
166 4b33cdd0 2015-11-30 stephen.d case <-active.ctx.Done():
167 4b33cdd0 2015-11-30 stephen.d // the context was canceled for some reason, perhaps timeout or
168 4b33cdd0 2015-11-30 stephen.d // due to a flush call. We treat this as a condition where a
169 4b33cdd0 2015-11-30 stephen.d // response should not be sent.
170 4b33cdd0 2015-11-30 stephen.d log.Println("canceled", resp, active.ctx.Err())
171 4b33cdd0 2015-11-30 stephen.d }
172 4b33cdd0 2015-11-30 stephen.d delete(tags, resp.Tag)
173 4b33cdd0 2015-11-30 stephen.d case <-c.ctx.Done():
174 4b33cdd0 2015-11-30 stephen.d return c.ctx.Err()
175 4b33cdd0 2015-11-30 stephen.d case <-c.closed:
176 4b33cdd0 2015-11-30 stephen.d return c.err
177 4b33cdd0 2015-11-30 stephen.d }
178 4b33cdd0 2015-11-30 stephen.d }
179 4b33cdd0 2015-11-30 stephen.d }
180 4b33cdd0 2015-11-30 stephen.d
181 4b33cdd0 2015-11-30 stephen.d // read takes requests off the channel and sends them on requests.
182 4b33cdd0 2015-11-30 stephen.d func (c *conn) read(requests chan *Fcall) {
183 4b33cdd0 2015-11-30 stephen.d for {
184 4b33cdd0 2015-11-30 stephen.d req := new(Fcall)
185 4b33cdd0 2015-11-30 stephen.d if err := c.ch.ReadFcall(c.ctx, req); err != nil {
186 4b33cdd0 2015-11-30 stephen.d if err, ok := err.(net.Error); ok {
187 4b33cdd0 2015-11-30 stephen.d if err.Timeout() || err.Temporary() {
188 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): A full idle timeout on the connection
189 4b33cdd0 2015-11-30 stephen.d // should be enforced here. No logging because it is quite
190 4b33cdd0 2015-11-30 stephen.d // chatty.
191 4b33cdd0 2015-11-30 stephen.d continue
192 4b33cdd0 2015-11-30 stephen.d }
193 4b33cdd0 2015-11-30 stephen.d }
194 4b33cdd0 2015-11-30 stephen.d
195 4b33cdd0 2015-11-30 stephen.d c.CloseWithError(fmt.Errorf("error reading fcall: %v", err))
196 4b33cdd0 2015-11-30 stephen.d return
197 4b33cdd0 2015-11-30 stephen.d }
198 4b33cdd0 2015-11-30 stephen.d
199 4b33cdd0 2015-11-30 stephen.d select {
200 4b33cdd0 2015-11-30 stephen.d case requests <- req:
201 4b33cdd0 2015-11-30 stephen.d case <-c.ctx.Done():
202 4b33cdd0 2015-11-30 stephen.d c.CloseWithError(c.ctx.Err())
203 4b33cdd0 2015-11-30 stephen.d return
204 4b33cdd0 2015-11-30 stephen.d case <-c.closed:
205 4b33cdd0 2015-11-30 stephen.d return
206 4b33cdd0 2015-11-30 stephen.d }
207 4b33cdd0 2015-11-30 stephen.d }
208 4b33cdd0 2015-11-30 stephen.d }
209 4b33cdd0 2015-11-30 stephen.d
210 4b33cdd0 2015-11-30 stephen.d func (c *conn) write(responses chan *Fcall) {
211 4b33cdd0 2015-11-30 stephen.d for {
212 4b33cdd0 2015-11-30 stephen.d select {
213 4b33cdd0 2015-11-30 stephen.d case resp := <-responses:
214 c74282f8 2016-11-16 noreply // TODO(stevvooe): Correctly protect againt overflowing msize from
215 c74282f8 2016-11-16 noreply // handler. This can be done above, in the main message handler
216 c74282f8 2016-11-16 noreply // loop, by adjusting incoming Tread calls to have a Count that
217 c74282f8 2016-11-16 noreply // won't overflow the msize.
218 c74282f8 2016-11-16 noreply
219 4b33cdd0 2015-11-30 stephen.d if err := c.ch.WriteFcall(c.ctx, resp); err != nil {
220 4b33cdd0 2015-11-30 stephen.d if err, ok := err.(net.Error); ok {
221 4b33cdd0 2015-11-30 stephen.d if err.Timeout() || err.Temporary() {
222 4b33cdd0 2015-11-30 stephen.d // TODO(stevvooe): A full idle timeout on the
223 4b33cdd0 2015-11-30 stephen.d // connection should be enforced here. We log here,
224 4b33cdd0 2015-11-30 stephen.d // since this is less common.
225 a0568195 2016-05-23 stevvooe log.Printf("9p server: temporary error writing fcall: %v", err)
226 4b33cdd0 2015-11-30 stephen.d continue
227 4b33cdd0 2015-11-30 stephen.d }
228 4b33cdd0 2015-11-30 stephen.d }
229 4b33cdd0 2015-11-30 stephen.d
230 4b33cdd0 2015-11-30 stephen.d c.CloseWithError(fmt.Errorf("error writing fcall: %v", err))
231 4b33cdd0 2015-11-30 stephen.d return
232 4b33cdd0 2015-11-30 stephen.d }
233 4b33cdd0 2015-11-30 stephen.d case <-c.ctx.Done():
234 4b33cdd0 2015-11-30 stephen.d c.CloseWithError(c.ctx.Err())
235 4b33cdd0 2015-11-30 stephen.d return
236 4b33cdd0 2015-11-30 stephen.d case <-c.closed:
237 4b33cdd0 2015-11-30 stephen.d return
238 4b33cdd0 2015-11-30 stephen.d }
239 4b33cdd0 2015-11-30 stephen.d }
240 4b33cdd0 2015-11-30 stephen.d }
241 4b33cdd0 2015-11-30 stephen.d
242 4b33cdd0 2015-11-30 stephen.d func (c *conn) Close() error {
243 4b33cdd0 2015-11-30 stephen.d return c.CloseWithError(nil)
244 4b33cdd0 2015-11-30 stephen.d }
245 4b33cdd0 2015-11-30 stephen.d
246 4b33cdd0 2015-11-30 stephen.d func (c *conn) CloseWithError(err error) error {
247 4f41ff66 2016-08-12 noreply c.once.Do(func() {
248 4b33cdd0 2015-11-30 stephen.d if err == nil {
249 4f41ff66 2016-08-12 noreply err = ErrClosed
250 4b33cdd0 2015-11-30 stephen.d }
251 4b33cdd0 2015-11-30 stephen.d
252 4f41ff66 2016-08-12 noreply c.err = err
253 4f41ff66 2016-08-12 noreply close(c.closed)
254 4f41ff66 2016-08-12 noreply })
255 4f41ff66 2016-08-12 noreply
256 4f41ff66 2016-08-12 noreply return c.err
257 4b33cdd0 2015-11-30 stephen.d }