15 // channelMessageHeaderSize is the overhead for sending the size of a
16 // message on the wire.
17 channelMessageHeaderSize = 4
20 // Channel defines the operations necessary to implement a 9p message channel
21 // interface. Typically, message channels do no protocol processing except to
22 // send and receive message frames.
23 type Channel interface {
24 // ReadFcall reads one fcall frame into the provided fcall structure. The
25 // Fcall may be cleared whether there is an error or not. If the operation
26 // is successful, the contents of the fcall will be populated in the
27 // argument. ReadFcall cannot be called concurrently with other calls to
28 // ReadFcall. This is both to preserve message ordering and to allow lockless
30 ReadFcall(ctx context.Context, fcall *Fcall) error
32 // WriteFcall writes the provided fcall to the channel. WriteFcall cannot
33 // be called concurrently with other calls to WriteFcall.
34 WriteFcall(ctx context.Context, fcall *Fcall) error
36 // MSize returns the current msize for the channel.
39 // SetMSize sets the maximum message size for the channel. This must never
40 // be called concurrently with ReadFcall or WriteFcall.
44 // NewChannel returns a new channel to read and write Fcalls with the provided
45 // connection and message size, framing messages with the codec9p codec.
46 func NewChannel(conn net.Conn, msize int) Channel {
47 return newChannel(conn, codec9p{}, msize)
51 defaultRWTimeout = 30 * time.Second // default read/write timeout if not set in context
54 // channel provides bidirectional protocol framing for 9p over net.Conn.
55 // Individual operations are not thread-safe, but one read and one write may be
56 // carried out concurrently, supporting separate read and write loops.
60 // A connection, or message channel abstraction, has a lifecycle delineated by
61 // Tversion/Rversion request response cycles. For now, this is part of the
62 // channel itself but doesn't necessarily influence the channel's state, except
63 // the msize. Visually, it might look something like this:
65 // [Established] -> [Version] -> [Session] -> [Version]---+
67 // |_________________________________|
69 // The connection is established, then we negotiate a version, run a session,
70 // then negotiate a version and so on. For most purposes, we are likely going
71 // to terminate the connection after the session but we may want to support
72 // connection pooling. Pooling may result in possible security leaks if the
73 // connections are shared among contexts, since the version is negotiated at
74 // the start of the session. To avoid this, we can actually use a "tombstone"
75 // version message which clears the server's session state without starting a
76 // new session. The next version message would then prepare the session
77 // without leaking any Fids.
88 func newChannel(conn net.Conn, codec Codec, msize int) *channel { // builds a channel over conn whose buffers are sized from msize
92 brd: bufio.NewReaderSize(conn, msize), // msize may not be optimal buffer size
93 bwr: bufio.NewWriterSize(conn, msize), // buffered writer over the same conn, also sized to msize
94 closed: make(chan struct{}),
96 rdbuf: make([]byte, msize), // scratch buffer that ReadFcall hands to readmsg
100 func (ch *channel) MSize() int {
104 // SetMSize resizes the read buffer for use with a new msize. This call must
105 // be protected by a mutex or made before passing the channel to other goroutines.
106 func (ch *channel) SetMSize(msize int) {
107 // NOTE(stevvooe): We cannot safely resize the buffered reader and writer.
108 // Proceed assuming that original size is sufficient.
111 if msize < len(ch.rdbuf) {
112 // shrinking: reslice to the new length; the backing array and its capacity are kept
113 ch.rdbuf = ch.rdbuf[:msize]
117 ch.rdbuf = make([]byte, msize) // allocate a fresh read buffer of msize bytes; the old contents are not copied
120 // ReadFcall reads the next message from the channel into fcall.
122 // If the incoming message overflows the msize, Overflow(err) will return
123 // nonzero with the number of bytes overflowed.
124 func (ch *channel) ReadFcall(ctx context.Context, fcall *Fcall) error {
133 deadline, ok := ctx.Deadline() // ok reports whether the context carries a deadline
135 deadline = time.Now().Add(defaultRWTimeout) // fall back to defaultRWTimeout from now
138 if err := ch.conn.SetReadDeadline(deadline); err != nil {
139 log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
142 n, err := readmsg(ch.brd, ch.rdbuf) // n is the size reported by readmsg and may exceed len(ch.rdbuf)
144 // TODO(stevvooe): There may be more we can do here to detect partial
145 // reads. For now, we just propagate the error untouched.
149 if n > len(ch.rdbuf) { // the message did not fit in the read buffer
150 return overflowErr{size: n - len(ch.rdbuf)} // report how many bytes did not fit
153 // clear out the fcall
155 if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil { // decode from the first n bytes of the read buffer
159 if err := ch.maybeTruncate(fcall); err != nil { // apply the same msize adjustments used on the write path
166 // WriteFcall writes the message to the connection.
168 // If a message destined for the wire will overflow MSize, an Overflow error
169 // may be returned. For Twrite calls, the buffer will simply be truncated to
170 // the optimal msize, with the caller detecting this condition with
172 func (ch *channel) WriteFcall(ctx context.Context, fcall *Fcall) error {
181 deadline, ok := ctx.Deadline() // ok reports whether the context carries a deadline
183 deadline = time.Now().Add(defaultRWTimeout) // fall back to defaultRWTimeout from now
186 if err := ch.conn.SetWriteDeadline(deadline); err != nil {
187 log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err) // NOTE(review): message says "read deadline" but this path sets the write deadline; fix the string
190 if err := ch.maybeTruncate(fcall); err != nil { // shrink or reject the message before it is encoded
194 p, err := ch.codec.Marshal(fcall)
199 if err := sendmsg(ch.bwr, p); err != nil { // frame p with its size header into the buffered writer
203 return ch.bwr.Flush() // push the buffered frame out to the connection
206 // maybeTruncate will truncate the message to fit into msize on the wire, if
207 // possible, or modify the message to ensure the response won't overflow.
209 // If the message cannot be truncated, an error will be returned and the
210 // message should not be sent.
212 // A nil return value means the message, possibly modified in place, can be sent.
213 func (ch *channel) maybeTruncate(fcall *Fcall) error {
215 // for certain message types, just remove the extra bytes from the data portion.
216 switch msg := fcall.Message.(type) {
217 // TODO(stevvooe): There is one more problematic message type:
219 // Rread: while we can employ the same truncation fix as Twrite, we
220 // need to make it observable to upstream handlers.
223 // We can rewrite msg.Count so that a return message will be under
224 // msize. This is more defensive than anything but will ensure that
225 // calls don't fail on sloppy servers.
227 // first, craft the shape of the response message
228 resp := newFcall(fcall.Tag, MessageRread{})
229 overflow := uint32(ch.msgmsize(resp)) + msg.Count - uint32(ch.msize) // unsigned arithmetic: wraps to a huge value when the response already fits
231 if msg.Count < overflow { // also true after a wrap-around, i.e. when no rewrite is needed
232 // Let the bad thing happen; msize too small to even support valid
233 // rewrite. This will result in a Terror from the server-side or
238 msg.Count -= overflow // request fewer bytes so the reply stays within msize
243 // If we are going to overflow the msize, we need to truncate the write to
244 // an appropriate size or return an error in all other conditions.
245 size := ch.msgmsize(fcall)
246 if size <= ch.msize {
250 // the message overflows msize, counting the channel message size header.
251 overflow := size - ch.msize
253 if len(msg.Data) < overflow {
254 // paranoid: if msg.Data is not big enough to handle the
255 // overflow, we should get an overflow error. MSize would have
256 // to be way too small to be realistic.
257 return overflowErr{size: overflow}
260 // The truncation is reflected in the return message (Rwrite) by
261 // the server, so we don't need a return value or error condition
262 // to communicate it.
263 msg.Data = msg.Data[:len(msg.Data)-overflow]
264 fcall.Message = msg // since we have a local copy
268 size := ch.msgmsize(fcall)
270 // the message overflows msize, counting the channel message size header.
271 return overflowErr{size: size - ch.msize}
279 // msgmsize returns the on-wire size of the Fcall, including the size header.
280 // Typically, this can be used to detect whether or not the message overflows
282 func (ch *channel) msgmsize(fcall *Fcall) int {
283 return channelMessageHeaderSize + ch.codec.Size(fcall) // size header plus the codec's reported size for fcall
286 // readmsg reads a 9p message into p from rd, ensuring that all bytes announced
287 // by the size header are consumed. If the size header indicates the message is
288 // larger than p, the entire message will be discarded, leaving a truncated
289 // portion in p. Any error should be treated as a framing error unless n is
290 // zero. The caller must check that n is less than or equal to len(p) to
291 // ensure that a valid message has been read.
292 func readmsg(rd io.Reader, p []byte) (n int, err error) {
295 if err := binary.Read(rd, binary.LittleEndian, &msize); err != nil { // the frame starts with a little-endian size field
299 n += binary.Size(msize) // count the size header itself toward n
300 mbody := int(msize) - 4 // the size field counts its own 4 bytes; NOTE(review): a header value below 4 makes this negative — confirm it is guarded
306 np, err := io.ReadFull(rd, p)
313 // message has been read up to len(p) but we must consume the entire
314 // message. This is an error condition but is non-fatal if we can
315 // consume msize bytes.
316 nn, err := io.CopyN(ioutil.Discard, rd, int64(mbody-len(p))) // drain the part of the body that did not fit in p; NOTE(review): ioutil.Discard is deprecated in favor of io.Discard (Go 1.16+)
326 // sendmsg writes a message of len(p) to wr with a 9p size header. All errors
327 // should be considered terminal.
328 func sendmsg(wr io.Writer, p []byte) error {
329 size := uint32(len(p) + 4) // message size plus 4-bytes for size.
330 if err := binary.Write(wr, binary.LittleEndian, size); err != nil {
334 // This assumes partial writes to wr aren't possible. Not sure if this is
335 // valid. Matters during timeout retries.
336 if n, err := wr.Write(p); err != nil {
338 } else if n < len(p) {
339 return io.ErrShortWrite