Blame


1 e342de7d 2015-11-11 adrien package p9p
2 e342de7d 2015-11-11 adrien
3 e342de7d 2015-11-11 adrien import (
4 e342de7d 2015-11-11 adrien "log"
5 e342de7d 2015-11-11 adrien "net"
6 e342de7d 2015-11-11 adrien "time"
7 e342de7d 2015-11-11 adrien
8 e342de7d 2015-11-11 adrien "golang.org/x/net/context"
9 e342de7d 2015-11-11 adrien )
10 e342de7d 2015-11-11 adrien
11 e342de7d 2015-11-11 adrien // Serve the 9p session over the provided network connection.
12 e342de7d 2015-11-11 adrien func Serve(ctx context.Context, conn net.Conn, handler Handler) {
13 e342de7d 2015-11-11 adrien
14 e342de7d 2015-11-11 adrien // TODO(stevvooe): It would be nice if the handler could declare the
15 e342de7d 2015-11-11 adrien // supported version. Before we had handler, we used the session to get
16 e342de7d 2015-11-11 adrien // the version (msize, version := session.Version()). We must decided if
17 e342de7d 2015-11-11 adrien // we want to proxy version and message size decisions all the back to the
18 e342de7d 2015-11-11 adrien // origin server or make those decisions at each link of a proxy chain.
19 e342de7d 2015-11-11 adrien
20 e342de7d 2015-11-11 adrien ch := newChannel(conn, codec9p{}, DefaultMSize)
21 e342de7d 2015-11-11 adrien negctx, cancel := context.WithTimeout(ctx, 1*time.Second)
22 e342de7d 2015-11-11 adrien defer cancel()
23 e342de7d 2015-11-11 adrien
24 e342de7d 2015-11-11 adrien // TODO(stevvooe): For now, we negotiate here. It probably makes sense to
25 e342de7d 2015-11-11 adrien // do this outside of this function and then pass in a ready made channel.
26 e342de7d 2015-11-11 adrien // We are not really ready to export the channel type yet.
27 e342de7d 2015-11-11 adrien
28 e342de7d 2015-11-11 adrien if err := servernegotiate(negctx, ch, DefaultVersion); err != nil {
29 e342de7d 2015-11-11 adrien // TODO(stevvooe): Need better error handling and retry support here.
30 e342de7d 2015-11-11 adrien // For now, we silently ignore the failure.
31 e342de7d 2015-11-11 adrien log.Println("error negotiating version:", err)
32 e342de7d 2015-11-11 adrien return
33 e342de7d 2015-11-11 adrien }
34 e342de7d 2015-11-11 adrien
35 e342de7d 2015-11-11 adrien ctx = withVersion(ctx, DefaultVersion)
36 e342de7d 2015-11-11 adrien
37 e342de7d 2015-11-11 adrien s := &server{
38 e342de7d 2015-11-11 adrien ctx: ctx,
39 e342de7d 2015-11-11 adrien ch: ch,
40 e342de7d 2015-11-11 adrien handler: handler,
41 e342de7d 2015-11-11 adrien closed: make(chan struct{}),
42 e342de7d 2015-11-11 adrien }
43 e342de7d 2015-11-11 adrien
44 e342de7d 2015-11-11 adrien s.run()
45 e342de7d 2015-11-11 adrien }
46 e342de7d 2015-11-11 adrien
47 e342de7d 2015-11-11 adrien type server struct {
48 e342de7d 2015-11-11 adrien ctx context.Context
49 e342de7d 2015-11-11 adrien session Session
50 e342de7d 2015-11-11 adrien ch Channel
51 e342de7d 2015-11-11 adrien handler Handler
52 e342de7d 2015-11-11 adrien closed chan struct{}
53 e342de7d 2015-11-11 adrien }
54 e342de7d 2015-11-11 adrien
55 e342de7d 2015-11-11 adrien type activeTag struct {
56 e342de7d 2015-11-11 adrien ctx context.Context
57 e342de7d 2015-11-11 adrien request *Fcall
58 e342de7d 2015-11-11 adrien cancel context.CancelFunc
59 e342de7d 2015-11-11 adrien responded bool // true, if some response was sent (Response or Rflush/Rerror)
60 e342de7d 2015-11-11 adrien }
61 e342de7d 2015-11-11 adrien
62 e342de7d 2015-11-11 adrien func (s *server) run() {
63 e342de7d 2015-11-11 adrien tags := map[Tag]*activeTag{} // active requests
64 e342de7d 2015-11-11 adrien
65 e342de7d 2015-11-11 adrien log.Println("server.run()")
66 e342de7d 2015-11-11 adrien for {
67 e342de7d 2015-11-11 adrien select {
68 e342de7d 2015-11-11 adrien case <-s.ctx.Done():
69 e342de7d 2015-11-11 adrien log.Println("server: context done")
70 e342de7d 2015-11-11 adrien return
71 e342de7d 2015-11-11 adrien case <-s.closed:
72 e342de7d 2015-11-11 adrien log.Println("server: shutdown")
73 e342de7d 2015-11-11 adrien default:
74 e342de7d 2015-11-11 adrien }
75 e342de7d 2015-11-11 adrien
76 e342de7d 2015-11-11 adrien // BUG(stevvooe): This server blocks on reads, calls to handlers and
77 e342de7d 2015-11-11 adrien // write, effectively single tracking fcalls through a target
78 e342de7d 2015-11-11 adrien // dispatcher. There is no reason we couldn't parallelize these
79 e342de7d 2015-11-11 adrien // requests out to the dispatcher to get massive performance
80 e342de7d 2015-11-11 adrien // improvements.
81 e342de7d 2015-11-11 adrien
82 e342de7d 2015-11-11 adrien log.Println("server:", "wait")
83 e342de7d 2015-11-11 adrien req := new(Fcall)
84 e342de7d 2015-11-11 adrien if err := s.ch.ReadFcall(s.ctx, req); err != nil {
85 e342de7d 2015-11-11 adrien if err, ok := err.(net.Error); ok {
86 e342de7d 2015-11-11 adrien if err.Timeout() || err.Temporary() {
87 e342de7d 2015-11-11 adrien continue
88 e342de7d 2015-11-11 adrien }
89 e342de7d 2015-11-11 adrien }
90 e342de7d 2015-11-11 adrien
91 e342de7d 2015-11-11 adrien log.Println("server: error reading fcall", err)
92 e342de7d 2015-11-11 adrien return
93 e342de7d 2015-11-11 adrien }
94 e342de7d 2015-11-11 adrien
95 e342de7d 2015-11-11 adrien if _, ok := tags[req.Tag]; ok {
96 e342de7d 2015-11-11 adrien resp := newErrorFcall(req.Tag, ErrDuptag)
97 e342de7d 2015-11-11 adrien if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
98 e342de7d 2015-11-11 adrien log.Printf("error sending duplicate tag response: %v", err)
99 e342de7d 2015-11-11 adrien }
100 e342de7d 2015-11-11 adrien continue
101 e342de7d 2015-11-11 adrien }
102 e342de7d 2015-11-11 adrien
103 e342de7d 2015-11-11 adrien // handle flush calls. The tag never makes it into active from here.
104 e342de7d 2015-11-11 adrien if mf, ok := req.Message.(MessageTflush); ok {
105 e342de7d 2015-11-11 adrien log.Println("flushing message", mf.Oldtag)
106 e342de7d 2015-11-11 adrien
107 e342de7d 2015-11-11 adrien // check if we have actually know about the requested flush
108 e342de7d 2015-11-11 adrien active, ok := tags[mf.Oldtag]
109 e342de7d 2015-11-11 adrien if ok {
110 e342de7d 2015-11-11 adrien active.cancel() // cancel the context
111 e342de7d 2015-11-11 adrien
112 e342de7d 2015-11-11 adrien resp := newFcall(MessageRflush{})
113 e342de7d 2015-11-11 adrien resp.Tag = req.Tag
114 e342de7d 2015-11-11 adrien if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
115 e342de7d 2015-11-11 adrien log.Printf("error responding to flush: %v", err)
116 e342de7d 2015-11-11 adrien }
117 e342de7d 2015-11-11 adrien active.responded = true
118 e342de7d 2015-11-11 adrien } else {
119 e342de7d 2015-11-11 adrien resp := newErrorFcall(req.Tag, ErrUnknownTag)
120 e342de7d 2015-11-11 adrien if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
121 e342de7d 2015-11-11 adrien log.Printf("error responding to flush: %v", err)
122 e342de7d 2015-11-11 adrien }
123 e342de7d 2015-11-11 adrien }
124 e342de7d 2015-11-11 adrien
125 e342de7d 2015-11-11 adrien continue
126 e342de7d 2015-11-11 adrien }
127 e342de7d 2015-11-11 adrien
128 e342de7d 2015-11-11 adrien // TODO(stevvooe): Add handler timeout here, as well, if we desire.
129 e342de7d 2015-11-11 adrien
130 e342de7d 2015-11-11 adrien // Allows us to signal handlers to cancel processing of the fcall
131 e342de7d 2015-11-11 adrien // through context.
132 e342de7d 2015-11-11 adrien ctx, cancel := context.WithCancel(s.ctx)
133 e342de7d 2015-11-11 adrien
134 e342de7d 2015-11-11 adrien tags[req.Tag] = &activeTag{
135 e342de7d 2015-11-11 adrien ctx: ctx,
136 e342de7d 2015-11-11 adrien request: req,
137 e342de7d 2015-11-11 adrien cancel: cancel,
138 e342de7d 2015-11-11 adrien }
139 e342de7d 2015-11-11 adrien
140 e342de7d 2015-11-11 adrien var resp *Fcall
141 e342de7d 2015-11-11 adrien msg, err := s.handler.Handle(ctx, req.Message)
142 e342de7d 2015-11-11 adrien if err != nil {
143 e342de7d 2015-11-11 adrien // all handler errors are forwarded as protocol errors.
144 e342de7d 2015-11-11 adrien resp = newErrorFcall(req.Tag, err)
145 e342de7d 2015-11-11 adrien } else {
146 e342de7d 2015-11-11 adrien resp = newFcall(msg)
147 e342de7d 2015-11-11 adrien }
148 e342de7d 2015-11-11 adrien resp.Tag = req.Tag
149 e342de7d 2015-11-11 adrien
150 e342de7d 2015-11-11 adrien if err := ctx.Err(); err != nil {
151 e342de7d 2015-11-11 adrien // NOTE(stevvooe): We aren't really getting our moneys worth for
152 e342de7d 2015-11-11 adrien // how this is being handled. We really need to dispatch each
153 e342de7d 2015-11-11 adrien // request handler to a separate thread.
154 e342de7d 2015-11-11 adrien
155 e342de7d 2015-11-11 adrien // the context was canceled for some reason, perhaps timeout or
156 e342de7d 2015-11-11 adrien // due to a flush call. We treat this as a condition where a
157 e342de7d 2015-11-11 adrien // response should not be sent.
158 e342de7d 2015-11-11 adrien log.Println("context error:", err)
159 e342de7d 2015-11-11 adrien continue
160 e342de7d 2015-11-11 adrien }
161 e342de7d 2015-11-11 adrien
162 e342de7d 2015-11-11 adrien if !tags[req.Tag].responded {
163 e342de7d 2015-11-11 adrien if err := s.ch.WriteFcall(ctx, resp); err != nil {
164 e342de7d 2015-11-11 adrien log.Println("server: error writing fcall:", err)
165 e342de7d 2015-11-11 adrien continue
166 e342de7d 2015-11-11 adrien }
167 e342de7d 2015-11-11 adrien }
168 e342de7d 2015-11-11 adrien
169 e342de7d 2015-11-11 adrien delete(tags, req.Tag)
170 e342de7d 2015-11-11 adrien }
171 e342de7d 2015-11-11 adrien }