Blob


1 package p9p
3 import (
4 "fmt"
5 "log"
6 "net"
7 "time"
9 "golang.org/x/net/context"
10 )
12 // TODO(stevvooe): Add net/http.Server-like type here to manage connections.
13 // Coupled with Handler mux, we can get a very http-like experience for 9p
14 // servers.
16 // ServeConn the 9p handler over the provided network connection.
17 func ServeConn(ctx context.Context, cn net.Conn, handler Handler) error {
19 // TODO(stevvooe): It would be nice if the handler could declare the
20 // supported version. Before we had handler, we used the session to get
21 // the version (msize, version := session.Version()). We must decided if
22 // we want to proxy version and message size decisions all the back to the
23 // origin server or make those decisions at each link of a proxy chain.
25 ch := newChannel(cn, codec9p{}, DefaultMSize)
26 negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
27 defer cancel()
29 // TODO(stevvooe): For now, we negotiate here. It probably makes sense to
30 // do this outside of this function and then pass in a ready made channel.
31 // We are not really ready to export the channel type yet.
33 if err := servernegotiate(negctx, ch, DefaultVersion); err != nil {
34 // TODO(stevvooe): Need better error handling and retry support here.
35 return fmt.Errorf("error negotiating version:", err)
36 }
38 ctx = withVersion(ctx, DefaultVersion)
40 c := &conn{
41 ctx: ctx,
42 ch: ch,
43 handler: handler,
44 closed: make(chan struct{}),
45 }
47 return c.serve()
48 }
50 // conn plays role of session dispatch for handler in a server.
51 type conn struct {
52 ctx context.Context
53 session Session
54 ch Channel
55 handler Handler
56 closed chan struct{}
57 err error // terminal error for the conn
58 }
60 // activeRequest includes information about the active request.
61 type activeRequest struct {
62 ctx context.Context
63 request *Fcall
64 cancel context.CancelFunc
65 }
67 // serve messages on the connection until an error is encountered.
68 func (c *conn) serve() error {
69 tags := map[Tag]*activeRequest{} // active requests
71 requests := make(chan *Fcall) // sync, read-limited
72 responses := make(chan *Fcall) // sync, goroutine consumed
73 completed := make(chan *Fcall) // sync, send in goroutine per request
75 // read loop
76 go c.read(requests)
77 go c.write(responses)
79 log.Println("server.run()")
80 for {
81 select {
82 case req := <-requests:
83 if _, ok := tags[req.Tag]; ok {
84 select {
85 case responses <- newErrorFcall(req.Tag, ErrDuptag):
86 // Send to responses, bypass tag management.
87 case <-c.ctx.Done():
88 return c.ctx.Err()
89 case <-c.closed:
90 return c.err
91 }
92 continue
93 }
95 switch msg := req.Message.(type) {
96 case MessageTflush:
97 log.Println("server: flushing message", msg.Oldtag)
99 var resp *Fcall
100 // check if we have actually know about the requested flush
101 active, ok := tags[msg.Oldtag]
102 if ok {
103 active.cancel() // propagate cancellation to callees
104 delete(tags, msg.Oldtag)
105 resp = newFcall(req.Tag, MessageRflush{})
106 } else {
107 resp = newErrorFcall(req.Tag, ErrUnknownTag)
110 select {
111 case responses <- resp:
112 // bypass tag management in completed.
113 case <-c.ctx.Done():
114 return c.ctx.Err()
115 case <-c.closed:
116 return c.err
118 default:
119 // Allows us to session handlers to cancel processing of the fcall
120 // through context.
121 ctx, cancel := context.WithCancel(c.ctx)
123 // The contents of these instances are only writable in the main
124 // server loop. The value of tag will not change.
125 tags[req.Tag] = &activeRequest{
126 ctx: ctx,
127 request: req,
128 cancel: cancel,
131 go func(ctx context.Context, req *Fcall) {
132 var resp *Fcall
133 msg, err := c.handler.Handle(ctx, req.Message)
134 if err != nil {
135 // all handler errors are forwarded as protocol errors.
136 resp = newErrorFcall(req.Tag, err)
137 } else {
138 resp = newFcall(req.Tag, msg)
141 select {
142 case completed <- resp:
143 case <-ctx.Done():
144 return
145 case <-c.closed:
146 return
148 }(ctx, req)
150 case resp := <-completed:
151 // only responses that flip the tag state traverse this section.
152 active, ok := tags[resp.Tag]
153 if !ok {
154 // The tag is no longer active. Likely a flushed message.
155 continue
158 select {
159 case responses <- resp:
160 case <-active.ctx.Done():
161 // the context was canceled for some reason, perhaps timeout or
162 // due to a flush call. We treat this as a condition where a
163 // response should not be sent.
164 log.Println("canceled", resp, active.ctx.Err())
166 delete(tags, resp.Tag)
167 case <-c.ctx.Done():
168 return c.ctx.Err()
169 case <-c.closed:
170 return c.err
175 // read takes requests off the channel and sends them on requests.
176 func (c *conn) read(requests chan *Fcall) {
177 for {
178 req := new(Fcall)
179 if err := c.ch.ReadFcall(c.ctx, req); err != nil {
180 if err, ok := err.(net.Error); ok {
181 if err.Timeout() || err.Temporary() {
182 // TODO(stevvooe): A full idle timeout on the connection
183 // should be enforced here. No logging because it is quite
184 // chatty.
185 continue
189 c.CloseWithError(fmt.Errorf("error reading fcall: %v", err))
190 return
193 select {
194 case requests <- req:
195 case <-c.ctx.Done():
196 c.CloseWithError(c.ctx.Err())
197 return
198 case <-c.closed:
199 return
204 func (c *conn) write(responses chan *Fcall) {
205 for {
206 select {
207 case resp := <-responses:
208 if err := c.ch.WriteFcall(c.ctx, resp); err != nil {
209 if err, ok := err.(net.Error); ok {
210 if err.Timeout() || err.Temporary() {
211 // TODO(stevvooe): A full idle timeout on the
212 // connection should be enforced here. We log here,
213 // since this is less common.
214 log.Println("9p server: temporary error writing fcall: %v", err)
215 continue
219 c.CloseWithError(fmt.Errorf("error writing fcall: %v", err))
220 return
222 case <-c.ctx.Done():
223 c.CloseWithError(c.ctx.Err())
224 return
225 case <-c.closed:
226 return
231 func (c *conn) Close() error {
232 return c.CloseWithError(nil)
235 func (c *conn) CloseWithError(err error) error {
236 select {
237 case <-c.closed:
238 return c.err
239 default:
240 close(c.closed)
241 if err == nil {
242 c.err = err
243 } else {
244 c.err = ErrClosed
247 return c.err