package p9p

import (
	"log"
	"net"
	"time"

	"golang.org/x/net/context"
)

// Serve the 9p session over the provided network connection.
func Serve(ctx context.Context, conn net.Conn, handler Handler) {

	// TODO(stevvooe): It would be nice if the handler could declare the
	// supported version. Before we had a handler, we used the session to get
	// the version (msize, version := session.Version()). We must decide if
	// we want to proxy version and message size decisions all the way back
	// to the origin server or make those decisions at each link of a proxy
	// chain.

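	// Wrap the connection in a 9p channel using the package codec and the
	// default message size.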
	ch := newChannel(conn, codec9p{}, DefaultMSize)
	negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()

	// TODO(stevvooe): For now, we negotiate here. It probably makes sense to
	// do this outside of this function and then pass in a ready made channel.
	// We are not really ready to export the channel type yet.

	if err := servernegotiate(negctx, ch, DefaultVersion); err != nil {
		// TODO(stevvooe): Need better error handling and retry support here.
		// For now, we silently ignore the failure.
		log.Println("error negotiating version:", err)
		return
	}

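	// Attach the negotiated version to the context for downstream use.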
	ctx = withVersion(ctx, DefaultVersion)

	s := &server{
		ctx:     ctx,
		ch:      ch,
		handler: handler,
		closed:  make(chan struct{}),
	}

	s.run()
}
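
// server dispatches fcalls read from the channel to the handler, tracking
// in-flight requests by tag and writing the responses back to the client.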
type server struct {
	ctx     context.Context
	session Session
	ch      Channel
	handler Handler
	closed  chan struct{}
}
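
// activeTag tracks the state of an in-flight request so that it can be
// canceled by a later Tflush.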
type activeTag struct {
	ctx       context.Context
	request   *Fcall
	cancel    context.CancelFunc
	responded bool // true, if some response was sent (Response or Rflush/Rerror)
}
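
// run reads fcalls from the channel, dispatches them to the handler and
// writes the responses back, until the context is done or the server is
// closed.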
func (s *server) run() {
	tags := map[Tag]*activeTag{} // active requests

	log.Println("server.run()")
	for {
		select {
		case <-s.ctx.Done():
			log.Println("server: context done")
			return
		case <-s.closed:
			log.Println("server: shutdown")
			return
		default:
		}

		// BUG(stevvooe): This server blocks on reads, calls to handlers and
		// writes, effectively single-tracking fcalls through a target
		// dispatcher. There is no reason we couldn't parallelize these
		// requests out to the dispatcher to get massive performance
		// improvements.

		log.Println("server:", "wait")
		req := new(Fcall)
		if err := s.ch.ReadFcall(s.ctx, req); err != nil {
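			// Transient network errors (timeouts and temporary failures) are
			// skipped and the read is retried; any other error ends the
			// session.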
			if err, ok := err.(net.Error); ok {
				if err.Timeout() || err.Temporary() {
					continue
				}
			}

			log.Println("server: error reading fcall", err)
			return
		}

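		// A tag that is already in flight is a protocol error; answer with
		// ErrDuptag and skip dispatch.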
		if _, ok := tags[req.Tag]; ok {
			resp := newErrorFcall(req.Tag, ErrDuptag)
			if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
				log.Printf("error sending duplicate tag response: %v", err)
			}
			continue
		}

		// handle flush calls. The tag never makes it into active from here.
		if mf, ok := req.Message.(MessageTflush); ok {
			log.Println("flushing message", mf.Oldtag)

			// check whether we actually know about the tag requested for flush
			active, ok := tags[mf.Oldtag]
			if ok {
				active.cancel() // cancel the context of the original request

				resp := newFcall(MessageRflush{})
				resp.Tag = req.Tag
				if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
					log.Printf("error responding to flush: %v", err)
				}
				active.responded = true
			} else {
				resp := newErrorFcall(req.Tag, ErrUnknownTag)
				if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
					log.Printf("error responding to flush: %v", err)
				}
			}

			continue
		}

		// TODO(stevvooe): Add handler timeout here, as well, if we desire.

		// Allows us to signal handlers to cancel processing of the fcall
		// through context.
		ctx, cancel := context.WithCancel(s.ctx)

		tags[req.Tag] = &activeTag{
			ctx:     ctx,
			request: req,
			cancel:  cancel,
		}

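		// Dispatch the request to the handler and build the response fcall.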
		var resp *Fcall
		msg, err := s.handler.Handle(ctx, req.Message)
		if err != nil {
			// all handler errors are forwarded as protocol errors.
			resp = newErrorFcall(req.Tag, err)
		} else {
			resp = newFcall(msg)
		}
		resp.Tag = req.Tag

		if err := ctx.Err(); err != nil {
			// NOTE(stevvooe): We aren't really getting our money's worth for
			// how this is being handled. We really need to dispatch each
			// request handler to a separate goroutine.

			// the context was canceled for some reason, perhaps a timeout or
			// a flush call. We treat this as a condition where a response
			// should not be sent.
			log.Println("context error:", err)
			continue
		}

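		// Only write a response if one has not already been sent for this
		// tag (for example, by the flush handling above).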
		if !tags[req.Tag].responded {
			if err := s.ch.WriteFcall(ctx, resp); err != nil {
				log.Println("server: error writing fcall:", err)
				continue
			}
		}

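		// The request is complete; stop tracking the tag so the client can
		// reuse it.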
		delete(tags, req.Tag)
	}
}