commit - 4f41ff66898913697507fdf8ebb75d811715b81a
commit + b9021eb7ca84c51453365c9814d3a04afa909bb9
blob - 8a88fa3b9a090adadc8d61c1c98894b0c917e6d6
blob + fc3f1aff6c17e0634c0806bf1be9f04524bff27c
--- transport.go
+++ transport.go
"fmt"
"log"
"net"
+ "sync"
"golang.org/x/net/context"
)
ctx context.Context
ch Channel
requests chan *fcallRequest
+
+ shutdown chan struct{}
+ once sync.Once // ensures shutdown is closed at most once
closed chan struct{}
tags uint16
ctx: ctx,
ch: ch,
requests: make(chan *fcallRequest),
+ shutdown: make(chan struct{}),
closed: make(chan struct{}),
}
func (t *transport) handle() {
defer func() {
log.Println("exited handle loop")
- t.Close()
+ close(t.closed)
}()
+
// the following variable block are protected components owned by this thread.
var (
responses = make(chan *Fcall)
go func() {
defer func() {
log.Println("exited read loop")
- t.Close()
+ t.close() // only signal shutdown; the main handle loop is what closes t.closed
}()
loop:
for {
}
log.Println("fatal error reading msg:", err)
- t.Close()
return
}
select {
case <-t.ctx.Done():
- log.Println("ctx done")
return
case <-t.closed:
log.Println("transport closed")
req.response <- b
// TODO(stevvooe): Reclaim tag id.
+ case <-t.shutdown:
+ return
case <-t.ctx.Done():
return
- case <-t.closed:
- return
}
}
}
}
func (t *transport) Close() error {
+ t.close()
+
select {
case <-t.closed:
- return ErrClosed
+ return nil
case <-t.ctx.Done():
return t.ctx.Err()
- default:
- close(t.closed)
}
+}
- return nil
+// close starts the shutdown process by closing t.shutdown exactly once; it does not wait for the handle loop to exit.
+func (t *transport) close() {
+ t.once.Do(func() {
+ close(t.shutdown)
+ })
}