Blob


1 package p9pnew
3 import (
4 "bufio"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "io/ioutil"
9 "log"
10 "net"
11 "time"
13 "golang.org/x/net/context"
14 )
16 // Channel defines the operations necessary to implement a 9p message channel
17 // interface. Typically, message channels do no protocol processing except to
18 // send and receive message frames.
19 type Channel interface {
20 // ReadFcall reads one fcall frame into the provided fcall structure. The
21 // Fcall may be cleared whether there is an error or not. If the operation
22 // is successful, the contents of the fcall will be populated in the
23 // argument. ReadFcall cannot be called concurrently with other calls to
24 // ReadFcall. This both to preserve message ordering and to allow lockless
25 // buffer reusage.
26 ReadFcall(ctx context.Context, fcall *Fcall) error
28 // WriteFcall writes the provided fcall to the channel. WriteFcall cannot
29 // be called concurrently with other calls to WriteFcall.
30 WriteFcall(ctx context.Context, fcall *Fcall) error
32 // MSize returns the current msize for the channel.
33 MSize() int
35 // SetMSize sets the maximum message size for the channel. This must never
36 // be called currently with ReadFcall or WriteFcall.
37 SetMSize(msize int)
38 }
40 func NewChannel(conn net.Conn, msize int) Channel {
41 return newChannel(conn, codec9p{}, msize)
42 }
44 const (
45 defaultRWTimeout = 1 * time.Second // default read/write timeout if not set in context
46 )
48 // channel provides bidirectional protocol framing for 9p over net.Conn.
49 // Operations are not thread-safe but reads and writes may be carried out
50 // concurrently, supporting separate read and write loops.
51 //
52 // Lifecyle
53 //
54 // A connection, or message channel abstraction, has a lifecycle delineated by
55 // Tversion/Rversion request response cycles. For now, this is part of the
56 // channel itself but doesn't necessarily influence the channels state, except
57 // the msize. Visually, it might look something like this:
58 //
59 // [Established] -> [Version] -> [Session] -> [Version]---+
60 // ^ |
61 // |_________________________________|
62 //
63 // The connection is established, then we negotiate a version, run a session,
64 // then negotiate a version and so on. For most purposes, we are likely going
65 // to terminate the connection after the session but we may want to support
66 // connection pooling. Pooling may result in possible security leaks if the
67 // connections are shared among contexts, since the version is negotiated at
68 // the start of the session. To avoid this, we can actually use a "tombstone"
69 // version message which clears the server's session state without starting a
70 // new session. The next version message would then prepare the session
71 // without leaking any Fid's.
72 type channel struct {
73 conn net.Conn
74 codec Codec
75 brd *bufio.Reader
76 bwr *bufio.Writer
77 closed chan struct{}
78 msize int
79 rdbuf []byte
80 wrbuf []byte
81 }
83 func newChannel(conn net.Conn, codec Codec, msize int) *channel {
84 return &channel{
85 conn: conn,
86 codec: codec,
87 brd: bufio.NewReaderSize(conn, msize), // msize may not be optimal buffer size
88 bwr: bufio.NewWriterSize(conn, msize),
89 closed: make(chan struct{}),
90 msize: msize,
91 rdbuf: make([]byte, msize),
92 wrbuf: make([]byte, msize),
93 }
94 }
96 func (ch *channel) MSize() int {
97 return ch.msize
98 }
100 // setmsize resizes the buffers for use with a separate msize. This call must
101 // be protected by a mutex or made before passing to other goroutines.
102 func (ch *channel) SetMSize(msize int) {
103 // NOTE(stevvooe): We cannot safely resize the buffered reader and writer.
104 // Proceed assuming that original size is sufficient.
106 ch.msize = msize
107 if msize < len(ch.rdbuf) {
108 // just change the cap
109 ch.rdbuf = ch.rdbuf[:msize]
110 ch.wrbuf = ch.wrbuf[:msize]
111 return
114 ch.rdbuf = make([]byte, msize)
115 ch.wrbuf = make([]byte, msize)
118 // ReadFcall reads the next message from the channel into fcall.
119 func (ch *channel) ReadFcall(ctx context.Context, fcall *Fcall) error {
120 select {
121 case <-ctx.Done():
122 return ctx.Err()
123 case <-ch.closed:
124 return ErrClosed
125 default:
128 deadline, ok := ctx.Deadline()
129 if !ok {
130 deadline = time.Now().Add(defaultRWTimeout)
133 if err := ch.conn.SetReadDeadline(deadline); err != nil {
134 log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
137 n, err := readmsg(ch.brd, ch.rdbuf)
138 if err != nil {
139 // TODO(stevvooe): There may be more we can do here to detect partial
140 // reads. For now, we just propagate the error untouched.
141 return err
144 if n > len(ch.rdbuf) {
145 // TODO(stevvooe): Make this error detectable and respond with error
146 // message.
147 return fmt.Errorf("message large than buffer:", n)
150 // clear out the fcall
151 *fcall = Fcall{}
152 if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil {
153 return err
155 log.Println("channel: recv", fcall)
156 return nil
159 func (ch *channel) WriteFcall(ctx context.Context, fcall *Fcall) error {
160 select {
161 case <-ctx.Done():
162 return ctx.Err()
163 case <-ch.closed:
164 return ErrClosed
165 default:
167 log.Println("channel: send", fcall)
169 deadline, ok := ctx.Deadline()
170 if !ok {
171 deadline = time.Now().Add(defaultRWTimeout)
174 if err := ch.conn.SetWriteDeadline(deadline); err != nil {
175 log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
178 n, err := ch.codec.Marshal(ch.wrbuf, fcall)
179 if err != nil {
180 return err
183 p := ch.wrbuf[:n]
185 if err := sendmsg(ch.bwr, p); err != nil {
186 return err
189 return ch.bwr.Flush()
192 // readmsg reads a 9p message into p from rd, ensuring that all bytes are
193 // consumed from the size header. If the size header indicates the message is
194 // larger than p, the entire message will be discarded, leaving a truncated
195 // portion in p. Any error should be treated as a framing error unless n is
196 // zero. The caller must check that n is less than or equal to len(p) to
197 // ensure that a valid message has been read.
198 func readmsg(rd io.Reader, p []byte) (n int, err error) {
199 var msize uint32
201 if err := binary.Read(rd, binary.LittleEndian, &msize); err != nil {
202 return 0, err
205 n += binary.Size(msize)
206 mbody := int(msize) - 4
208 if mbody < len(p) {
209 p = p[:mbody]
212 np, err := io.ReadFull(rd, p)
213 if err != nil {
214 return np + n, err
216 n += np
218 if mbody > len(p) {
219 // message has been read up to len(p) but we must consume the entire
220 // message. This is an error condition but is non-fatal if we can
221 // consume msize bytes.
222 nn, err := io.CopyN(ioutil.Discard, rd, int64(mbody-len(p)))
223 n += int(nn)
224 if err != nil {
225 return n, err
229 return n, nil
232 // sendmsg writes a message of len(p) to wr with a 9p size header. All errors
233 // should be considered terminal.
234 func sendmsg(wr io.Writer, p []byte) error {
235 size := uint32(len(p) + 4) // message size plus 4-bytes for size.
236 if err := binary.Write(wr, binary.LittleEndian, size); err != nil {
237 return nil
240 // This assume partial writes to wr aren't possible. Not sure if this
241 // valid. Matters during timeout retries.
242 if n, err := wr.Write(p); err != nil {
243 return err
244 } else if n < len(p) {
245 return io.ErrShortWrite
248 return nil