Commit Diff


commit - 308039809be9bdd39a0971486ecb769afaa93b25
commit + c3af85271ab1388caa2b948e638b8e83446d75d4
blob - 24d78b3be3d6009581e39ed6283df08d6e6dd118
blob + 5e305f87cbefdf120171822a73dd51b8fd2e13ac
--- server.go
+++ server.go
@@ -68,9 +68,9 @@ type activeRequest struct {
 func (c *conn) serve() error {
 	tags := map[Tag]*activeRequest{} // active requests
 
-	requests := make(chan *Fcall) // sync, read-limited
-	responses := make(chan *Fcall)
-	completed := make(chan *Fcall, 1)
+	requests := make(chan *Fcall)  // sync, read-limited
+	responses := make(chan *Fcall) // sync, goroutine consumed
+	completed := make(chan *Fcall) // sync, send in goroutine per request
 
 	// read loop
 	go c.read(requests)
@@ -100,7 +100,8 @@ func (c *conn) serve() error {
 				// check if we have actually know about the requested flush
 				active, ok := tags[msg.Oldtag]
 				if ok {
-					active.cancel() // cancel the context of oldtag
+					active.cancel() // propagate cancellation to callees
+					delete(tags, msg.Oldtag)
 					resp = newFcall(req.Tag, MessageRflush{})
 				} else {
 					resp = newErrorFcall(req.Tag, ErrUnknownTag)
@@ -150,7 +151,8 @@ func (c *conn) serve() error {
 			// only responses that flip the tag state traverse this section.
 			active, ok := tags[resp.Tag]
 			if !ok {
-				panic("BUG: unbalanced tag")
+				// The tag is no longer active. Likely a flushed message.
+				continue
 			}
 
 			select {