// NOTE(review): "Blob" header and the per-line numeric prefixes below are
// artifacts of a line-numbered code-viewer export, not part of the source.


1 package p9pnew
3 import (
4 "log"
5 "net"
6 "time"
8 "golang.org/x/net/context"
9 )
11 // Serve the 9p session over the provided network connection.
12 func Serve(ctx context.Context, conn net.Conn, session Session) {
13 const msize = 64 << 10
14 const vers = "9P2000"
16 ch := newChannel(conn, codec9p{}, msize)
18 negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
19 defer cancel()
21 // TODO(stevvooe): For now, we negotiate here. It probably makes sense to
22 // do this outside of this function and then pass in a ready made channel.
23 // We are not really ready to export the channel type yet.
25 if err := servernegotiate(negctx, ch, vers); err != nil {
26 // TODO(stevvooe): Need better error handling and retry support here.
27 // For now, we silently ignore the failure.
28 log.Println("error negotiating version:", err)
29 return
30 }
32 s := &server{
33 ctx: ctx,
34 ch: ch,
35 handler: &dispatcher{session: session},
36 closed: make(chan struct{}),
37 }
39 s.run()
40 }
42 type server struct {
43 ctx context.Context
44 session Session
45 ch Channel
46 handler handler
47 closed chan struct{}
48 }
50 type activeTag struct {
51 ctx context.Context
52 request *Fcall
53 cancel context.CancelFunc
54 responded bool // true, if some response was sent (Response or Rflush/Rerror)
55 }
57 func (s *server) run() {
58 tags := map[Tag]*activeTag{} // active requests
60 log.Println("server.run()")
61 for {
62 select {
63 case <-s.ctx.Done():
64 log.Println("server: shutdown")
65 return
66 case <-s.closed:
67 default:
68 }
70 // BUG(stevvooe): This server blocks on reads, calls to handlers and
71 // write, effectively single tracking fcalls through a target
72 // dispatcher. There is no reason we couldn't parallelize these
73 // requests out to the dispatcher to get massive performance
74 // improvements.
76 log.Println("server:", "wait")
77 req := new(Fcall)
78 if err := s.ch.ReadFcall(s.ctx, req); err != nil {
79 log.Println("server: error reading fcall", err)
80 continue
81 }
83 if _, ok := tags[req.Tag]; ok {
84 resp := newErrorFcall(req.Tag, ErrDuptag)
85 if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
86 log.Printf("error sending duplicate tag response: %v", err)
87 }
88 continue
89 }
91 // handle flush calls. The tag never makes it into active from here.
92 if mf, ok := req.Message.(MessageTflush); ok {
93 log.Println("flushing message", mf.Oldtag)
95 // check if we have actually know about the requested flush
96 active, ok := tags[mf.Oldtag]
97 if ok {
98 active.cancel() // cancel the context
100 resp := newFcall(MessageRflush{})
101 resp.Tag = req.Tag
102 if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
103 log.Printf("error responding to flush: %v", err)
105 active.responded = true
106 } else {
107 resp := newErrorFcall(req.Tag, ErrUnknownTag)
108 if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
109 log.Printf("error responding to flush: %v", err)
113 continue
116 // TODO(stevvooe): Add handler timeout here, as well, if we desire.
118 // Allows us to signal handlers to cancel processing of the fcall
119 // through context.
120 ctx, cancel := context.WithCancel(s.ctx)
122 tags[req.Tag] = &activeTag{
123 ctx: ctx,
124 request: req,
125 cancel: cancel,
128 resp, err := s.handler.handle(ctx, req)
129 if err != nil {
130 // all handler errors are forwarded as protocol errors.
131 resp = newErrorFcall(req.Tag, err)
133 resp.Tag = req.Tag
135 if err := ctx.Err(); err != nil {
136 // NOTE(stevvooe): We aren't really getting our moneys worth for
137 // how this is being handled. We really need to dispatch each
138 // request handler to a separate thread.
140 // the context was canceled for some reason, perhaps timeout or
141 // due to a flush call. We treat this as a condition where a
142 // response should not be sent.
143 log.Println("context error:", err)
144 continue
147 if !tags[req.Tag].responded {
148 if err := s.ch.WriteFcall(ctx, resp); err != nil {
149 log.Println("server: error writing fcall:", err)
150 continue
154 delete(tags, req.Tag)