Commit Diff


commit - 4f41ff66898913697507fdf8ebb75d811715b81a
commit + b9021eb7ca84c51453365c9814d3a04afa909bb9
blob - 8a88fa3b9a090adadc8d61c1c98894b0c917e6d6
blob + fc3f1aff6c17e0634c0806bf1be9f04524bff27c
--- transport.go
+++ transport.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"log"
 	"net"
+	"sync"
 
 	"golang.org/x/net/context"
 )
@@ -24,6 +25,9 @@ type transport struct {
 	ctx      context.Context
 	ch       Channel
 	requests chan *fcallRequest
+
+	shutdown chan struct{}
+	once     sync.Once // protect closure of shutdown
 	closed   chan struct{}
 
 	tags uint16
@@ -36,6 +40,7 @@ func newTransport(ctx context.Context, ch Channel) rou
 		ctx:      ctx,
 		ch:       ch,
 		requests: make(chan *fcallRequest),
+		shutdown: make(chan struct{}),
 		closed:   make(chan struct{}),
 	}
 
@@ -124,8 +129,9 @@ func allocateTag(r *fcallRequest, m map[Tag]*fcallRequ
 func (t *transport) handle() {
 	defer func() {
 		log.Println("exited handle loop")
-		t.Close()
+		close(t.closed)
 	}()
+
 	// the following variable block are protected components owned by this thread.
 	var (
 		responses = make(chan *Fcall)
@@ -138,7 +144,7 @@ func (t *transport) handle() {
 	go func() {
 		defer func() {
 			log.Println("exited read loop")
-			t.Close()
+			t.close() // reader exited; signal the main loop to shut down
 		}()
 	loop:
 		for {
@@ -156,13 +162,11 @@ func (t *transport) handle() {
 				}
 
 				log.Println("fatal error reading msg:", err)
-				t.Close()
 				return
 			}
 
 			select {
 			case <-t.ctx.Done():
-				log.Println("ctx done")
 				return
 			case <-t.closed:
 				log.Println("transport closed")
@@ -213,10 +217,10 @@ func (t *transport) handle() {
 			req.response <- b
 
 			// TODO(stevvooe): Reclaim tag id.
+		case <-t.shutdown:
+			return
 		case <-t.ctx.Done():
 			return
-		case <-t.closed:
-			return
 		}
 	}
 }
@@ -228,14 +232,19 @@ func (t *transport) flush(ctx context.Context, tag Tag
 }
 
 func (t *transport) Close() error {
+	t.close()
+
 	select {
 	case <-t.closed:
-		return ErrClosed
+		return nil
 	case <-t.ctx.Done():
 		return t.ctx.Err()
-	default:
-		close(t.closed)
 	}
+}
 
-	return nil
+// close signals shutdown by closing t.shutdown; the sync.Once makes repeat calls no-ops.
+func (t *transport) close() {
+	t.once.Do(func() {
+		close(t.shutdown)
+	})
 }