Blame


1 1f1a1b98 2015-11-30 stephen.d package p9p
2 499f8c59 2015-10-27 stephen.d
3 96ad2b22 2015-10-28 stephen.d import (
4 1f1a1b98 2015-11-30 stephen.d "fmt"
5 96ad2b22 2015-10-28 stephen.d "log"
6 96ad2b22 2015-10-28 stephen.d "net"
7 40d4a02d 2015-11-03 stephen.d "time"
8 499f8c59 2015-10-27 stephen.d
9 96ad2b22 2015-10-28 stephen.d "golang.org/x/net/context"
10 96ad2b22 2015-10-28 stephen.d )
11 96ad2b22 2015-10-28 stephen.d
12 1f1a1b98 2015-11-30 stephen.d // TODO(stevvooe): Add net/http.Server-like type here to manage connections.
13 1f1a1b98 2015-11-30 stephen.d // Coupled with Handler mux, we can get a very http-like experience for 9p
14 1f1a1b98 2015-11-30 stephen.d // servers.
15 b714e9e0 2015-11-05 stephen.d
16 1f1a1b98 2015-11-30 stephen.d // ServeConn the 9p handler over the provided network connection.
17 1f1a1b98 2015-11-30 stephen.d func ServeConn(ctx context.Context, cn net.Conn, handler Handler) error {
18 1f1a1b98 2015-11-30 stephen.d
19 b714e9e0 2015-11-05 stephen.d // TODO(stevvooe): It would be nice if the handler could declare the
20 b714e9e0 2015-11-05 stephen.d // supported version. Before we had handler, we used the session to get
21 b714e9e0 2015-11-05 stephen.d // the version (msize, version := session.Version()). We must decided if
22 b714e9e0 2015-11-05 stephen.d // we want to proxy version and message size decisions all the back to the
23 b714e9e0 2015-11-05 stephen.d // origin server or make those decisions at each link of a proxy chain.
24 b714e9e0 2015-11-05 stephen.d
25 1f1a1b98 2015-11-30 stephen.d ch := newChannel(cn, codec9p{}, DefaultMSize)
26 40d4a02d 2015-11-03 stephen.d negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
27 40d4a02d 2015-11-03 stephen.d defer cancel()
28 40d4a02d 2015-11-03 stephen.d
29 40d4a02d 2015-11-03 stephen.d // TODO(stevvooe): For now, we negotiate here. It probably makes sense to
30 40d4a02d 2015-11-03 stephen.d // do this outside of this function and then pass in a ready made channel.
31 40d4a02d 2015-11-03 stephen.d // We are not really ready to export the channel type yet.
32 40d4a02d 2015-11-03 stephen.d
33 b714e9e0 2015-11-05 stephen.d if err := servernegotiate(negctx, ch, DefaultVersion); err != nil {
34 40d4a02d 2015-11-03 stephen.d // TODO(stevvooe): Need better error handling and retry support here.
35 1f1a1b98 2015-11-30 stephen.d return fmt.Errorf("error negotiating version:", err)
36 40d4a02d 2015-11-03 stephen.d }
37 40d4a02d 2015-11-03 stephen.d
38 b714e9e0 2015-11-05 stephen.d ctx = withVersion(ctx, DefaultVersion)
39 dce371f6 2015-11-05 stephen.d
40 1f1a1b98 2015-11-30 stephen.d c := &conn{
41 e6bcde66 2015-10-29 stephen.d ctx: ctx,
42 40d4a02d 2015-11-03 stephen.d ch: ch,
43 b714e9e0 2015-11-05 stephen.d handler: handler,
44 40d4a02d 2015-11-03 stephen.d closed: make(chan struct{}),
45 e6bcde66 2015-10-29 stephen.d }
46 e6bcde66 2015-10-29 stephen.d
47 1f1a1b98 2015-11-30 stephen.d return c.serve()
48 499f8c59 2015-10-27 stephen.d }
49 96ad2b22 2015-10-28 stephen.d
50 1f1a1b98 2015-11-30 stephen.d // conn plays role of session dispatch for handler in a server.
51 1f1a1b98 2015-11-30 stephen.d type conn struct {
52 96ad2b22 2015-10-28 stephen.d ctx context.Context
53 96ad2b22 2015-10-28 stephen.d session Session
54 3bf22e58 2015-11-04 stephen.d ch Channel
55 b714e9e0 2015-11-05 stephen.d handler Handler
56 e6bcde66 2015-10-29 stephen.d closed chan struct{}
57 1f1a1b98 2015-11-30 stephen.d err error // terminal error for the conn
58 96ad2b22 2015-10-28 stephen.d }
59 96ad2b22 2015-10-28 stephen.d
60 de4c554d 2015-11-11 stephen.d // activeRequest includes information about the active request.
61 de4c554d 2015-11-11 stephen.d type activeRequest struct {
62 de4c554d 2015-11-11 stephen.d ctx context.Context
63 de4c554d 2015-11-11 stephen.d request *Fcall
64 de4c554d 2015-11-11 stephen.d cancel context.CancelFunc
65 40d4a02d 2015-11-03 stephen.d }
66 40d4a02d 2015-11-03 stephen.d
67 1f1a1b98 2015-11-30 stephen.d // serve messages on the connection until an error is encountered.
68 1f1a1b98 2015-11-30 stephen.d func (c *conn) serve() error {
69 de4c554d 2015-11-11 stephen.d tags := map[Tag]*activeRequest{} // active requests
70 96ad2b22 2015-10-28 stephen.d
71 de4c554d 2015-11-11 stephen.d requests := make(chan *Fcall) // sync, read-limited
72 de4c554d 2015-11-11 stephen.d responses := make(chan *Fcall)
73 de4c554d 2015-11-11 stephen.d completed := make(chan *Fcall, 1)
74 e6bcde66 2015-10-29 stephen.d
75 de4c554d 2015-11-11 stephen.d // read loop
76 1f1a1b98 2015-11-30 stephen.d go c.read(requests)
77 1f1a1b98 2015-11-30 stephen.d go c.write(responses)
78 de4c554d 2015-11-11 stephen.d
79 de4c554d 2015-11-11 stephen.d log.Println("server.run()")
80 de4c554d 2015-11-11 stephen.d for {
81 de4c554d 2015-11-11 stephen.d select {
82 de4c554d 2015-11-11 stephen.d case req := <-requests:
83 de4c554d 2015-11-11 stephen.d if _, ok := tags[req.Tag]; ok {
84 de4c554d 2015-11-11 stephen.d select {
85 de4c554d 2015-11-11 stephen.d case responses <- newErrorFcall(req.Tag, ErrDuptag):
86 de4c554d 2015-11-11 stephen.d // Send to responses, bypass tag management.
87 1f1a1b98 2015-11-30 stephen.d case <-c.ctx.Done():
88 1f1a1b98 2015-11-30 stephen.d return c.ctx.Err()
89 1f1a1b98 2015-11-30 stephen.d case <-c.closed:
90 1f1a1b98 2015-11-30 stephen.d return c.err
91 de4c554d 2015-11-11 stephen.d }
92 de4c554d 2015-11-11 stephen.d continue
93 de4c554d 2015-11-11 stephen.d }
94 e6bcde66 2015-10-29 stephen.d
95 de4c554d 2015-11-11 stephen.d switch msg := req.Message.(type) {
96 de4c554d 2015-11-11 stephen.d case MessageTflush:
97 de4c554d 2015-11-11 stephen.d log.Println("server: flushing message", msg.Oldtag)
98 e6bcde66 2015-10-29 stephen.d
99 de4c554d 2015-11-11 stephen.d var resp *Fcall
100 de4c554d 2015-11-11 stephen.d // check if we have actually know about the requested flush
101 de4c554d 2015-11-11 stephen.d active, ok := tags[msg.Oldtag]
102 de4c554d 2015-11-11 stephen.d if ok {
103 de4c554d 2015-11-11 stephen.d active.cancel() // cancel the context of oldtag
104 de4c554d 2015-11-11 stephen.d resp = newFcall(req.Tag, MessageRflush{})
105 de4c554d 2015-11-11 stephen.d } else {
106 de4c554d 2015-11-11 stephen.d resp = newErrorFcall(req.Tag, ErrUnknownTag)
107 de4c554d 2015-11-11 stephen.d }
108 e6bcde66 2015-10-29 stephen.d
109 de4c554d 2015-11-11 stephen.d select {
110 de4c554d 2015-11-11 stephen.d case responses <- resp:
111 de4c554d 2015-11-11 stephen.d // bypass tag management in completed.
112 1f1a1b98 2015-11-30 stephen.d case <-c.ctx.Done():
113 1f1a1b98 2015-11-30 stephen.d return c.ctx.Err()
114 1f1a1b98 2015-11-30 stephen.d case <-c.closed:
115 1f1a1b98 2015-11-30 stephen.d return c.err
116 de4c554d 2015-11-11 stephen.d }
117 de4c554d 2015-11-11 stephen.d default:
118 de4c554d 2015-11-11 stephen.d // Allows us to session handlers to cancel processing of the fcall
119 de4c554d 2015-11-11 stephen.d // through context.
120 1f1a1b98 2015-11-30 stephen.d ctx, cancel := context.WithCancel(c.ctx)
121 97423e8b 2015-10-29 stephen.d
122 de4c554d 2015-11-11 stephen.d // The contents of these instances are only writable in the main
123 de4c554d 2015-11-11 stephen.d // server loop. The value of tag will not change.
124 de4c554d 2015-11-11 stephen.d tags[req.Tag] = &activeRequest{
125 de4c554d 2015-11-11 stephen.d ctx: ctx,
126 de4c554d 2015-11-11 stephen.d request: req,
127 de4c554d 2015-11-11 stephen.d cancel: cancel,
128 de4c554d 2015-11-11 stephen.d }
129 97423e8b 2015-10-29 stephen.d
130 de4c554d 2015-11-11 stephen.d go func(ctx context.Context, req *Fcall) {
131 de4c554d 2015-11-11 stephen.d var resp *Fcall
132 1f1a1b98 2015-11-30 stephen.d msg, err := c.handler.Handle(ctx, req.Message)
133 de4c554d 2015-11-11 stephen.d if err != nil {
134 de4c554d 2015-11-11 stephen.d // all handler errors are forwarded as protocol errors.
135 de4c554d 2015-11-11 stephen.d resp = newErrorFcall(req.Tag, err)
136 de4c554d 2015-11-11 stephen.d } else {
137 de4c554d 2015-11-11 stephen.d resp = newFcall(req.Tag, msg)
138 de4c554d 2015-11-11 stephen.d }
139 97423e8b 2015-10-29 stephen.d
140 de4c554d 2015-11-11 stephen.d select {
141 de4c554d 2015-11-11 stephen.d case completed <- resp:
142 de4c554d 2015-11-11 stephen.d case <-ctx.Done():
143 de4c554d 2015-11-11 stephen.d return
144 1f1a1b98 2015-11-30 stephen.d case <-c.closed:
145 de4c554d 2015-11-11 stephen.d return
146 de4c554d 2015-11-11 stephen.d }
147 de4c554d 2015-11-11 stephen.d }(ctx, req)
148 40d4a02d 2015-11-03 stephen.d }
149 de4c554d 2015-11-11 stephen.d case resp := <-completed:
150 de4c554d 2015-11-11 stephen.d // only responses that flip the tag state traverse this section.
151 de4c554d 2015-11-11 stephen.d active, ok := tags[resp.Tag]
152 de4c554d 2015-11-11 stephen.d if !ok {
153 de4c554d 2015-11-11 stephen.d panic("BUG: unbalanced tag")
154 de4c554d 2015-11-11 stephen.d }
155 97423e8b 2015-10-29 stephen.d
156 de4c554d 2015-11-11 stephen.d select {
157 de4c554d 2015-11-11 stephen.d case responses <- resp:
158 de4c554d 2015-11-11 stephen.d case <-active.ctx.Done():
159 de4c554d 2015-11-11 stephen.d // the context was canceled for some reason, perhaps timeout or
160 de4c554d 2015-11-11 stephen.d // due to a flush call. We treat this as a condition where a
161 de4c554d 2015-11-11 stephen.d // response should not be sent.
162 de4c554d 2015-11-11 stephen.d log.Println("canceled", resp, active.ctx.Err())
163 de4c554d 2015-11-11 stephen.d }
164 de4c554d 2015-11-11 stephen.d delete(tags, resp.Tag)
165 1f1a1b98 2015-11-30 stephen.d case <-c.ctx.Done():
166 1f1a1b98 2015-11-30 stephen.d return c.ctx.Err()
167 1f1a1b98 2015-11-30 stephen.d case <-c.closed:
168 1f1a1b98 2015-11-30 stephen.d return c.err
169 1f1a1b98 2015-11-30 stephen.d }
170 1f1a1b98 2015-11-30 stephen.d }
171 1f1a1b98 2015-11-30 stephen.d }
172 1f1a1b98 2015-11-30 stephen.d
173 1f1a1b98 2015-11-30 stephen.d // read takes requests off the channel and sends them on requests.
174 1f1a1b98 2015-11-30 stephen.d func (c *conn) read(requests chan *Fcall) {
175 1f1a1b98 2015-11-30 stephen.d for {
176 1f1a1b98 2015-11-30 stephen.d req := new(Fcall)
177 1f1a1b98 2015-11-30 stephen.d if err := c.ch.ReadFcall(c.ctx, req); err != nil {
178 1f1a1b98 2015-11-30 stephen.d if err, ok := err.(net.Error); ok {
179 1f1a1b98 2015-11-30 stephen.d if err.Timeout() || err.Temporary() {
180 1f1a1b98 2015-11-30 stephen.d // TODO(stevvooe): A full idle timeout on the connection
181 1f1a1b98 2015-11-30 stephen.d // should be enforced here. No logging because it is quite
182 1f1a1b98 2015-11-30 stephen.d // chatty.
183 1f1a1b98 2015-11-30 stephen.d continue
184 1f1a1b98 2015-11-30 stephen.d }
185 1f1a1b98 2015-11-30 stephen.d }
186 1f1a1b98 2015-11-30 stephen.d
187 1f1a1b98 2015-11-30 stephen.d c.CloseWithError(fmt.Errorf("error reading fcall: %v", err))
188 de4c554d 2015-11-11 stephen.d return
189 1f1a1b98 2015-11-30 stephen.d }
190 1f1a1b98 2015-11-30 stephen.d
191 1f1a1b98 2015-11-30 stephen.d select {
192 1f1a1b98 2015-11-30 stephen.d case requests <- req:
193 1f1a1b98 2015-11-30 stephen.d case <-c.ctx.Done():
194 1f1a1b98 2015-11-30 stephen.d c.CloseWithError(c.ctx.Err())
195 de4c554d 2015-11-11 stephen.d return
196 1f1a1b98 2015-11-30 stephen.d case <-c.closed:
197 1f1a1b98 2015-11-30 stephen.d return
198 de4c554d 2015-11-11 stephen.d }
199 96ad2b22 2015-10-28 stephen.d }
200 96ad2b22 2015-10-28 stephen.d }
201 1f1a1b98 2015-11-30 stephen.d
202 1f1a1b98 2015-11-30 stephen.d func (c *conn) write(responses chan *Fcall) {
203 1f1a1b98 2015-11-30 stephen.d for {
204 1f1a1b98 2015-11-30 stephen.d select {
205 1f1a1b98 2015-11-30 stephen.d case resp := <-responses:
206 1f1a1b98 2015-11-30 stephen.d if err := c.ch.WriteFcall(c.ctx, resp); err != nil {
207 1f1a1b98 2015-11-30 stephen.d if err, ok := err.(net.Error); ok {
208 1f1a1b98 2015-11-30 stephen.d if err.Timeout() || err.Temporary() {
209 1f1a1b98 2015-11-30 stephen.d // TODO(stevvooe): A full idle timeout on the
210 1f1a1b98 2015-11-30 stephen.d // connection should be enforced here. We log here,
211 1f1a1b98 2015-11-30 stephen.d // since this is less common.
212 1f1a1b98 2015-11-30 stephen.d log.Println("9p server: temporary error writing fcall: %v", err)
213 1f1a1b98 2015-11-30 stephen.d continue
214 1f1a1b98 2015-11-30 stephen.d }
215 1f1a1b98 2015-11-30 stephen.d }
216 1f1a1b98 2015-11-30 stephen.d
217 1f1a1b98 2015-11-30 stephen.d c.CloseWithError(fmt.Errorf("error writing fcall: %v", err))
218 1f1a1b98 2015-11-30 stephen.d return
219 1f1a1b98 2015-11-30 stephen.d }
220 1f1a1b98 2015-11-30 stephen.d case <-c.ctx.Done():
221 1f1a1b98 2015-11-30 stephen.d c.CloseWithError(c.ctx.Err())
222 1f1a1b98 2015-11-30 stephen.d return
223 1f1a1b98 2015-11-30 stephen.d case <-c.closed:
224 1f1a1b98 2015-11-30 stephen.d return
225 1f1a1b98 2015-11-30 stephen.d }
226 1f1a1b98 2015-11-30 stephen.d }
227 1f1a1b98 2015-11-30 stephen.d }
228 1f1a1b98 2015-11-30 stephen.d
229 1f1a1b98 2015-11-30 stephen.d func (c *conn) Close() error {
230 1f1a1b98 2015-11-30 stephen.d return c.CloseWithError(nil)
231 1f1a1b98 2015-11-30 stephen.d }
232 1f1a1b98 2015-11-30 stephen.d
233 1f1a1b98 2015-11-30 stephen.d func (c *conn) CloseWithError(err error) error {
234 1f1a1b98 2015-11-30 stephen.d select {
235 1f1a1b98 2015-11-30 stephen.d case <-c.closed:
236 1f1a1b98 2015-11-30 stephen.d return c.err
237 1f1a1b98 2015-11-30 stephen.d default:
238 1f1a1b98 2015-11-30 stephen.d close(c.closed)
239 1f1a1b98 2015-11-30 stephen.d if err == nil {
240 1f1a1b98 2015-11-30 stephen.d c.err = err
241 1f1a1b98 2015-11-30 stephen.d } else {
242 1f1a1b98 2015-11-30 stephen.d c.err = ErrClosed
243 1f1a1b98 2015-11-30 stephen.d }
244 1f1a1b98 2015-11-30 stephen.d
245 1f1a1b98 2015-11-30 stephen.d return c.err
246 1f1a1b98 2015-11-30 stephen.d }
247 1f1a1b98 2015-11-30 stephen.d }