Blob


1 package p9p
3 import (
4 "bufio"
5 "context"
6 "encoding/binary"
7 "io"
8 "io/ioutil"
9 "log"
10 "net"
11 "time"
12 )
14 const (
15 // channelMessageHeaderSize is the overhead for sending the size of a
16 // message on the wire.
17 channelMessageHeaderSize = 4
18 )
20 // Channel defines the operations necessary to implement a 9p message channel
21 // interface. Typically, message channels do no protocol processing except to
22 // send and receive message frames.
23 type Channel interface {
24 // ReadFcall reads one fcall frame into the provided fcall structure. The
25 // Fcall may be cleared whether there is an error or not. If the operation
26 // is successful, the contents of the fcall will be populated in the
27 // argument. ReadFcall cannot be called concurrently with other calls to
28 // ReadFcall. This both to preserve message ordering and to allow lockless
29 // buffer reusage.
30 ReadFcall(ctx context.Context, fcall *Fcall) error
32 // WriteFcall writes the provided fcall to the channel. WriteFcall cannot
33 // be called concurrently with other calls to WriteFcall.
34 WriteFcall(ctx context.Context, fcall *Fcall) error
36 // MSize returns the current msize for the channel.
37 MSize() int
39 // SetMSize sets the maximum message size for the channel. This must never
40 // be called currently with ReadFcall or WriteFcall.
41 SetMSize(msize int)
42 }
44 // NewChannel returns a new channel to read and write Fcalls with the provided
45 // connection and message size.
46 func NewChannel(conn net.Conn, msize int) Channel {
47 return newChannel(conn, codec9p{}, msize)
48 }
50 const (
51 defaultRWTimeout = 30 * time.Second // default read/write timeout if not set in context
52 )
54 // channel provides bidirectional protocol framing for 9p over net.Conn.
55 // Operations are not thread-safe but reads and writes may be carried out
56 // concurrently, supporting separate read and write loops.
57 //
58 // Lifecyle
59 //
60 // A connection, or message channel abstraction, has a lifecycle delineated by
61 // Tversion/Rversion request response cycles. For now, this is part of the
62 // channel itself but doesn't necessarily influence the channels state, except
63 // the msize. Visually, it might look something like this:
64 //
65 // [Established] -> [Version] -> [Session] -> [Version]---+
66 // ^ |
67 // |_________________________________|
68 //
69 // The connection is established, then we negotiate a version, run a session,
70 // then negotiate a version and so on. For most purposes, we are likely going
71 // to terminate the connection after the session but we may want to support
72 // connection pooling. Pooling may result in possible security leaks if the
73 // connections are shared among contexts, since the version is negotiated at
74 // the start of the session. To avoid this, we can actually use a "tombstone"
75 // version message which clears the server's session state without starting a
76 // new session. The next version message would then prepare the session
77 // without leaking any Fid's.
78 type channel struct {
79 conn net.Conn
80 codec Codec
81 brd *bufio.Reader
82 bwr *bufio.Writer
83 closed chan struct{}
84 msize int
85 rdbuf []byte
86 }
88 func newChannel(conn net.Conn, codec Codec, msize int) *channel {
89 return &channel{
90 conn: conn,
91 codec: codec,
92 brd: bufio.NewReaderSize(conn, msize), // msize may not be optimal buffer size
93 bwr: bufio.NewWriterSize(conn, msize),
94 closed: make(chan struct{}),
95 msize: msize,
96 rdbuf: make([]byte, msize),
97 }
98 }
100 func (ch *channel) MSize() int {
101 return ch.msize
104 // setmsize resizes the buffers for use with a separate msize. This call must
105 // be protected by a mutex or made before passing to other goroutines.
106 func (ch *channel) SetMSize(msize int) {
107 // NOTE(stevvooe): We cannot safely resize the buffered reader and writer.
108 // Proceed assuming that original size is sufficient.
110 ch.msize = msize
111 if msize < len(ch.rdbuf) {
112 // just change the cap
113 ch.rdbuf = ch.rdbuf[:msize]
114 return
117 ch.rdbuf = make([]byte, msize)
120 // ReadFcall reads the next message from the channel into fcall.
121 //
122 // If the incoming message overflows the msize, Overflow(err) will return
123 // nonzero with the number of bytes overflowed.
124 func (ch *channel) ReadFcall(ctx context.Context, fcall *Fcall) error {
125 select {
126 case <-ctx.Done():
127 return ctx.Err()
128 case <-ch.closed:
129 return ErrClosed
130 default:
133 deadline, ok := ctx.Deadline()
134 if !ok {
135 deadline = time.Now().Add(defaultRWTimeout)
138 if err := ch.conn.SetReadDeadline(deadline); err != nil {
139 log.Printf("p9p: transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
142 n, err := readmsg(ch.brd, ch.rdbuf)
143 if err != nil {
144 // TODO(stevvooe): There may be more we can do here to detect partial
145 // reads. For now, we just propagate the error untouched.
146 return err
149 if n > len(ch.rdbuf) {
150 return overflowErr{size: n - len(ch.rdbuf)}
153 // clear out the fcall
154 *fcall = Fcall{}
155 if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil {
156 return err
159 if err := ch.maybeTruncate(fcall); err != nil {
160 return err
163 return nil
166 // WriteFcall writes the message to the connection.
167 //
168 // If a message destined for the wire will overflow MSize, an Overflow error
169 // may be returned. For Twrite calls, the buffer will simply be truncated to
170 // the optimal msize, with the caller detecting this condition with
171 // Rwrite.Count.
172 func (ch *channel) WriteFcall(ctx context.Context, fcall *Fcall) error {
173 select {
174 case <-ctx.Done():
175 return ctx.Err()
176 case <-ch.closed:
177 return ErrClosed
178 default:
181 deadline, ok := ctx.Deadline()
182 if !ok {
183 deadline = time.Now().Add(defaultRWTimeout)
186 if err := ch.conn.SetWriteDeadline(deadline); err != nil {
187 log.Printf("p9p: transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
190 if err := ch.maybeTruncate(fcall); err != nil {
191 return err
194 p, err := ch.codec.Marshal(fcall)
195 if err != nil {
196 return err
199 if err := sendmsg(ch.bwr, p); err != nil {
200 return err
203 return ch.bwr.Flush()
206 // maybeTruncate will truncate the message to fit into msize on the wire, if
207 // possible, or modify the message to ensure the response won't overflow.
208 //
209 // If the message cannot be truncated, an error will be returned and the
210 // message should not be sent.
211 //
212 // A nil return value means the message can be sent without
213 func (ch *channel) maybeTruncate(fcall *Fcall) error {
215 // for certain message types, just remove the extra bytes from the data portion.
216 switch msg := fcall.Message.(type) {
217 // TODO(stevvooe): There is one more problematic message type:
218 //
219 // Rread: while we can employ the same truncation fix as Twrite, we
220 // need to make it observable to upstream handlers.
222 case MessageTread:
223 // We can rewrite msg.Count so that a return message will be under
224 // msize. This is more defensive than anything but will ensure that
225 // calls don't fail on sloppy servers.
227 // first, craft the shape of the response message
228 resp := newFcall(fcall.Tag, MessageRread{})
229 overflow := uint32(ch.msgmsize(resp)) + msg.Count - uint32(ch.msize)
231 if msg.Count < overflow {
232 // Let the bad thing happen; msize too small to even support valid
233 // rewrite. This will result in a Terror from the server-side or
234 // just work.
235 return nil
238 msg.Count -= overflow
239 fcall.Message = msg
241 return nil
242 case MessageTwrite:
243 // If we are going to overflow the msize, we need to truncate the write to
244 // appropriate size or throw an error in all other conditions.
245 size := ch.msgmsize(fcall)
246 if size <= ch.msize {
247 return nil
250 // overflow the msize, including the channel message size fields.
251 overflow := size - ch.msize
253 if len(msg.Data) < overflow {
254 // paranoid: if msg.Data is not big enough to handle the
255 // overflow, we should get an overflow error. MSize would have
256 // to be way too small to be realistic.
257 return overflowErr{size: overflow}
260 // The truncation is reflected in the return message (Rwrite) by
261 // the server, so we don't need a return value or error condition
262 // to communicate it.
263 msg.Data = msg.Data[:len(msg.Data)-overflow]
264 fcall.Message = msg // since we have a local copy
266 return nil
267 default:
268 size := ch.msgmsize(fcall)
269 if size > ch.msize {
270 // overflow the msize, including the channel message size fields.
271 return overflowErr{size: size - ch.msize}
274 return nil
279 // msgmsize returns the on-wire msize of the Fcall, including the size header.
280 // Typically, this can be used to detect whether or not the message overflows
281 // the msize buffer.
282 func (ch *channel) msgmsize(fcall *Fcall) int {
283 return channelMessageHeaderSize + ch.codec.Size(fcall)
286 // readmsg reads a 9p message into p from rd, ensuring that all bytes are
287 // consumed from the size header. If the size header indicates the message is
288 // larger than p, the entire message will be discarded, leaving a truncated
289 // portion in p. Any error should be treated as a framing error unless n is
290 // zero. The caller must check that n is less than or equal to len(p) to
291 // ensure that a valid message has been read.
292 func readmsg(rd io.Reader, p []byte) (n int, err error) {
293 var msize uint32
295 if err := binary.Read(rd, binary.LittleEndian, &msize); err != nil {
296 return 0, err
299 n += binary.Size(msize)
300 mbody := int(msize) - 4
302 if mbody < len(p) {
303 p = p[:mbody]
306 np, err := io.ReadFull(rd, p)
307 if err != nil {
308 return np + n, err
310 n += np
312 if mbody > len(p) {
313 // message has been read up to len(p) but we must consume the entire
314 // message. This is an error condition but is non-fatal if we can
315 // consume msize bytes.
316 nn, err := io.CopyN(ioutil.Discard, rd, int64(mbody-len(p)))
317 n += int(nn)
318 if err != nil {
319 return n, err
323 return n, nil
326 // sendmsg writes a message of len(p) to wr with a 9p size header. All errors
327 // should be considered terminal.
328 func sendmsg(wr io.Writer, p []byte) error {
329 size := uint32(len(p) + 4) // message size plus 4-bytes for size.
330 if err := binary.Write(wr, binary.LittleEndian, size); err != nil {
331 return err
334 // This assume partial writes to wr aren't possible. Not sure if this
335 // valid. Matters during timeout retries.
336 if n, err := wr.Write(p); err != nil {
337 return err
338 } else if n < len(p) {
339 return io.ErrShortWrite
342 return nil