package p9pnew

import (
	"log"
	"net"
	"time"

	"golang.org/x/net/context"
)
// Serve the 9p session over the provided network connection.
func Serve(ctx context.Context, conn net.Conn, session Session) {
	msize, version := session.Version()
	ch := newChannel(conn, codec9p{}, msize)
	negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()

	// TODO(stevvooe): For now, we negotiate here. It probably makes sense to
	// do this outside of this function and then pass in a ready-made channel.
	// We are not really ready to export the channel type yet.

	if err := servernegotiate(negctx, ch, version); err != nil {
		// TODO(stevvooe): Need better error handling and retry support here.
		// For now, we silently ignore the failure.
		log.Println("error negotiating version:", err)
		return
	}

	ctx = withVersion(ctx, version)

	s := &server{
		ctx:     ctx,
		ch:      ch,
		handler: &dispatcher{session: session},
		closed:  make(chan struct{}),
	}

	s.run()
}
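
// server dispatches fcalls read from its channel to a handler, tracking
// in-flight requests by tag.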
type server struct {
	ctx     context.Context
	session Session
	ch      Channel
	handler handler
	closed  chan struct{}
}
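
// activeTag tracks the state of an in-flight request so that it can be
// flushed or canceled by a later Tflush.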
type activeTag struct {
	ctx       context.Context
	request   *Fcall
	cancel    context.CancelFunc
	responded bool // true, if some response was sent (Response or Rflush/Rerror)
}
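
// run is the server's dispatch loop: it reads fcalls from the channel,
// hands them to the handler and writes back responses until the server
// context is done.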
func (s *server) run() {
	tags := map[Tag]*activeTag{} // active requests

	log.Println("server.run()")
	for {
		select {
		case <-s.ctx.Done():
			log.Println("server: shutdown")
			return
		case <-s.closed:
		default:
		}

		// BUG(stevvooe): This server blocks on reads, calls to handlers and
		// write, effectively single tracking fcalls through a target
		// dispatcher. There is no reason we couldn't parallelize these
		// requests out to the dispatcher to get massive performance
		// improvements.

		log.Println("server:", "wait")
		req := new(Fcall)
		if err := s.ch.ReadFcall(s.ctx, req); err != nil {
			log.Println("server: error reading fcall", err)
			continue
		}
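
		// Reject requests that reuse a tag that is still in flight.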
		if _, ok := tags[req.Tag]; ok {
			resp := newErrorFcall(req.Tag, ErrDuptag)
			if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
				log.Printf("error sending duplicate tag response: %v", err)
			}
			continue
		}

		// handle flush calls. The tag never makes it into active from here.
		if mf, ok := req.Message.(MessageTflush); ok {
			log.Println("flushing message", mf.Oldtag)

			// check if we actually know about the requested flush
			active, ok := tags[mf.Oldtag]
			if ok {
				active.cancel() // cancel the context

				resp := newFcall(MessageRflush{})
				resp.Tag = req.Tag
				if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
					log.Printf("error responding to flush: %v", err)
				}
				active.responded = true
			} else {
				resp := newErrorFcall(req.Tag, ErrUnknownTag)
				if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
					log.Printf("error responding to flush: %v", err)
				}
			}

			continue
		}

		// TODO(stevvooe): Add handler timeout here, as well, if we desire.

		// Allows us to signal handlers to cancel processing of the fcall
		// through context.
		ctx, cancel := context.WithCancel(s.ctx)

		tags[req.Tag] = &activeTag{
			ctx:     ctx,
			request: req,
			cancel:  cancel,
		}

		resp, err := s.handler.handle(ctx, req)
		if err != nil {
			// all handler errors are forwarded as protocol errors.
			resp = newErrorFcall(req.Tag, err)
		}
		resp.Tag = req.Tag

		if err := ctx.Err(); err != nil {
			// NOTE(stevvooe): We aren't really getting our money's worth for
			// how this is being handled. We really need to dispatch each
			// request handler to a separate thread.

			// the context was canceled for some reason, perhaps timeout or
			// due to a flush call. We treat this as a condition where a
			// response should not be sent.
			log.Println("context error:", err)
			continue
		}

		if !tags[req.Tag].responded {
			if err := s.ch.WriteFcall(ctx, resp); err != nil {
				log.Println("server: error writing fcall:", err)
				continue
			}
		}

		delete(tags, req.Tag)
	}
}