Blame


1 499f8c59 2015-10-27 stephen.d package p9pnew
2 499f8c59 2015-10-27 stephen.d
3 96ad2b22 2015-10-28 stephen.d import (
4 96ad2b22 2015-10-28 stephen.d "log"
5 96ad2b22 2015-10-28 stephen.d "net"
6 40d4a02d 2015-11-03 stephen.d "time"
7 499f8c59 2015-10-27 stephen.d
8 96ad2b22 2015-10-28 stephen.d "golang.org/x/net/context"
9 96ad2b22 2015-10-28 stephen.d )
10 96ad2b22 2015-10-28 stephen.d
11 499f8c59 2015-10-27 stephen.d // Serve the 9p session over the provided network connection.
12 b714e9e0 2015-11-05 stephen.d func Serve(ctx context.Context, conn net.Conn, handler Handler) {
13 b714e9e0 2015-11-05 stephen.d
14 b714e9e0 2015-11-05 stephen.d // TODO(stevvooe): It would be nice if the handler could declare the
15 b714e9e0 2015-11-05 stephen.d // supported version. Before we had handler, we used the session to get
16 b714e9e0 2015-11-05 stephen.d // the version (msize, version := session.Version()). We must decided if
17 b714e9e0 2015-11-05 stephen.d // we want to proxy version and message size decisions all the back to the
18 b714e9e0 2015-11-05 stephen.d // origin server or make those decisions at each link of a proxy chain.
19 b714e9e0 2015-11-05 stephen.d
20 b714e9e0 2015-11-05 stephen.d ch := newChannel(conn, codec9p{}, DefaultMSize)
21 40d4a02d 2015-11-03 stephen.d negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
22 40d4a02d 2015-11-03 stephen.d defer cancel()
23 40d4a02d 2015-11-03 stephen.d
24 40d4a02d 2015-11-03 stephen.d // TODO(stevvooe): For now, we negotiate here. It probably makes sense to
25 40d4a02d 2015-11-03 stephen.d // do this outside of this function and then pass in a ready made channel.
26 40d4a02d 2015-11-03 stephen.d // We are not really ready to export the channel type yet.
27 40d4a02d 2015-11-03 stephen.d
28 b714e9e0 2015-11-05 stephen.d if err := servernegotiate(negctx, ch, DefaultVersion); err != nil {
29 40d4a02d 2015-11-03 stephen.d // TODO(stevvooe): Need better error handling and retry support here.
30 40d4a02d 2015-11-03 stephen.d // For now, we silently ignore the failure.
31 40d4a02d 2015-11-03 stephen.d log.Println("error negotiating version:", err)
32 40d4a02d 2015-11-03 stephen.d return
33 40d4a02d 2015-11-03 stephen.d }
34 40d4a02d 2015-11-03 stephen.d
35 b714e9e0 2015-11-05 stephen.d ctx = withVersion(ctx, DefaultVersion)
36 dce371f6 2015-11-05 stephen.d
37 e6bcde66 2015-10-29 stephen.d s := &server{
38 e6bcde66 2015-10-29 stephen.d ctx: ctx,
39 40d4a02d 2015-11-03 stephen.d ch: ch,
40 b714e9e0 2015-11-05 stephen.d handler: handler,
41 40d4a02d 2015-11-03 stephen.d closed: make(chan struct{}),
42 e6bcde66 2015-10-29 stephen.d }
43 e6bcde66 2015-10-29 stephen.d
44 e6bcde66 2015-10-29 stephen.d s.run()
45 499f8c59 2015-10-27 stephen.d }
46 96ad2b22 2015-10-28 stephen.d
47 96ad2b22 2015-10-28 stephen.d type server struct {
48 96ad2b22 2015-10-28 stephen.d ctx context.Context
49 96ad2b22 2015-10-28 stephen.d session Session
50 3bf22e58 2015-11-04 stephen.d ch Channel
51 b714e9e0 2015-11-05 stephen.d handler Handler
52 e6bcde66 2015-10-29 stephen.d closed chan struct{}
53 96ad2b22 2015-10-28 stephen.d }
54 96ad2b22 2015-10-28 stephen.d
55 40d4a02d 2015-11-03 stephen.d type activeTag struct {
56 40d4a02d 2015-11-03 stephen.d ctx context.Context
57 40d4a02d 2015-11-03 stephen.d request *Fcall
58 40d4a02d 2015-11-03 stephen.d cancel context.CancelFunc
59 40d4a02d 2015-11-03 stephen.d responded bool // true, if some response was sent (Response or Rflush/Rerror)
60 40d4a02d 2015-11-03 stephen.d }
61 40d4a02d 2015-11-03 stephen.d
62 96ad2b22 2015-10-28 stephen.d func (s *server) run() {
63 40d4a02d 2015-11-03 stephen.d tags := map[Tag]*activeTag{} // active requests
64 96ad2b22 2015-10-28 stephen.d
65 e6bcde66 2015-10-29 stephen.d log.Println("server.run()")
66 e6bcde66 2015-10-29 stephen.d for {
67 e6bcde66 2015-10-29 stephen.d select {
68 e6bcde66 2015-10-29 stephen.d case <-s.ctx.Done():
69 b714e9e0 2015-11-05 stephen.d log.Println("server: context done")
70 e6bcde66 2015-10-29 stephen.d return
71 e6bcde66 2015-10-29 stephen.d case <-s.closed:
72 b714e9e0 2015-11-05 stephen.d log.Println("server: shutdown")
73 e6bcde66 2015-10-29 stephen.d default:
74 e6bcde66 2015-10-29 stephen.d }
75 e6bcde66 2015-10-29 stephen.d
76 40d4a02d 2015-11-03 stephen.d // BUG(stevvooe): This server blocks on reads, calls to handlers and
77 40d4a02d 2015-11-03 stephen.d // write, effectively single tracking fcalls through a target
78 40d4a02d 2015-11-03 stephen.d // dispatcher. There is no reason we couldn't parallelize these
79 40d4a02d 2015-11-03 stephen.d // requests out to the dispatcher to get massive performance
80 40d4a02d 2015-11-03 stephen.d // improvements.
81 e6bcde66 2015-10-29 stephen.d
82 e6bcde66 2015-10-29 stephen.d log.Println("server:", "wait")
83 40d4a02d 2015-11-03 stephen.d req := new(Fcall)
84 3bf22e58 2015-11-04 stephen.d if err := s.ch.ReadFcall(s.ctx, req); err != nil {
85 b714e9e0 2015-11-05 stephen.d if err, ok := err.(net.Error); ok {
86 b714e9e0 2015-11-05 stephen.d if err.Timeout() || err.Temporary() {
87 b714e9e0 2015-11-05 stephen.d continue
88 b714e9e0 2015-11-05 stephen.d }
89 b714e9e0 2015-11-05 stephen.d }
90 b714e9e0 2015-11-05 stephen.d
91 40d4a02d 2015-11-03 stephen.d log.Println("server: error reading fcall", err)
92 b714e9e0 2015-11-05 stephen.d return
93 e6bcde66 2015-10-29 stephen.d }
94 e6bcde66 2015-10-29 stephen.d
95 40d4a02d 2015-11-03 stephen.d if _, ok := tags[req.Tag]; ok {
96 40d4a02d 2015-11-03 stephen.d resp := newErrorFcall(req.Tag, ErrDuptag)
97 3bf22e58 2015-11-04 stephen.d if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
98 40d4a02d 2015-11-03 stephen.d log.Printf("error sending duplicate tag response: %v", err)
99 e6bcde66 2015-10-29 stephen.d }
100 e6bcde66 2015-10-29 stephen.d continue
101 e6bcde66 2015-10-29 stephen.d }
102 e6bcde66 2015-10-29 stephen.d
103 40d4a02d 2015-11-03 stephen.d // handle flush calls. The tag never makes it into active from here.
104 40d4a02d 2015-11-03 stephen.d if mf, ok := req.Message.(MessageTflush); ok {
105 40d4a02d 2015-11-03 stephen.d log.Println("flushing message", mf.Oldtag)
106 e6bcde66 2015-10-29 stephen.d
107 40d4a02d 2015-11-03 stephen.d // check if we have actually know about the requested flush
108 40d4a02d 2015-11-03 stephen.d active, ok := tags[mf.Oldtag]
109 40d4a02d 2015-11-03 stephen.d if ok {
110 40d4a02d 2015-11-03 stephen.d active.cancel() // cancel the context
111 e6bcde66 2015-10-29 stephen.d
112 40d4a02d 2015-11-03 stephen.d resp := newFcall(MessageRflush{})
113 40d4a02d 2015-11-03 stephen.d resp.Tag = req.Tag
114 3bf22e58 2015-11-04 stephen.d if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
115 40d4a02d 2015-11-03 stephen.d log.Printf("error responding to flush: %v", err)
116 40d4a02d 2015-11-03 stephen.d }
117 40d4a02d 2015-11-03 stephen.d active.responded = true
118 40d4a02d 2015-11-03 stephen.d } else {
119 40d4a02d 2015-11-03 stephen.d resp := newErrorFcall(req.Tag, ErrUnknownTag)
120 3bf22e58 2015-11-04 stephen.d if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
121 40d4a02d 2015-11-03 stephen.d log.Printf("error responding to flush: %v", err)
122 40d4a02d 2015-11-03 stephen.d }
123 40d4a02d 2015-11-03 stephen.d }
124 96ad2b22 2015-10-28 stephen.d
125 40d4a02d 2015-11-03 stephen.d continue
126 96ad2b22 2015-10-28 stephen.d }
127 96ad2b22 2015-10-28 stephen.d
128 40d4a02d 2015-11-03 stephen.d // TODO(stevvooe): Add handler timeout here, as well, if we desire.
129 e6bcde66 2015-10-29 stephen.d
130 40d4a02d 2015-11-03 stephen.d // Allows us to signal handlers to cancel processing of the fcall
131 40d4a02d 2015-11-03 stephen.d // through context.
132 40d4a02d 2015-11-03 stephen.d ctx, cancel := context.WithCancel(s.ctx)
133 e6bcde66 2015-10-29 stephen.d
134 40d4a02d 2015-11-03 stephen.d tags[req.Tag] = &activeTag{
135 40d4a02d 2015-11-03 stephen.d ctx: ctx,
136 40d4a02d 2015-11-03 stephen.d request: req,
137 40d4a02d 2015-11-03 stephen.d cancel: cancel,
138 e6bcde66 2015-10-29 stephen.d }
139 e6bcde66 2015-10-29 stephen.d
140 b714e9e0 2015-11-05 stephen.d var resp *Fcall
141 b714e9e0 2015-11-05 stephen.d msg, err := s.handler.Handle(ctx, req.Message)
142 97423e8b 2015-10-29 stephen.d if err != nil {
143 40d4a02d 2015-11-03 stephen.d // all handler errors are forwarded as protocol errors.
144 40d4a02d 2015-11-03 stephen.d resp = newErrorFcall(req.Tag, err)
145 b714e9e0 2015-11-05 stephen.d } else {
146 b714e9e0 2015-11-05 stephen.d resp = newFcall(msg)
147 97423e8b 2015-10-29 stephen.d }
148 40d4a02d 2015-11-03 stephen.d resp.Tag = req.Tag
149 97423e8b 2015-10-29 stephen.d
150 40d4a02d 2015-11-03 stephen.d if err := ctx.Err(); err != nil {
151 40d4a02d 2015-11-03 stephen.d // NOTE(stevvooe): We aren't really getting our moneys worth for
152 40d4a02d 2015-11-03 stephen.d // how this is being handled. We really need to dispatch each
153 40d4a02d 2015-11-03 stephen.d // request handler to a separate thread.
154 97423e8b 2015-10-29 stephen.d
155 40d4a02d 2015-11-03 stephen.d // the context was canceled for some reason, perhaps timeout or
156 40d4a02d 2015-11-03 stephen.d // due to a flush call. We treat this as a condition where a
157 40d4a02d 2015-11-03 stephen.d // response should not be sent.
158 40d4a02d 2015-11-03 stephen.d log.Println("context error:", err)
159 40d4a02d 2015-11-03 stephen.d continue
160 97423e8b 2015-10-29 stephen.d }
161 97423e8b 2015-10-29 stephen.d
162 40d4a02d 2015-11-03 stephen.d if !tags[req.Tag].responded {
163 3bf22e58 2015-11-04 stephen.d if err := s.ch.WriteFcall(ctx, resp); err != nil {
164 40d4a02d 2015-11-03 stephen.d log.Println("server: error writing fcall:", err)
165 40d4a02d 2015-11-03 stephen.d continue
166 40d4a02d 2015-11-03 stephen.d }
167 97423e8b 2015-10-29 stephen.d }
168 97423e8b 2015-10-29 stephen.d
169 40d4a02d 2015-11-03 stephen.d delete(tags, req.Tag)
170 96ad2b22 2015-10-28 stephen.d }
171 96ad2b22 2015-10-28 stephen.d }