commit b9021eb7ca84c51453365c9814d3a04afa909bb9 from: Stephen Day via: GitHub date: Mon Sep 19 22:12:41 2016 UTC Merge pull request #24 from stevvooe/refactor-transport-close transport: remove race condition on close commit - 4f41ff66898913697507fdf8ebb75d811715b81a commit + b9021eb7ca84c51453365c9814d3a04afa909bb9 blob - 8a88fa3b9a090adadc8d61c1c98894b0c917e6d6 blob + fc3f1aff6c17e0634c0806bf1be9f04524bff27c --- transport.go +++ transport.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "net" + "sync" "golang.org/x/net/context" ) @@ -24,6 +25,9 @@ type transport struct { ctx context.Context ch Channel requests chan *fcallRequest + + shutdown chan struct{} + once sync.Once // protect closure of shutdown closed chan struct{} tags uint16 @@ -36,6 +40,7 @@ func newTransport(ctx context.Context, ch Channel) rou ctx: ctx, ch: ch, requests: make(chan *fcallRequest), + shutdown: make(chan struct{}), closed: make(chan struct{}), } @@ -124,8 +129,9 @@ func allocateTag(r *fcallRequest, m map[Tag]*fcallRequ func (t *transport) handle() { defer func() { log.Println("exited handle loop") - t.Close() + close(t.closed) }() + // the following variable block are protected components owned by this thread. var ( responses = make(chan *Fcall) @@ -138,7 +144,7 @@ func (t *transport) handle() { go func() { defer func() { log.Println("exited read loop") - t.Close() + t.close() // single main loop }() loop: for { @@ -156,13 +162,11 @@ func (t *transport) handle() { } log.Println("fatal error reading msg:", err) - t.Close() return } select { case <-t.ctx.Done(): - log.Println("ctx done") return case <-t.closed: log.Println("transport closed") @@ -213,10 +217,10 @@ func (t *transport) handle() { req.response <- b // TODO(stevvooe): Reclaim tag id. + case <-t.shutdown: + return case <-t.ctx.Done(): return - case <-t.closed: - return } } } @@ -228,14 +232,19 @@ func (t *transport) flush(ctx context.Context, tag Tag } func (t *transport) Close() error { + t.close() + select { case <-t.closed: - return ErrClosed + return nil case <-t.ctx.Done(): return t.ctx.Err() - default: - close(t.closed) } +} - return nil +// close starts the shutdown process. +func (t *transport) close() { + t.once.Do(func() { + close(t.shutdown) + }) }