commit - 308039809be9bdd39a0971486ecb769afaa93b25
commit + c3af85271ab1388caa2b948e638b8e83446d75d4
blob - 24d78b3be3d6009581e39ed6283df08d6e6dd118
blob + 5e305f87cbefdf120171822a73dd51b8fd2e13ac
--- server.go
+++ server.go
func (c *conn) serve() error {
tags := map[Tag]*activeRequest{} // active requests
- requests := make(chan *Fcall) // sync, read-limited
- responses := make(chan *Fcall)
- completed := make(chan *Fcall, 1)
+ requests := make(chan *Fcall) // sync, read-limited
+ responses := make(chan *Fcall) // sync, goroutine consumed
+ completed := make(chan *Fcall) // sync, send in goroutine per request
// read loop
go c.read(requests)
// check if we have actually know about the requested flush
active, ok := tags[msg.Oldtag]
if ok {
- active.cancel() // cancel the context of oldtag
+ active.cancel() // propagate cancellation to callees
+ delete(tags, msg.Oldtag)
resp = newFcall(req.Tag, MessageRflush{})
} else {
resp = newErrorFcall(req.Tag, ErrUnknownTag)
// only responses that flip the tag state traverse this section.
active, ok := tags[resp.Tag]
if !ok {
- panic("BUG: unbalanced tag")
+ // The tag is no longer active. Likely a flushed message.
+ continue
}
select {