Blame


1 e342de7d 2015-11-11 adrien package p9p
2 e342de7d 2015-11-11 adrien
3 e342de7d 2015-11-11 adrien import (
4 641595f8 2015-11-14 stephen.d "fmt"
5 e342de7d 2015-11-11 adrien "log"
6 e342de7d 2015-11-11 adrien "net"
7 e342de7d 2015-11-11 adrien "time"
8 e342de7d 2015-11-11 adrien
9 e342de7d 2015-11-11 adrien "golang.org/x/net/context"
10 e342de7d 2015-11-11 adrien )
11 e342de7d 2015-11-11 adrien
12 641595f8 2015-11-14 stephen.d // TODO(stevvooe): Add net/http.Server-like type here to manage connections.
13 641595f8 2015-11-14 stephen.d // Coupled with Handler mux, we can get a very http-like experience for 9p
14 641595f8 2015-11-14 stephen.d // servers.
15 e342de7d 2015-11-11 adrien
16 641595f8 2015-11-14 stephen.d // ServeConn the 9p handler over the provided network connection.
17 641595f8 2015-11-14 stephen.d func ServeConn(ctx context.Context, cn net.Conn, handler Handler) error {
18 641595f8 2015-11-14 stephen.d
19 e342de7d 2015-11-11 adrien // TODO(stevvooe): It would be nice if the handler could declare the
20 e342de7d 2015-11-11 adrien // supported version. Before we had handler, we used the session to get
21 e342de7d 2015-11-11 adrien // the version (msize, version := session.Version()). We must decided if
22 e342de7d 2015-11-11 adrien // we want to proxy version and message size decisions all the back to the
23 e342de7d 2015-11-11 adrien // origin server or make those decisions at each link of a proxy chain.
24 e342de7d 2015-11-11 adrien
25 641595f8 2015-11-14 stephen.d ch := newChannel(cn, codec9p{}, DefaultMSize)
26 e342de7d 2015-11-11 adrien negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
27 e342de7d 2015-11-11 adrien defer cancel()
28 e342de7d 2015-11-11 adrien
29 e342de7d 2015-11-11 adrien // TODO(stevvooe): For now, we negotiate here. It probably makes sense to
30 e342de7d 2015-11-11 adrien // do this outside of this function and then pass in a ready made channel.
31 e342de7d 2015-11-11 adrien // We are not really ready to export the channel type yet.
32 e342de7d 2015-11-11 adrien
33 e342de7d 2015-11-11 adrien if err := servernegotiate(negctx, ch, DefaultVersion); err != nil {
34 e342de7d 2015-11-11 adrien // TODO(stevvooe): Need better error handling and retry support here.
35 641595f8 2015-11-14 stephen.d return fmt.Errorf("error negotiating version:", err)
36 e342de7d 2015-11-11 adrien }
37 e342de7d 2015-11-11 adrien
38 e342de7d 2015-11-11 adrien ctx = withVersion(ctx, DefaultVersion)
39 e342de7d 2015-11-11 adrien
40 641595f8 2015-11-14 stephen.d c := &conn{
41 e342de7d 2015-11-11 adrien ctx: ctx,
42 e342de7d 2015-11-11 adrien ch: ch,
43 e342de7d 2015-11-11 adrien handler: handler,
44 e342de7d 2015-11-11 adrien closed: make(chan struct{}),
45 e342de7d 2015-11-11 adrien }
46 e342de7d 2015-11-11 adrien
47 641595f8 2015-11-14 stephen.d return c.serve()
48 e342de7d 2015-11-11 adrien }
49 e342de7d 2015-11-11 adrien
50 641595f8 2015-11-14 stephen.d // conn plays role of session dispatch for handler in a server.
51 641595f8 2015-11-14 stephen.d type conn struct {
52 e342de7d 2015-11-11 adrien ctx context.Context
53 e342de7d 2015-11-11 adrien session Session
54 e342de7d 2015-11-11 adrien ch Channel
55 e342de7d 2015-11-11 adrien handler Handler
56 e342de7d 2015-11-11 adrien closed chan struct{}
57 641595f8 2015-11-14 stephen.d err error // terminal error for the conn
58 e342de7d 2015-11-11 adrien }
59 e342de7d 2015-11-11 adrien
60 ceb907e4 2015-11-11 adrien // activeRequest includes information about the active request.
61 ceb907e4 2015-11-11 adrien type activeRequest struct {
62 ceb907e4 2015-11-11 adrien ctx context.Context
63 ceb907e4 2015-11-11 adrien request *Fcall
64 ceb907e4 2015-11-11 adrien cancel context.CancelFunc
65 e342de7d 2015-11-11 adrien }
66 e342de7d 2015-11-11 adrien
67 641595f8 2015-11-14 stephen.d // serve messages on the connection until an error is encountered.
68 641595f8 2015-11-14 stephen.d func (c *conn) serve() error {
69 ceb907e4 2015-11-11 adrien tags := map[Tag]*activeRequest{} // active requests
70 e342de7d 2015-11-11 adrien
71 ceb907e4 2015-11-11 adrien requests := make(chan *Fcall) // sync, read-limited
72 ceb907e4 2015-11-11 adrien responses := make(chan *Fcall)
73 ceb907e4 2015-11-11 adrien completed := make(chan *Fcall, 1)
74 e342de7d 2015-11-11 adrien
75 ceb907e4 2015-11-11 adrien // read loop
76 641595f8 2015-11-14 stephen.d go c.read(requests)
77 641595f8 2015-11-14 stephen.d go c.write(responses)
78 ceb907e4 2015-11-11 adrien
79 ceb907e4 2015-11-11 adrien log.Println("server.run()")
80 ceb907e4 2015-11-11 adrien for {
81 ceb907e4 2015-11-11 adrien log.Println("server:", "wait")
82 ceb907e4 2015-11-11 adrien select {
83 ceb907e4 2015-11-11 adrien case req := <-requests:
84 ceb907e4 2015-11-11 adrien log.Println("request", req)
85 ceb907e4 2015-11-11 adrien if _, ok := tags[req.Tag]; ok {
86 ceb907e4 2015-11-11 adrien select {
87 ceb907e4 2015-11-11 adrien case responses <- newErrorFcall(req.Tag, ErrDuptag):
88 ceb907e4 2015-11-11 adrien // Send to responses, bypass tag management.
89 641595f8 2015-11-14 stephen.d case <-c.ctx.Done():
90 641595f8 2015-11-14 stephen.d return c.ctx.Err()
91 641595f8 2015-11-14 stephen.d case <-c.closed:
92 641595f8 2015-11-14 stephen.d return c.err
93 ceb907e4 2015-11-11 adrien }
94 ceb907e4 2015-11-11 adrien continue
95 ceb907e4 2015-11-11 adrien }
96 e342de7d 2015-11-11 adrien
97 ceb907e4 2015-11-11 adrien switch msg := req.Message.(type) {
98 ceb907e4 2015-11-11 adrien case MessageTflush:
99 ceb907e4 2015-11-11 adrien log.Println("server: flushing message", msg.Oldtag)
100 e342de7d 2015-11-11 adrien
101 ceb907e4 2015-11-11 adrien var resp *Fcall
102 ceb907e4 2015-11-11 adrien // check if we have actually know about the requested flush
103 ceb907e4 2015-11-11 adrien active, ok := tags[msg.Oldtag]
104 ceb907e4 2015-11-11 adrien if ok {
105 ceb907e4 2015-11-11 adrien active.cancel() // cancel the context of oldtag
106 ceb907e4 2015-11-11 adrien resp = newFcall(req.Tag, MessageRflush{})
107 ceb907e4 2015-11-11 adrien } else {
108 ceb907e4 2015-11-11 adrien resp = newErrorFcall(req.Tag, ErrUnknownTag)
109 ceb907e4 2015-11-11 adrien }
110 e342de7d 2015-11-11 adrien
111 ceb907e4 2015-11-11 adrien select {
112 ceb907e4 2015-11-11 adrien case responses <- resp:
113 ceb907e4 2015-11-11 adrien // bypass tag management in completed.
114 641595f8 2015-11-14 stephen.d case <-c.ctx.Done():
115 641595f8 2015-11-14 stephen.d return c.ctx.Err()
116 641595f8 2015-11-14 stephen.d case <-c.closed:
117 641595f8 2015-11-14 stephen.d return c.err
118 ceb907e4 2015-11-11 adrien }
119 ceb907e4 2015-11-11 adrien default:
120 ceb907e4 2015-11-11 adrien // Allows us to session handlers to cancel processing of the fcall
121 ceb907e4 2015-11-11 adrien // through context.
122 641595f8 2015-11-14 stephen.d ctx, cancel := context.WithCancel(c.ctx)
123 e342de7d 2015-11-11 adrien
124 ceb907e4 2015-11-11 adrien // The contents of these instances are only writable in the main
125 ceb907e4 2015-11-11 adrien // server loop. The value of tag will not change.
126 ceb907e4 2015-11-11 adrien tags[req.Tag] = &activeRequest{
127 ceb907e4 2015-11-11 adrien ctx: ctx,
128 ceb907e4 2015-11-11 adrien request: req,
129 ceb907e4 2015-11-11 adrien cancel: cancel,
130 ceb907e4 2015-11-11 adrien }
131 e342de7d 2015-11-11 adrien
132 ceb907e4 2015-11-11 adrien go func(ctx context.Context, req *Fcall) {
133 ceb907e4 2015-11-11 adrien var resp *Fcall
134 641595f8 2015-11-14 stephen.d msg, err := c.handler.Handle(ctx, req.Message)
135 ceb907e4 2015-11-11 adrien if err != nil {
136 ceb907e4 2015-11-11 adrien // all handler errors are forwarded as protocol errors.
137 ceb907e4 2015-11-11 adrien resp = newErrorFcall(req.Tag, err)
138 ceb907e4 2015-11-11 adrien } else {
139 ceb907e4 2015-11-11 adrien resp = newFcall(req.Tag, msg)
140 ceb907e4 2015-11-11 adrien }
141 e342de7d 2015-11-11 adrien
142 ceb907e4 2015-11-11 adrien select {
143 ceb907e4 2015-11-11 adrien case completed <- resp:
144 ceb907e4 2015-11-11 adrien case <-ctx.Done():
145 ceb907e4 2015-11-11 adrien return
146 641595f8 2015-11-14 stephen.d case <-c.closed:
147 ceb907e4 2015-11-11 adrien return
148 ceb907e4 2015-11-11 adrien }
149 ceb907e4 2015-11-11 adrien }(ctx, req)
150 e342de7d 2015-11-11 adrien }
151 ceb907e4 2015-11-11 adrien case resp := <-completed:
152 ceb907e4 2015-11-11 adrien log.Println("completed", resp)
153 ceb907e4 2015-11-11 adrien // only responses that flip the tag state traverse this section.
154 ceb907e4 2015-11-11 adrien active, ok := tags[resp.Tag]
155 ceb907e4 2015-11-11 adrien if !ok {
156 ceb907e4 2015-11-11 adrien panic("BUG: unbalanced tag")
157 ceb907e4 2015-11-11 adrien }
158 e342de7d 2015-11-11 adrien
159 ceb907e4 2015-11-11 adrien select {
160 ceb907e4 2015-11-11 adrien case responses <- resp:
161 ceb907e4 2015-11-11 adrien case <-active.ctx.Done():
162 ceb907e4 2015-11-11 adrien // the context was canceled for some reason, perhaps timeout or
163 ceb907e4 2015-11-11 adrien // due to a flush call. We treat this as a condition where a
164 ceb907e4 2015-11-11 adrien // response should not be sent.
165 ceb907e4 2015-11-11 adrien log.Println("canceled", resp, active.ctx.Err())
166 ceb907e4 2015-11-11 adrien }
167 ceb907e4 2015-11-11 adrien delete(tags, resp.Tag)
168 641595f8 2015-11-14 stephen.d case <-c.ctx.Done():
169 641595f8 2015-11-14 stephen.d return c.ctx.Err()
170 641595f8 2015-11-14 stephen.d case <-c.closed:
171 641595f8 2015-11-14 stephen.d return c.err
172 641595f8 2015-11-14 stephen.d }
173 641595f8 2015-11-14 stephen.d }
174 641595f8 2015-11-14 stephen.d }
175 641595f8 2015-11-14 stephen.d
176 641595f8 2015-11-14 stephen.d // read takes requests off the channel and sends them on requests.
177 641595f8 2015-11-14 stephen.d func (c *conn) read(requests chan *Fcall) {
178 641595f8 2015-11-14 stephen.d for {
179 641595f8 2015-11-14 stephen.d req := new(Fcall)
180 641595f8 2015-11-14 stephen.d if err := c.ch.ReadFcall(c.ctx, req); err != nil {
181 641595f8 2015-11-14 stephen.d if err, ok := err.(net.Error); ok {
182 641595f8 2015-11-14 stephen.d if err.Timeout() || err.Temporary() {
183 641595f8 2015-11-14 stephen.d // TODO(stevvooe): A full idle timeout on the connection
184 641595f8 2015-11-14 stephen.d // should be enforced here. No logging because it is quite
185 641595f8 2015-11-14 stephen.d // chatty.
186 641595f8 2015-11-14 stephen.d continue
187 641595f8 2015-11-14 stephen.d }
188 641595f8 2015-11-14 stephen.d }
189 641595f8 2015-11-14 stephen.d
190 641595f8 2015-11-14 stephen.d c.CloseWithError(fmt.Errorf("error reading fcall: %v", err))
191 ceb907e4 2015-11-11 adrien return
192 641595f8 2015-11-14 stephen.d }
193 641595f8 2015-11-14 stephen.d
194 641595f8 2015-11-14 stephen.d select {
195 641595f8 2015-11-14 stephen.d case requests <- req:
196 641595f8 2015-11-14 stephen.d case <-c.ctx.Done():
197 641595f8 2015-11-14 stephen.d c.CloseWithError(c.ctx.Err())
198 ceb907e4 2015-11-11 adrien return
199 641595f8 2015-11-14 stephen.d case <-c.closed:
200 641595f8 2015-11-14 stephen.d return
201 ceb907e4 2015-11-11 adrien }
202 e342de7d 2015-11-11 adrien }
203 e342de7d 2015-11-11 adrien }
204 641595f8 2015-11-14 stephen.d
205 641595f8 2015-11-14 stephen.d func (c *conn) write(responses chan *Fcall) {
206 641595f8 2015-11-14 stephen.d for {
207 641595f8 2015-11-14 stephen.d select {
208 641595f8 2015-11-14 stephen.d case resp := <-responses:
209 641595f8 2015-11-14 stephen.d if err := c.ch.WriteFcall(c.ctx, resp); err != nil {
210 641595f8 2015-11-14 stephen.d if err, ok := err.(net.Error); ok {
211 641595f8 2015-11-14 stephen.d if err.Timeout() || err.Temporary() {
212 641595f8 2015-11-14 stephen.d // TODO(stevvooe): A full idle timeout on the
213 641595f8 2015-11-14 stephen.d // connection should be enforced here. We log here,
214 641595f8 2015-11-14 stephen.d // since this is less common.
215 641595f8 2015-11-14 stephen.d log.Println("9p server: temporary error writing fcall: %v", err)
216 641595f8 2015-11-14 stephen.d continue
217 641595f8 2015-11-14 stephen.d }
218 641595f8 2015-11-14 stephen.d }
219 641595f8 2015-11-14 stephen.d
220 641595f8 2015-11-14 stephen.d c.CloseWithError(fmt.Errorf("error writing fcall: %v", err))
221 641595f8 2015-11-14 stephen.d return
222 641595f8 2015-11-14 stephen.d }
223 641595f8 2015-11-14 stephen.d case <-c.ctx.Done():
224 641595f8 2015-11-14 stephen.d c.CloseWithError(c.ctx.Err())
225 641595f8 2015-11-14 stephen.d return
226 641595f8 2015-11-14 stephen.d case <-c.closed:
227 641595f8 2015-11-14 stephen.d return
228 641595f8 2015-11-14 stephen.d }
229 641595f8 2015-11-14 stephen.d }
230 641595f8 2015-11-14 stephen.d }
231 641595f8 2015-11-14 stephen.d
232 641595f8 2015-11-14 stephen.d func (c *conn) Close() error {
233 641595f8 2015-11-14 stephen.d return c.CloseWithError(nil)
234 641595f8 2015-11-14 stephen.d }
235 641595f8 2015-11-14 stephen.d
236 641595f8 2015-11-14 stephen.d func (c *conn) CloseWithError(err error) error {
237 641595f8 2015-11-14 stephen.d select {
238 641595f8 2015-11-14 stephen.d case <-c.closed:
239 641595f8 2015-11-14 stephen.d return c.err
240 641595f8 2015-11-14 stephen.d default:
241 641595f8 2015-11-14 stephen.d close(c.closed)
242 641595f8 2015-11-14 stephen.d if err == nil {
243 641595f8 2015-11-14 stephen.d c.err = err
244 641595f8 2015-11-14 stephen.d } else {
245 641595f8 2015-11-14 stephen.d c.err = ErrClosed
246 641595f8 2015-11-14 stephen.d }
247 641595f8 2015-11-14 stephen.d
248 641595f8 2015-11-14 stephen.d return c.err
249 641595f8 2015-11-14 stephen.d }
250 641595f8 2015-11-14 stephen.d }