Commit Diff


commit - dfdc90e8217fbe1023860c90daff5415d2bc0ab9
commit + ceb907e462028ec71f0eff0bc477c39be2c7f440
blob - 52a62823d787f4e4969c511b5a6c31cba9079b3b
blob + db9416a562dc32d1b484f84c03327f59afcacb8e
--- fcall.go
+++ fcall.go
@@ -106,9 +106,7 @@ type Fcall struct {
 	Message Message
 }
 
-func newFcall(msg Message) *Fcall {
-	var tag Tag
-
+func newFcall(tag Tag, msg Message) *Fcall {
 	switch msg.Type() {
 	case Tversion, Rversion:
 		tag = NOTAG
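
Note on fcall.go: newFcall no longer invents a tag internally; the caller supplies one, and Tversion/Rversion messages still force it to NOTAG. A hedged sketch of how the full constructor likely reads after this hunk; the Fcall fields other than Tag and Message, and the trailing return, sit outside the diff context and are assumptions:

    // Sketch only: everything past the switch is assumed, not shown in the hunk.
    func newFcall(tag Tag, msg Message) *Fcall {
    	switch msg.Type() {
    	case Tversion, Rversion:
    		tag = NOTAG
    	}

    	return &Fcall{
    		Type:    msg.Type(),
    		Tag:     tag,
    		Message: msg,
    	}
    }
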
blob - 4be04253703a1b05cad060ebd8df239a38e272fb
blob + a26cd69172fb7be642c1163f04d269b242745a9a
--- server.go
+++ server.go
@@ -52,120 +52,160 @@ type server struct {
 	closed  chan struct{}
 }
 
-type activeTag struct {
-	ctx       context.Context
-	request   *Fcall
-	cancel    context.CancelFunc
-	responded bool // true, if some response was sent (Response or Rflush/Rerror)
+// activeRequest describes an in-flight request: its fcall, context, and cancel function.
+type activeRequest struct {
+	ctx     context.Context
+	request *Fcall
+	cancel  context.CancelFunc
 }
 
 func (s *server) run() {
-	tags := map[Tag]*activeTag{} // active requests
+	tags := map[Tag]*activeRequest{} // active requests
 
-	log.Println("server.run()")
-	for {
-		select {
-		case <-s.ctx.Done():
-			log.Println("server: context done")
-			return
-		case <-s.closed:
-			log.Println("server: shutdown")
-		default:
-		}
+	requests := make(chan *Fcall) // unbuffered: the read loop blocks until the main loop accepts each request
+	responses := make(chan *Fcall)
+	completed := make(chan *Fcall, 1)
 
-		// BUG(stevvooe): This server blocks on reads, calls to handlers and
-		// write, effectively single tracking fcalls through a target
-		// dispatcher. There is no reason we couldn't parallelize these
-		// requests out to the dispatcher to get massive performance
-		// improvements.
-
-		log.Println("server:", "wait")
-		req := new(Fcall)
-		if err := s.ch.ReadFcall(s.ctx, req); err != nil {
-			if err, ok := err.(net.Error); ok {
-				if err.Timeout() || err.Temporary() {
-					continue
+	// read loop
+	go func() {
+		for {
+			req := new(Fcall)
+			if err := s.ch.ReadFcall(s.ctx, req); err != nil {
+				if err, ok := err.(net.Error); ok {
+					if err.Timeout() || err.Temporary() {
+						continue
+					}
 				}
+
+				log.Println("server: error reading fcall", err)
+				return
 			}
 
-			log.Println("server: error reading fcall", err)
-			return
-		}
-
-		if _, ok := tags[req.Tag]; ok {
-			resp := newErrorFcall(req.Tag, ErrDuptag)
-			if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
-				log.Printf("error sending duplicate tag response: %v", err)
+			select {
+			case requests <- req:
+			case <-s.ctx.Done():
+				log.Println("server: context done")
+				return
+			case <-s.closed:
+				log.Println("server: shutdown")
+				return
 			}
-			continue
 		}
+	}()
 
-		// handle flush calls. The tag never makes it into active from here.
-		if mf, ok := req.Message.(MessageTflush); ok {
-			log.Println("flushing message", mf.Oldtag)
-
-			// check if we have actually know about the requested flush
-			active, ok := tags[mf.Oldtag]
-			if ok {
-				active.cancel() // cancel the context
-
-				resp := newFcall(MessageRflush{})
-				resp.Tag = req.Tag
+	// write loop
+	go func() {
+		for {
+			select {
+			case resp := <-responses:
 				if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
-					log.Printf("error responding to flush: %v", err)
+					log.Println("server: error writing fcall:", err)
 				}
-				active.responded = true
-			} else {
-				resp := newErrorFcall(req.Tag, ErrUnknownTag)
-				if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
-					log.Printf("error responding to flush: %v", err)
-				}
+			case <-s.ctx.Done():
+				log.Println("server: context done")
+				return
+			case <-s.closed:
+				log.Println("server: shutdown")
+				return
 			}
-
-			continue
 		}
+	}()
 
-		// TODO(stevvooe): Add handler timeout here, as well, if we desire.
+	log.Println("server.run()")
+	for {
+		log.Println("server:", "wait")
+		select {
+		case req := <-requests:
+			log.Println("request", req)
+			if _, ok := tags[req.Tag]; ok {
+				select {
+				case responses <- newErrorFcall(req.Tag, ErrDuptag):
+					// Send to responses, bypass tag management.
+				case <-s.ctx.Done():
+					return
+				case <-s.closed:
+					return
+				}
+				continue
+			}
 
-		// Allows us to signal handlers to cancel processing of the fcall
-		// through context.
-		ctx, cancel := context.WithCancel(s.ctx)
+			switch msg := req.Message.(type) {
+			case MessageTflush:
+				log.Println("server: flushing message", msg.Oldtag)
 
-		tags[req.Tag] = &activeTag{
-			ctx:     ctx,
-			request: req,
-			cancel:  cancel,
-		}
+				var resp *Fcall
+				// check if we actually know about the requested flush
+				active, ok := tags[msg.Oldtag]
+				if ok {
+					active.cancel() // cancel the context of oldtag
+					resp = newFcall(req.Tag, MessageRflush{})
+				} else {
+					resp = newErrorFcall(req.Tag, ErrUnknownTag)
+				}
 
-		var resp *Fcall
-		msg, err := s.handler.Handle(ctx, req.Message)
-		if err != nil {
-			// all handler errors are forwarded as protocol errors.
-			resp = newErrorFcall(req.Tag, err)
-		} else {
-			resp = newFcall(msg)
-		}
-		resp.Tag = req.Tag
+				select {
+				case responses <- resp:
+					// bypass tag management in completed.
+				case <-s.ctx.Done():
+					return
+				case <-s.closed:
+					return
+				}
+			default:
+				// Allows us to signal handlers to cancel processing of the fcall
+				// through context.
+				ctx, cancel := context.WithCancel(s.ctx)
 
-		if err := ctx.Err(); err != nil {
-			// NOTE(stevvooe): We aren't really getting our moneys worth for
-			// how this is being handled. We really need to dispatch each
-			// request handler to a separate thread.
+				// The contents of these instances are only writable in the main
+				// server loop. The value of tag will not change.
+				tags[req.Tag] = &activeRequest{
+					ctx:     ctx,
+					request: req,
+					cancel:  cancel,
+				}
 
-			// the context was canceled for some reason, perhaps timeout or
-			// due to a flush call. We treat this as a condition where a
-			// response should not be sent.
-			log.Println("context error:", err)
-			continue
-		}
+				go func(ctx context.Context, req *Fcall) {
+					var resp *Fcall
+					msg, err := s.handler.Handle(ctx, req.Message)
+					if err != nil {
+						// all handler errors are forwarded as protocol errors.
+						resp = newErrorFcall(req.Tag, err)
+					} else {
+						resp = newFcall(req.Tag, msg)
+					}
 
-		if !tags[req.Tag].responded {
-			if err := s.ch.WriteFcall(ctx, resp); err != nil {
-				log.Println("server: error writing fcall:", err)
-				continue
+					select {
+					case completed <- resp:
+					case <-ctx.Done():
+						return
+					case <-s.closed:
+						return
+					}
+				}(ctx, req)
 			}
-		}
+		case resp := <-completed:
+			log.Println("completed", resp)
+			// only responses that flip the tag state traverse this section.
+			active, ok := tags[resp.Tag]
+			if !ok {
+				panic("BUG: unbalanced tag")
+			}
 
-		delete(tags, req.Tag)
+			select {
+			case responses <- resp:
+			case <-active.ctx.Done():
+				// the context was canceled for some reason, perhaps timeout or
+				// due to a flush call. We treat this as a condition where a
+				// response should not be sent.
+				log.Println("canceled", resp, active.ctx.Err())
+			}
+			delete(tags, resp.Tag)
+		case <-s.ctx.Done():
+			log.Println("server: context done")
+			return
+		case <-s.closed:
+			log.Println("server: shutdown")
+			return
+		}
 	}
 }
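
Note on server.go: the rewrite replaces the old single-threaded loop (read, handle, write, one fcall at a time) with a reader goroutine, a writer goroutine, and one goroutine per dispatched request. The main loop keeps sole ownership of the tags map and hears about finished handlers on the completed channel, so no locking is needed and a Tflush can cancel an in-flight handler via the stored cancel function. The shape of that pattern, reduced to a self-contained hedged sketch; the request/response types and the inline handler are placeholders, not this package's API:

    // Minimal sketch, assuming placeholder types; the real server routes
    // *Fcall values and writes responses through a dedicated writer goroutine.
    package main

    import (
    	"context"
    	"fmt"
    )

    type request struct {
    	tag int
    	msg string
    }

    type response struct {
    	tag int
    	msg string
    }

    func run(ctx context.Context, requests <-chan request, responses chan<- response) {
    	active := map[int]context.CancelFunc{} // owned by this loop only; no mutex needed
    	completed := make(chan response, 1)

    	for {
    		select {
    		case req := <-requests:
    			rctx, cancel := context.WithCancel(ctx)
    			active[req.tag] = cancel

    			// Each request is handled on its own goroutine; the result comes
    			// back through completed so only this loop mutates the tag map.
    			go func(rctx context.Context, req request) {
    				resp := response{tag: req.tag, msg: "handled " + req.msg}
    				select {
    				case completed <- resp:
    				case <-rctx.Done(): // flushed or canceled: drop the response
    				}
    			}(rctx, req)
    		case resp := <-completed:
    			responses <- resp
    			delete(active, resp.tag)
    		case <-ctx.Done():
    			return
    		}
    	}
    }

    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()

    	requests := make(chan request)
    	responses := make(chan response)
    	go run(ctx, requests, responses)

    	requests <- request{tag: 1, msg: "Tversion"}
    	fmt.Println(<-responses)
    }
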
blob - 441111086f8fac97bbe5efa2a66485e1d18edb9c
blob + 4bbb78b65e3074494cf8d0fb6e53d1839f628936
--- transport.go
+++ transport.go
@@ -43,25 +43,25 @@ func newTransport(ctx context.Context, ch *channel) ro
 	return t
 }
 
+// fcallRequest describes a request to send a message as an Fcall, along with channels for returning the response or an error.
 type fcallRequest struct {
 	ctx      context.Context
-	fcall    *Fcall
+	message  Message
 	response chan *Fcall
 	err      chan error
 }
 
-func newFcallRequest(ctx context.Context, fcall *Fcall) *fcallRequest {
+func newFcallRequest(ctx context.Context, msg Message) *fcallRequest {
 	return &fcallRequest{
 		ctx:      ctx,
-		fcall:    fcall,
+		message:  msg,
 		response: make(chan *Fcall, 1),
 		err:      make(chan error, 1),
 	}
 }
 
 func (t *transport) send(ctx context.Context, msg Message) (Message, error) {
-	fcall := newFcall(msg)
-	req := newFcallRequest(ctx, fcall)
+	req := newFcallRequest(ctx, msg)
 
 	// dispatch the request.
 	select {
@@ -151,34 +151,21 @@ func (t *transport) handle() {
 		log.Println("wait...")
 		select {
 		case req := <-t.requests:
-			if req.fcall.Tag == NOTAG {
-				// NOTE(stevvooe): We disallow fcalls with NOTAG to come
-				// through this path since we can't join the tagged response
-				// with the waiting caller. This is typically used for the
-				// Tversion/Rversion round trip to setup a session.
-				//
-				// It may be better to allow these through but block all
-				// requests until a notag message has a response.
-
-				req.err <- fmt.Errorf("disallowed tag through transport")
-				continue
-			}
-
 			// BUG(stevvooe): This is an awful tag allocation procedure.
 			// Replace this with something that let's us allocate tags and
 			// associate data with them, returning to them to a pool when
 			// complete. Such a system would provide a lot of information
 			// about outstanding requests.
 			tags++
-			req.fcall.Tag = tags
-			outstanding[req.fcall.Tag] = req
+			fcall := newFcall(tags, req.message)
+			outstanding[fcall.Tag] = req
 
 			// TODO(stevvooe): Consider the case of requests that never
 			// receive a response. We need to remove the fcall context from
 			// the tag map and dealloc the tag. We may also want to send a
 			// flush for the tag.
-			if err := t.ch.WriteFcall(req.ctx, req.fcall); err != nil {
-				delete(outstanding, req.fcall.Tag)
+			if err := t.ch.WriteFcall(req.ctx, fcall); err != nil {
+				delete(outstanding, fcall.Tag)
 				req.err <- err
 			}
 		case b := <-responses:
@@ -186,8 +173,12 @@ func (t *transport) handle() {
 			if !ok {
 				panic("unknown tag received")
 			}
-			delete(outstanding, req.fcall.Tag)
 
+			// BUG(stevvooe): Must detect duplicate tag and ensure that we are
+			// waking up the right caller. If a duplicate is received, the
+			// entry should not be deleted.
+			delete(outstanding, b.Tag)
+
 			req.response <- b
 
 			// TODO(stevvooe): Reclaim tag id.
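
Note on transport.go: send now takes a bare Message, and handle() owns tag allocation, building the request with newFcall(tags, req.message) and matching replies against the outstanding map. The old NOTAG rejection path is gone because callers can no longer supply a tag of their own. The matching pattern, as a self-contained hedged sketch; the types and the single in-memory round trip are placeholders, not this package's API:

    // Minimal sketch, assuming placeholder types: allocate a tag per request,
    // remember which caller is waiting on it, and wake that caller when a
    // reply carrying the same tag arrives.
    package main

    import "fmt"

    type fcall struct {
    	tag uint16
    	msg string
    }

    func main() {
    	outstanding := map[uint16]chan fcall{} // waiting callers, keyed by tag
    	var tags uint16

    	// A caller submits a message: allocate the next tag and record the waiter.
    	tags++
    	wait := make(chan fcall, 1)
    	outstanding[tags] = wait
    	sent := fcall{tag: tags, msg: "Tflush"}

    	// A reply with the same tag arrives: release the entry and wake the caller.
    	reply := fcall{tag: sent.tag, msg: "Rflush"}
    	if waiter, ok := outstanding[reply.tag]; ok {
    		delete(outstanding, reply.tag)
    		waiter <- reply
    	}

    	fmt.Println("matched:", <-wait)
    }

The BUG and TODO notes still apply: tags are never reclaimed here, so a bare incrementing counter will eventually wrap and collide with an outstanding request.
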
blob - d23b4aae4ce948d568c577af4bbea35ecfa92f74
blob + ac20fb9dfd45e42713513aee43a5ad975d41c0cc
--- version.go
+++ version.go
@@ -16,7 +16,7 @@ import (
 // until a response is received. The received value will be the version
 // implemented by the server.
 func clientnegotiate(ctx context.Context, ch Channel, version string) (string, error) {
-	req := newFcall(MessageTversion{
+	req := newFcall(NOTAG, MessageTversion{
 		MSize:   uint32(ch.MSize()),
 		Version: version,
 	})
@@ -93,7 +93,7 @@ func servernegotiate(ctx context.Context, ch Channel, 
 		respmsg.MSize = uint32(ch.MSize())
 	}
 
-	resp := newFcall(respmsg)
+	resp := newFcall(NOTAG, respmsg)
 	if err := ch.WriteFcall(ctx, resp); err != nil {
 		return err
 	}