commit 6b62df4706f86d9f1d38e2dcc363d440561d8c8d from: Stephen Day date: Fri Dec 04 22:55:59 2015 UTC Merge pull request #1 from stevvooe/flush-race-fix p9p: address race condition in flush response commit - 308039809be9bdd39a0971486ecb769afaa93b25 commit + 6b62df4706f86d9f1d38e2dcc363d440561d8c8d blob - 24d78b3be3d6009581e39ed6283df08d6e6dd118 blob + 5e305f87cbefdf120171822a73dd51b8fd2e13ac --- server.go +++ server.go @@ -68,9 +68,9 @@ type activeRequest struct { func (c *conn) serve() error { tags := map[Tag]*activeRequest{} // active requests - requests := make(chan *Fcall) // sync, read-limited - responses := make(chan *Fcall) - completed := make(chan *Fcall, 1) + requests := make(chan *Fcall) // sync, read-limited + responses := make(chan *Fcall) // sync, goroutine consumed + completed := make(chan *Fcall) // sync, send in goroutine per request // read loop go c.read(requests) @@ -100,7 +100,8 @@ func (c *conn) serve() error { // check if we have actually know about the requested flush active, ok := tags[msg.Oldtag] if ok { - active.cancel() // cancel the context of oldtag + active.cancel() // propagate cancellation to callees + delete(tags, msg.Oldtag) resp = newFcall(req.Tag, MessageRflush{}) } else { resp = newErrorFcall(req.Tag, ErrUnknownTag) @@ -150,7 +151,8 @@ func (c *conn) serve() error { // only responses that flip the tag state traverse this section. active, ok := tags[resp.Tag] if !ok { - panic("BUG: unbalanced tag") + // The tag is no longer active. Likely a flushed message. + continue } select {