commit ceb907e462028ec71f0eff0bc477c39be2c7f440 from: Adrien Duermael date: Wed Nov 11 17:46:51 2015 UTC Putting a8b3e87a changes in pkg/p9p commit - dfdc90e8217fbe1023860c90daff5415d2bc0ab9 commit + ceb907e462028ec71f0eff0bc477c39be2c7f440 blob - 52a62823d787f4e4969c511b5a6c31cba9079b3b blob + db9416a562dc32d1b484f84c03327f59afcacb8e --- fcall.go +++ fcall.go @@ -106,9 +106,7 @@ type Fcall struct { Message Message } -func newFcall(msg Message) *Fcall { - var tag Tag - +func newFcall(tag Tag, msg Message) *Fcall { switch msg.Type() { case Tversion, Rversion: tag = NOTAG blob - 4be04253703a1b05cad060ebd8df239a38e272fb blob + a26cd69172fb7be642c1163f04d269b242745a9a --- server.go +++ server.go @@ -52,120 +52,160 @@ type server struct { closed chan struct{} } -type activeTag struct { - ctx context.Context - request *Fcall - cancel context.CancelFunc - responded bool // true, if some response was sent (Response or Rflush/Rerror) +// activeRequest includes information about the active request. +type activeRequest struct { + ctx context.Context + request *Fcall + cancel context.CancelFunc } func (s *server) run() { - tags := map[Tag]*activeTag{} // active requests + tags := map[Tag]*activeRequest{} // active requests - log.Println("server.run()") - for { - select { - case <-s.ctx.Done(): - log.Println("server: context done") - return - case <-s.closed: - log.Println("server: shutdown") - default: - } + requests := make(chan *Fcall) // sync, read-limited + responses := make(chan *Fcall) + completed := make(chan *Fcall, 1) - // BUG(stevvooe): This server blocks on reads, calls to handlers and - // write, effectively single tracking fcalls through a target - // dispatcher. There is no reason we couldn't parallelize these - // requests out to the dispatcher to get massive performance - // improvements. - - log.Println("server:", "wait") - req := new(Fcall) - if err := s.ch.ReadFcall(s.ctx, req); err != nil { - if err, ok := err.(net.Error); ok { - if err.Timeout() || err.Temporary() { - continue + // read loop + go func() { + for { + req := new(Fcall) + if err := s.ch.ReadFcall(s.ctx, req); err != nil { + if err, ok := err.(net.Error); ok { + if err.Timeout() || err.Temporary() { + continue + } } + + log.Println("server: error reading fcall", err) + return } - log.Println("server: error reading fcall", err) - return - } - - if _, ok := tags[req.Tag]; ok { - resp := newErrorFcall(req.Tag, ErrDuptag) - if err := s.ch.WriteFcall(s.ctx, resp); err != nil { - log.Printf("error sending duplicate tag response: %v", err) + select { + case requests <- req: + case <-s.ctx.Done(): + log.Println("server: context done") + return + case <-s.closed: + log.Println("server: shutdown") + return } - continue } + }() - // handle flush calls. The tag never makes it into active from here. - if mf, ok := req.Message.(MessageTflush); ok { - log.Println("flushing message", mf.Oldtag) - - // check if we have actually know about the requested flush - active, ok := tags[mf.Oldtag] - if ok { - active.cancel() // cancel the context - - resp := newFcall(MessageRflush{}) - resp.Tag = req.Tag + // write loop + go func() { + for { + select { + case resp := <-responses: if err := s.ch.WriteFcall(s.ctx, resp); err != nil { - log.Printf("error responding to flush: %v", err) + log.Println("server: error writing fcall:", err) } - active.responded = true - } else { - resp := newErrorFcall(req.Tag, ErrUnknownTag) - if err := s.ch.WriteFcall(s.ctx, resp); err != nil { - log.Printf("error responding to flush: %v", err) - } + case <-s.ctx.Done(): + log.Println("server: context done") + return + case <-s.closed: + log.Println("server: shutdown") + return } - - continue } + }() - // TODO(stevvooe): Add handler timeout here, as well, if we desire. + log.Println("server.run()") + for { + log.Println("server:", "wait") + select { + case req := <-requests: + log.Println("request", req) + if _, ok := tags[req.Tag]; ok { + select { + case responses <- newErrorFcall(req.Tag, ErrDuptag): + // Send to responses, bypass tag management. + case <-s.ctx.Done(): + return + case <-s.closed: + return + } + continue + } - // Allows us to signal handlers to cancel processing of the fcall - // through context. - ctx, cancel := context.WithCancel(s.ctx) + switch msg := req.Message.(type) { + case MessageTflush: + log.Println("server: flushing message", msg.Oldtag) - tags[req.Tag] = &activeTag{ - ctx: ctx, - request: req, - cancel: cancel, - } + var resp *Fcall + // check if we have actually know about the requested flush + active, ok := tags[msg.Oldtag] + if ok { + active.cancel() // cancel the context of oldtag + resp = newFcall(req.Tag, MessageRflush{}) + } else { + resp = newErrorFcall(req.Tag, ErrUnknownTag) + } - var resp *Fcall - msg, err := s.handler.Handle(ctx, req.Message) - if err != nil { - // all handler errors are forwarded as protocol errors. - resp = newErrorFcall(req.Tag, err) - } else { - resp = newFcall(msg) - } - resp.Tag = req.Tag + select { + case responses <- resp: + // bypass tag management in completed. + case <-s.ctx.Done(): + return + case <-s.closed: + return + } + default: + // Allows us to session handlers to cancel processing of the fcall + // through context. + ctx, cancel := context.WithCancel(s.ctx) - if err := ctx.Err(); err != nil { - // NOTE(stevvooe): We aren't really getting our moneys worth for - // how this is being handled. We really need to dispatch each - // request handler to a separate thread. + // The contents of these instances are only writable in the main + // server loop. The value of tag will not change. + tags[req.Tag] = &activeRequest{ + ctx: ctx, + request: req, + cancel: cancel, + } - // the context was canceled for some reason, perhaps timeout or - // due to a flush call. We treat this as a condition where a - // response should not be sent. - log.Println("context error:", err) - continue - } + go func(ctx context.Context, req *Fcall) { + var resp *Fcall + msg, err := s.handler.Handle(ctx, req.Message) + if err != nil { + // all handler errors are forwarded as protocol errors. + resp = newErrorFcall(req.Tag, err) + } else { + resp = newFcall(req.Tag, msg) + } - if !tags[req.Tag].responded { - if err := s.ch.WriteFcall(ctx, resp); err != nil { - log.Println("server: error writing fcall:", err) - continue + select { + case completed <- resp: + case <-ctx.Done(): + return + case <-s.closed: + return + } + }(ctx, req) } - } + case resp := <-completed: + log.Println("completed", resp) + // only responses that flip the tag state traverse this section. + active, ok := tags[resp.Tag] + if !ok { + panic("BUG: unbalanced tag") + } - delete(tags, req.Tag) + select { + case responses <- resp: + case <-active.ctx.Done(): + // the context was canceled for some reason, perhaps timeout or + // due to a flush call. We treat this as a condition where a + // response should not be sent. + log.Println("canceled", resp, active.ctx.Err()) + } + delete(tags, resp.Tag) + case <-s.ctx.Done(): + log.Println("server: context done") + return + case <-s.closed: + log.Println("server: shutdown") + return + } } } blob - 441111086f8fac97bbe5efa2a66485e1d18edb9c blob + 4bbb78b65e3074494cf8d0fb6e53d1839f628936 --- transport.go +++ transport.go @@ -43,25 +43,25 @@ func newTransport(ctx context.Context, ch *channel) ro return t } +// fcallRequest encompasses the request to send a message via fcall. type fcallRequest struct { ctx context.Context - fcall *Fcall + message Message response chan *Fcall err chan error } -func newFcallRequest(ctx context.Context, fcall *Fcall) *fcallRequest { +func newFcallRequest(ctx context.Context, msg Message) *fcallRequest { return &fcallRequest{ ctx: ctx, - fcall: fcall, + message: msg, response: make(chan *Fcall, 1), err: make(chan error, 1), } } func (t *transport) send(ctx context.Context, msg Message) (Message, error) { - fcall := newFcall(msg) - req := newFcallRequest(ctx, fcall) + req := newFcallRequest(ctx, msg) // dispatch the request. select { @@ -151,34 +151,21 @@ func (t *transport) handle() { log.Println("wait...") select { case req := <-t.requests: - if req.fcall.Tag == NOTAG { - // NOTE(stevvooe): We disallow fcalls with NOTAG to come - // through this path since we can't join the tagged response - // with the waiting caller. This is typically used for the - // Tversion/Rversion round trip to setup a session. - // - // It may be better to allow these through but block all - // requests until a notag message has a response. - - req.err <- fmt.Errorf("disallowed tag through transport") - continue - } - // BUG(stevvooe): This is an awful tag allocation procedure. // Replace this with something that let's us allocate tags and // associate data with them, returning to them to a pool when // complete. Such a system would provide a lot of information // about outstanding requests. tags++ - req.fcall.Tag = tags - outstanding[req.fcall.Tag] = req + fcall := newFcall(tags, req.message) + outstanding[fcall.Tag] = req // TODO(stevvooe): Consider the case of requests that never // receive a response. We need to remove the fcall context from // the tag map and dealloc the tag. We may also want to send a // flush for the tag. - if err := t.ch.WriteFcall(req.ctx, req.fcall); err != nil { - delete(outstanding, req.fcall.Tag) + if err := t.ch.WriteFcall(req.ctx, fcall); err != nil { + delete(outstanding, fcall.Tag) req.err <- err } case b := <-responses: @@ -186,8 +173,12 @@ func (t *transport) handle() { if !ok { panic("unknown tag received") } - delete(outstanding, req.fcall.Tag) + // BUG(stevvooe): Must detect duplicate tag and ensure that we are + // waking up the right caller. If a duplicate is received, the + // entry should not be deleted. + delete(outstanding, b.Tag) + req.response <- b // TODO(stevvooe): Reclaim tag id. blob - d23b4aae4ce948d568c577af4bbea35ecfa92f74 blob + ac20fb9dfd45e42713513aee43a5ad975d41c0cc --- version.go +++ version.go @@ -16,7 +16,7 @@ import ( // until a response is received. The received value will be the version // implemented by the server. func clientnegotiate(ctx context.Context, ch Channel, version string) (string, error) { - req := newFcall(MessageTversion{ + req := newFcall(NOTAG, MessageTversion{ MSize: uint32(ch.MSize()), Version: version, }) @@ -93,7 +93,7 @@ func servernegotiate(ctx context.Context, ch Channel, respmsg.MSize = uint32(ch.MSize()) } - resp := newFcall(respmsg) + resp := newFcall(NOTAG, respmsg) if err := ch.WriteFcall(ctx, resp); err != nil { return err }