Commit Diff


commit - 6350d27908b0b1a459b7208b9fae7b274384efe4
commit + 3bf22e5860ff26ad641dc222b8121a2bcae316c7
blob - a9453687ff9c6bcaec2512db06f1fbd224dd936a
blob + c74ca513e2ac3883f696294c0d4f0852bcc092f6
--- channel.go
+++ channel.go
@@ -29,11 +29,18 @@ type Channel interface {
 	// be called concurrently with other calls to WriteFcall.
 	WriteFcall(ctx context.Context, fcall *Fcall) error
 
+	// MSize returns the current msize for the channel.
+	MSize() int
+
 	// SetMSize sets the maximum message size for the channel. This must never
 	// be called currently with ReadFcall or WriteFcall.
 	SetMSize(msize int)
 }
 
+func NewChannel(conn net.Conn, msize int) Channel {
+	return newChannel(conn, codec9p{}, msize)
+}
+
 const (
 	defaultRWTimeout = 1 * time.Second // default read/write timeout if not set in context
 )
@@ -86,9 +93,13 @@ func newChannel(conn net.Conn, codec Codec, msize int)
 	}
 }
 
+func (ch *channel) MSize() int {
+	return ch.msize
+}
+
 // setmsize resizes the buffers for use with a separate msize. This call must
 // be protected by a mutex or made before passing to other goroutines.
-func (ch *channel) setmsize(msize int) {
+func (ch *channel) SetMSize(msize int) {
 	// NOTE(stevvooe): We cannot safely resize the buffered reader and writer.
 	// Proceed assuming that original size is sufficient.
 
@@ -104,88 +115,8 @@ func (ch *channel) setmsize(msize int) {
 	ch.wrbuf = make([]byte, msize)
 }
 
-// version negiotiates the protocol version using channel, blocking until a
-// response is received. The received values can be used to set msize for the
-// channel or assist in client setup.
-func (ch *channel) version(ctx context.Context, msize uint32, version string) (uint32, string, error) {
-	req := newFcall(MessageTversion{
-		MSize:   uint32(msize),
-		Version: version,
-	})
-
-	if err := ch.writeFcall(ctx, req); err != nil {
-		return 0, "", err
-	}
-
-	resp := new(Fcall)
-	if err := ch.readFcall(ctx, resp); err != nil {
-		return 0, "", err
-	}
-
-	mv, ok := resp.Message.(*MessageRversion)
-	if !ok {
-		return 0, "", fmt.Errorf("invalid rpc response for version message: %v", resp)
-	}
-
-	return mv.MSize, mv.Version, nil
-}
-
-// negotiate blocks until a version message is received or a timeout occurs.
-// The msize for the tranport will be set from the negotiation. If negotiate
-// returns nil, a server may proceed with the connection.
-//
-// In the future, it might be better to handle the version messages in a
-// separate object that manages the session. Each set of version requests
-// effectively "reset" a connection, meaning all fids get clunked and all
-// outstanding IO is aborted. This is probably slightly racy, in practice with
-// a misbehaved client. The main issue is that we cannot tell which session
-// messages belong to.
-func (ch *channel) negotiate(ctx context.Context, version string) error {
-	// wait for the version message over the transport.
-	req := new(Fcall)
-	if err := ch.readFcall(ctx, req); err != nil {
-		return err
-	}
-
-	mv, ok := req.Message.(*MessageTversion)
-	if !ok {
-		return fmt.Errorf("expected version message: %v", mv)
-	}
-
-	respmsg := MessageRversion{
-		Version: version,
-	}
-
-	if mv.Version != version {
-		// TODO(stevvooe): Not the best place to do version handling. We need
-		// to have a way to pass supported versions into this method then have
-		// it return the actual version. For now, respond with unknown for
-		// anything that doesn't match the provided version string.
-		respmsg.Version = "unknown"
-	}
-
-	if int(mv.MSize) < ch.msize {
-		// if the server msize is too large, use the client's suggested msize.
-		ch.setmsize(int(mv.MSize))
-		respmsg.MSize = mv.MSize
-	} else {
-		respmsg.MSize = uint32(ch.msize)
-	}
-
-	resp := newFcall(respmsg)
-	if err := ch.writeFcall(ctx, resp); err != nil {
-		return err
-	}
-
-	if respmsg.Version == "unknown" {
-		return fmt.Errorf("bad version negotiation")
-	}
-
-	return nil
-}
-
 // ReadFcall reads the next message from the channel into fcall.
-func (ch *channel) readFcall(ctx context.Context, fcall *Fcall) error {
+func (ch *channel) ReadFcall(ctx context.Context, fcall *Fcall) error {
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
@@ -225,7 +156,7 @@ func (ch *channel) readFcall(ctx context.Context, fcal
 	return nil
 }
 
-func (ch *channel) writeFcall(ctx context.Context, fcall *Fcall) error {
+func (ch *channel) WriteFcall(ctx context.Context, fcall *Fcall) error {
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
blob - 9e1f5f447816c2a1ba015bc2513c4dc1638d1d4f
blob + a1b80f10d12ed6dc1acfecf66b41eb22345519e2
--- client.go
+++ client.go
@@ -26,21 +26,11 @@ func NewSession(ctx context.Context, conn net.Conn) (S
 	ch := newChannel(conn, codec9p{}, msize) // sets msize, effectively.
 
 	// negotiate the protocol version
-	smsize, svers, err := ch.version(ctx, msize, vers)
+	_, err := clientnegotiate(ctx, ch, vers)
 	if err != nil {
 		return nil, err
 	}
 
-	if svers != vers {
-		// TODO(stevvooe): A stubborn client indeed!
-		return nil, fmt.Errorf("unsupported server version: %v", vers)
-	}
-
-	if smsize > msize {
-		// upgrade msize if server differs.
-		ch.setmsize(int(smsize))
-	}
-
 	return &client{
 		version:   vers,
 		msize:     msize,
blob - cb90f4c988eb50b54f9e78cd447dbe74b543727c
blob + 1c2cb5811dd66ef98bf1501ca06ad8ce6b6f6d8a
--- server.go
+++ server.go
@@ -22,7 +22,7 @@ func Serve(ctx context.Context, conn net.Conn, session
 	// do this outside of this function and then pass in a ready made channel.
 	// We are not really ready to export the channel type yet.
 
-	if err := ch.negotiate(negctx, vers); err != nil {
+	if err := servernegotiate(negctx, ch, vers); err != nil {
 		// TODO(stevvooe): Need better error handling and retry support here.
 		// For now, we silently ignore the failure.
 		log.Println("error negotiating version:", err)
@@ -42,7 +42,7 @@ func Serve(ctx context.Context, conn net.Conn, session
 type server struct {
 	ctx     context.Context
 	session Session
-	ch      *channel
+	ch      Channel
 	handler handler
 	closed  chan struct{}
 }
@@ -75,14 +75,14 @@ func (s *server) run() {
 
 		log.Println("server:", "wait")
 		req := new(Fcall)
-		if err := s.ch.readFcall(s.ctx, req); err != nil {
+		if err := s.ch.ReadFcall(s.ctx, req); err != nil {
 			log.Println("server: error reading fcall", err)
 			continue
 		}
 
 		if _, ok := tags[req.Tag]; ok {
 			resp := newErrorFcall(req.Tag, ErrDuptag)
-			if err := s.ch.writeFcall(s.ctx, resp); err != nil {
+			if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
 				log.Printf("error sending duplicate tag response: %v", err)
 			}
 			continue
@@ -99,13 +99,13 @@ func (s *server) run() {
 
 				resp := newFcall(MessageRflush{})
 				resp.Tag = req.Tag
-				if err := s.ch.writeFcall(s.ctx, resp); err != nil {
+				if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
 					log.Printf("error responding to flush: %v", err)
 				}
 				active.responded = true
 			} else {
 				resp := newErrorFcall(req.Tag, ErrUnknownTag)
-				if err := s.ch.writeFcall(s.ctx, resp); err != nil {
+				if err := s.ch.WriteFcall(s.ctx, resp); err != nil {
 					log.Printf("error responding to flush: %v", err)
 				}
 			}
@@ -145,7 +145,7 @@ func (s *server) run() {
 		}
 
 		if !tags[req.Tag].responded {
-			if err := s.ch.writeFcall(ctx, resp); err != nil {
+			if err := s.ch.WriteFcall(ctx, resp); err != nil {
 				log.Println("server: error writing fcall:", err)
 				continue
 			}
blob - b7b890057006eb9f329f5e4fbc253f9e15b20fce
blob + 3caf076db643d3f72ce17f7e3d2b78ebcb38c588
--- transport.go
+++ transport.go
@@ -21,7 +21,7 @@ type roundTripper interface {
 // send. On the whole, transport is thread-safe for calling send
 type transport struct {
 	ctx      context.Context
-	ch       *channel
+	ch       Channel
 	requests chan *fcallRequest
 	closed   chan struct{}
 
@@ -118,7 +118,7 @@ func (t *transport) handle() {
 	loop:
 		for {
 			fcall := new(Fcall)
-			if err := t.ch.readFcall(t.ctx, fcall); err != nil {
+			if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
 				switch err := err.(type) {
 				case net.Error:
 					if err.Timeout() || err.Temporary() {
@@ -177,7 +177,7 @@ func (t *transport) handle() {
 			// receive a response. We need to remove the fcall context from
 			// the tag map and dealloc the tag. We may also want to send a
 			// flush for the tag.
-			if err := t.ch.writeFcall(req.ctx, req.fcall); err != nil {
+			if err := t.ch.WriteFcall(req.ctx, req.fcall); err != nil {
 				log.Println("error writing fcall", err, req.fcall)
 				delete(outstanding, req.fcall.Tag)
 				req.err <- err
blob - /dev/null
blob + c4c7133658b6c3bd3b428135ca0b61e56b00b033 (mode 644)
--- /dev/null
+++ version.go
@@ -0,0 +1,106 @@
+package p9pnew
+
+import (
+	"fmt"
+
+	"golang.org/x/net/context"
+)
+
+// NOTE(stevvooe): This file contains functions for negotiating version on the
+// client and server. There are some nasty details to get right for
+// downgrading the connection on the server-side that are not present yet.
+// Really, these should be refactored into some sort of channel type that can
+// support resets through version messages during the protocol exchange.
+
+// clientnegotiate negotiates the protocol version over ch, blocking until a
+// response is received. It offers ch.MSize() and version to the server and
+// returns the server's version, lowering the channel msize when asked to.
+func clientnegotiate(ctx context.Context, ch Channel, version string) (string, error) {
+	req := newFcall(MessageTversion{
+		MSize:   uint32(ch.MSize()),
+		Version: version,
+	})
+
+	if err := ch.WriteFcall(ctx, req); err != nil {
+		return "", err
+	}
+
+	resp := new(Fcall)
+	if err := ch.ReadFcall(ctx, resp); err != nil {
+		return "", err
+	}
+
+	switch v := resp.Message.(type) {
+	case *MessageRversion:
+		// Only an exact version match is accepted; report what the server sent.
+		if v.Version != version {
+			// TODO(stevvooe): A stubborn client indeed!
+			return "", fmt.Errorf("unsupported server version: %v", v.Version)
+		}
+
+		if v.MSize < uint32(ch.MSize()) {
+			// honor the server's smaller limit; never grow past our buffers.
+			ch.SetMSize(int(v.MSize))
+		}
+
+		return v.Version, nil
+	case error:
+		return "", v
+	default:
+		return "", fmt.Errorf("invalid rpc response for version message: %v", resp)
+	}
+}
+
+// servernegotiate blocks until a version message is received or a timeout
+// occurs. The msize for the transport will be set from the negotiation. If
+// servernegotiate returns nil, a server may proceed with the connection.
+//
+// In the future, it might be better to handle the version messages in a
+// separate object that manages the session. Each set of version requests
+// effectively "reset" a connection, meaning all fids get clunked and all
+// outstanding IO is aborted. This is probably slightly racy, in practice with
+// a misbehaved client. The main issue is that we cannot tell which session
+// messages belong to.
+func servernegotiate(ctx context.Context, ch Channel, version string) error {
+	// wait for the version message over the transport.
+	req := new(Fcall)
+	if err := ch.ReadFcall(ctx, req); err != nil {
+		return err
+	}
+
+	mv, ok := req.Message.(*MessageTversion)
+	if !ok {
+		return fmt.Errorf("expected version message: %v", req) // mv is nil here
+	}
+
+	respmsg := MessageRversion{
+		Version: version,
+	}
+
+	if mv.Version != version {
+		// TODO(stevvooe): Not the best place to do version handling. We need
+		// to have a way to pass supported versions into this method then have
+		// it return the actual version. For now, respond with unknown for
+		// anything that doesn't match the provided version string.
+		respmsg.Version = "unknown"
+	}
+
+	if mv.MSize < uint32(ch.MSize()) {
+		// client asked for less: adopt its msize (uint32 compare avoids int wrap).
+		ch.SetMSize(int(mv.MSize))
+		respmsg.MSize = mv.MSize
+	} else {
+		respmsg.MSize = uint32(ch.MSize())
+	}
+
+	resp := newFcall(respmsg)
+	if err := ch.WriteFcall(ctx, resp); err != nil {
+		return err
+	}
+
+	if respmsg.Version == "unknown" {
+		return fmt.Errorf("bad version negotiation")
+	}
+
+	return nil
+}