Blob


1 package p9p
3 import (
4 "fmt"
5 "log"
6 "net"
7 "sync"
8 "time"
10 "context"
11 )
13 // TODO(stevvooe): Add net/http.Server-like type here to manage connections.
14 // Coupled with Handler mux, we can get a very http-like experience for 9p
15 // servers.
17 // ServeConn the 9p handler over the provided network connection.
18 func ServeConn(ctx context.Context, cn net.Conn, handler Handler) error {
20 // TODO(stevvooe): It would be nice if the handler could declare the
21 // supported version. Before we had handler, we used the session to get
22 // the version (msize, version := session.Version()). We must decided if
23 // we want to proxy version and message size decisions all the back to the
24 // origin server or make those decisions at each link of a proxy chain.
26 ch := newChannel(cn, codec9p{}, DefaultMSize)
27 negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
28 defer cancel()
30 // TODO(stevvooe): For now, we negotiate here. It probably makes sense to
31 // do this outside of this function and then pass in a ready made channel.
32 // We are not really ready to export the channel type yet.
34 if err := servernegotiate(negctx, ch, DefaultVersion); err != nil {
35 // TODO(stevvooe): Need better error handling and retry support here.
36 return fmt.Errorf("error negotiating version: %s", err)
37 }
39 ctx = withVersion(ctx, DefaultVersion)
41 c := &conn{
42 ctx: ctx,
43 ch: ch,
44 handler: handler,
45 closed: make(chan struct{}),
46 }
48 return c.serve()
49 }
51 // conn plays role of session dispatch for handler in a server.
52 type conn struct {
53 ctx context.Context
54 session Session
55 ch Channel
56 handler Handler
58 once sync.Once
59 closed chan struct{}
60 err error // terminal error for the conn
61 }
63 // activeRequest includes information about the active request.
64 type activeRequest struct {
65 ctx context.Context
66 request *Fcall
67 cancel context.CancelFunc
68 }
70 // serve messages on the connection until an error is encountered.
71 func (c *conn) serve() error {
72 tags := map[Tag]*activeRequest{} // active requests
74 requests := make(chan *Fcall) // sync, read-limited
75 responses := make(chan *Fcall) // sync, goroutine consumed
76 completed := make(chan *Fcall) // sync, send in goroutine per request
78 // read loop
79 go c.read(requests)
80 go c.write(responses)
82 log.Println("server.run()")
83 for {
84 select {
85 case req := <-requests:
86 if _, ok := tags[req.Tag]; ok {
87 select {
88 case responses <- newErrorFcall(req.Tag, ErrDuptag):
89 // Send to responses, bypass tag management.
90 case <-c.ctx.Done():
91 return c.ctx.Err()
92 case <-c.closed:
93 return c.err
94 }
95 continue
96 }
98 switch msg := req.Message.(type) {
99 case MessageTflush:
100 log.Println("server: flushing message", msg.Oldtag)
102 var resp *Fcall
103 // check if we have actually know about the requested flush
104 active, ok := tags[msg.Oldtag]
105 if ok {
106 active.cancel() // propagate cancellation to callees
107 delete(tags, msg.Oldtag)
108 resp = newFcall(req.Tag, MessageRflush{})
109 } else {
110 resp = newErrorFcall(req.Tag, ErrUnknownTag)
113 select {
114 case responses <- resp:
115 // bypass tag management in completed.
116 case <-c.ctx.Done():
117 return c.ctx.Err()
118 case <-c.closed:
119 return c.err
121 default:
122 // Allows us to session handlers to cancel processing of the fcall
123 // through context.
124 ctx, cancel := context.WithCancel(c.ctx)
126 // The contents of these instances are only writable in the main
127 // server loop. The value of tag will not change.
128 tags[req.Tag] = &activeRequest{
129 ctx: ctx,
130 request: req,
131 cancel: cancel,
134 go func(ctx context.Context, req *Fcall) {
135 // TODO(stevvooe): Re-write incoming Treads so that handler
136 // can always respond with a message of the correct msize.
138 var resp *Fcall
139 msg, err := c.handler.Handle(ctx, req.Message)
140 if err != nil {
141 // all handler errors are forwarded as protocol errors.
142 resp = newErrorFcall(req.Tag, err)
143 } else {
144 resp = newFcall(req.Tag, msg)
147 select {
148 case completed <- resp:
149 case <-ctx.Done():
150 return
151 case <-c.closed:
152 return
154 }(ctx, req)
156 case resp := <-completed:
157 // only responses that flip the tag state traverse this section.
158 active, ok := tags[resp.Tag]
159 if !ok {
160 // The tag is no longer active. Likely a flushed message.
161 continue
164 select {
165 case responses <- resp:
166 case <-active.ctx.Done():
167 // the context was canceled for some reason, perhaps timeout or
168 // due to a flush call. We treat this as a condition where a
169 // response should not be sent.
170 log.Println("canceled", resp, active.ctx.Err())
172 delete(tags, resp.Tag)
173 case <-c.ctx.Done():
174 return c.ctx.Err()
175 case <-c.closed:
176 return c.err
181 // read takes requests off the channel and sends them on requests.
182 func (c *conn) read(requests chan *Fcall) {
183 for {
184 req := new(Fcall)
185 if err := c.ch.ReadFcall(c.ctx, req); err != nil {
186 if err, ok := err.(net.Error); ok {
187 if err.Timeout() || err.Temporary() {
188 // TODO(stevvooe): A full idle timeout on the connection
189 // should be enforced here. No logging because it is quite
190 // chatty.
191 continue
195 c.CloseWithError(fmt.Errorf("error reading fcall: %v", err))
196 return
199 select {
200 case requests <- req:
201 case <-c.ctx.Done():
202 c.CloseWithError(c.ctx.Err())
203 return
204 case <-c.closed:
205 return
210 func (c *conn) write(responses chan *Fcall) {
211 for {
212 select {
213 case resp := <-responses:
214 // TODO(stevvooe): Correctly protect againt overflowing msize from
215 // handler. This can be done above, in the main message handler
216 // loop, by adjusting incoming Tread calls to have a Count that
217 // won't overflow the msize.
219 if err := c.ch.WriteFcall(c.ctx, resp); err != nil {
220 if err, ok := err.(net.Error); ok {
221 if err.Timeout() || err.Temporary() {
222 // TODO(stevvooe): A full idle timeout on the
223 // connection should be enforced here. We log here,
224 // since this is less common.
225 log.Printf("9p server: temporary error writing fcall: %v", err)
226 continue
230 c.CloseWithError(fmt.Errorf("error writing fcall: %v", err))
231 return
233 case <-c.ctx.Done():
234 c.CloseWithError(c.ctx.Err())
235 return
236 case <-c.closed:
237 return
242 func (c *conn) Close() error {
243 return c.CloseWithError(nil)
246 func (c *conn) CloseWithError(err error) error {
247 c.once.Do(func() {
248 if err == nil {
249 err = ErrClosed
252 c.err = err
253 close(c.closed)
254 })
256 return c.err