Blame


1 e342de7d 2015-11-11 adrien package p9p
2 e342de7d 2015-11-11 adrien
3 e342de7d 2015-11-11 adrien import (
4 e342de7d 2015-11-11 adrien "bufio"
5 e342de7d 2015-11-11 adrien "encoding/binary"
6 e342de7d 2015-11-11 adrien "fmt"
7 e342de7d 2015-11-11 adrien "io"
8 e342de7d 2015-11-11 adrien "io/ioutil"
9 e342de7d 2015-11-11 adrien "log"
10 e342de7d 2015-11-11 adrien "net"
11 e342de7d 2015-11-11 adrien "time"
12 e342de7d 2015-11-11 adrien
13 e342de7d 2015-11-11 adrien "golang.org/x/net/context"
14 e342de7d 2015-11-11 adrien )
15 e342de7d 2015-11-11 adrien
16 e342de7d 2015-11-11 adrien // Channel defines the operations necessary to implement a 9p message channel
17 e342de7d 2015-11-11 adrien // interface. Typically, message channels do no protocol processing except to
18 e342de7d 2015-11-11 adrien // send and receive message frames.
19 e342de7d 2015-11-11 adrien type Channel interface {
20 e342de7d 2015-11-11 adrien // ReadFcall reads one fcall frame into the provided fcall structure. The
21 e342de7d 2015-11-11 adrien // Fcall may be cleared whether there is an error or not. If the operation
22 e342de7d 2015-11-11 adrien // is successful, the contents of the fcall will be populated in the
23 e342de7d 2015-11-11 adrien // argument. ReadFcall cannot be called concurrently with other calls to
24 e342de7d 2015-11-11 adrien // ReadFcall. This both to preserve message ordering and to allow lockless
25 e342de7d 2015-11-11 adrien // buffer reusage.
26 e342de7d 2015-11-11 adrien ReadFcall(ctx context.Context, fcall *Fcall) error
27 e342de7d 2015-11-11 adrien
28 e342de7d 2015-11-11 adrien // WriteFcall writes the provided fcall to the channel. WriteFcall cannot
29 e342de7d 2015-11-11 adrien // be called concurrently with other calls to WriteFcall.
30 e342de7d 2015-11-11 adrien WriteFcall(ctx context.Context, fcall *Fcall) error
31 e342de7d 2015-11-11 adrien
32 e342de7d 2015-11-11 adrien // MSize returns the current msize for the channel.
33 e342de7d 2015-11-11 adrien MSize() int
34 e342de7d 2015-11-11 adrien
35 e342de7d 2015-11-11 adrien // SetMSize sets the maximum message size for the channel. This must never
36 e342de7d 2015-11-11 adrien // be called currently with ReadFcall or WriteFcall.
37 e342de7d 2015-11-11 adrien SetMSize(msize int)
38 e342de7d 2015-11-11 adrien }
39 e342de7d 2015-11-11 adrien
40 e342de7d 2015-11-11 adrien func NewChannel(conn net.Conn, msize int) Channel {
41 e342de7d 2015-11-11 adrien return newChannel(conn, codec9p{}, msize)
42 e342de7d 2015-11-11 adrien }
43 e342de7d 2015-11-11 adrien
44 e342de7d 2015-11-11 adrien const (
45 e342de7d 2015-11-11 adrien defaultRWTimeout = 1 * time.Second // default read/write timeout if not set in context
46 e342de7d 2015-11-11 adrien )
47 e342de7d 2015-11-11 adrien
48 e342de7d 2015-11-11 adrien // channel provides bidirectional protocol framing for 9p over net.Conn.
49 e342de7d 2015-11-11 adrien // Operations are not thread-safe but reads and writes may be carried out
50 e342de7d 2015-11-11 adrien // concurrently, supporting separate read and write loops.
51 e342de7d 2015-11-11 adrien //
52 e342de7d 2015-11-11 adrien // Lifecyle
53 e342de7d 2015-11-11 adrien //
54 e342de7d 2015-11-11 adrien // A connection, or message channel abstraction, has a lifecycle delineated by
55 e342de7d 2015-11-11 adrien // Tversion/Rversion request response cycles. For now, this is part of the
56 e342de7d 2015-11-11 adrien // channel itself but doesn't necessarily influence the channels state, except
57 e342de7d 2015-11-11 adrien // the msize. Visually, it might look something like this:
58 e342de7d 2015-11-11 adrien //
59 e342de7d 2015-11-11 adrien // [Established] -> [Version] -> [Session] -> [Version]---+
60 e342de7d 2015-11-11 adrien // ^ |
61 e342de7d 2015-11-11 adrien // |_________________________________|
62 e342de7d 2015-11-11 adrien //
63 e342de7d 2015-11-11 adrien // The connection is established, then we negotiate a version, run a session,
64 e342de7d 2015-11-11 adrien // then negotiate a version and so on. For most purposes, we are likely going
65 e342de7d 2015-11-11 adrien // to terminate the connection after the session but we may want to support
66 e342de7d 2015-11-11 adrien // connection pooling. Pooling may result in possible security leaks if the
67 e342de7d 2015-11-11 adrien // connections are shared among contexts, since the version is negotiated at
68 e342de7d 2015-11-11 adrien // the start of the session. To avoid this, we can actually use a "tombstone"
69 e342de7d 2015-11-11 adrien // version message which clears the server's session state without starting a
70 e342de7d 2015-11-11 adrien // new session. The next version message would then prepare the session
71 e342de7d 2015-11-11 adrien // without leaking any Fid's.
72 e342de7d 2015-11-11 adrien type channel struct {
73 e342de7d 2015-11-11 adrien conn net.Conn
74 e342de7d 2015-11-11 adrien codec Codec
75 e342de7d 2015-11-11 adrien brd *bufio.Reader
76 e342de7d 2015-11-11 adrien bwr *bufio.Writer
77 e342de7d 2015-11-11 adrien closed chan struct{}
78 e342de7d 2015-11-11 adrien msize int
79 e342de7d 2015-11-11 adrien rdbuf []byte
80 e342de7d 2015-11-11 adrien }
81 e342de7d 2015-11-11 adrien
82 e342de7d 2015-11-11 adrien func newChannel(conn net.Conn, codec Codec, msize int) *channel {
83 e342de7d 2015-11-11 adrien return &channel{
84 e342de7d 2015-11-11 adrien conn: conn,
85 e342de7d 2015-11-11 adrien codec: codec,
86 e342de7d 2015-11-11 adrien brd: bufio.NewReaderSize(conn, msize), // msize may not be optimal buffer size
87 e342de7d 2015-11-11 adrien bwr: bufio.NewWriterSize(conn, msize),
88 e342de7d 2015-11-11 adrien closed: make(chan struct{}),
89 e342de7d 2015-11-11 adrien msize: msize,
90 e342de7d 2015-11-11 adrien rdbuf: make([]byte, msize),
91 e342de7d 2015-11-11 adrien }
92 e342de7d 2015-11-11 adrien }
93 e342de7d 2015-11-11 adrien
94 e342de7d 2015-11-11 adrien func (ch *channel) MSize() int {
95 e342de7d 2015-11-11 adrien return ch.msize
96 e342de7d 2015-11-11 adrien }
97 e342de7d 2015-11-11 adrien
98 e342de7d 2015-11-11 adrien // setmsize resizes the buffers for use with a separate msize. This call must
99 e342de7d 2015-11-11 adrien // be protected by a mutex or made before passing to other goroutines.
100 e342de7d 2015-11-11 adrien func (ch *channel) SetMSize(msize int) {
101 e342de7d 2015-11-11 adrien // NOTE(stevvooe): We cannot safely resize the buffered reader and writer.
102 e342de7d 2015-11-11 adrien // Proceed assuming that original size is sufficient.
103 e342de7d 2015-11-11 adrien
104 e342de7d 2015-11-11 adrien ch.msize = msize
105 e342de7d 2015-11-11 adrien if msize < len(ch.rdbuf) {
106 e342de7d 2015-11-11 adrien // just change the cap
107 e342de7d 2015-11-11 adrien ch.rdbuf = ch.rdbuf[:msize]
108 e342de7d 2015-11-11 adrien return
109 e342de7d 2015-11-11 adrien }
110 e342de7d 2015-11-11 adrien
111 e342de7d 2015-11-11 adrien ch.rdbuf = make([]byte, msize)
112 e342de7d 2015-11-11 adrien }
113 e342de7d 2015-11-11 adrien
114 e342de7d 2015-11-11 adrien // ReadFcall reads the next message from the channel into fcall.
115 e342de7d 2015-11-11 adrien func (ch *channel) ReadFcall(ctx context.Context, fcall *Fcall) error {
116 e342de7d 2015-11-11 adrien select {
117 e342de7d 2015-11-11 adrien case <-ctx.Done():
118 e342de7d 2015-11-11 adrien return ctx.Err()
119 e342de7d 2015-11-11 adrien case <-ch.closed:
120 e342de7d 2015-11-11 adrien return ErrClosed
121 e342de7d 2015-11-11 adrien default:
122 e342de7d 2015-11-11 adrien }
123 e342de7d 2015-11-11 adrien
124 e342de7d 2015-11-11 adrien deadline, ok := ctx.Deadline()
125 e342de7d 2015-11-11 adrien if !ok {
126 e342de7d 2015-11-11 adrien deadline = time.Now().Add(defaultRWTimeout)
127 e342de7d 2015-11-11 adrien }
128 e342de7d 2015-11-11 adrien
129 e342de7d 2015-11-11 adrien if err := ch.conn.SetReadDeadline(deadline); err != nil {
130 e342de7d 2015-11-11 adrien log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
131 e342de7d 2015-11-11 adrien }
132 e342de7d 2015-11-11 adrien
133 e342de7d 2015-11-11 adrien n, err := readmsg(ch.brd, ch.rdbuf)
134 e342de7d 2015-11-11 adrien if err != nil {
135 e342de7d 2015-11-11 adrien // TODO(stevvooe): There may be more we can do here to detect partial
136 e342de7d 2015-11-11 adrien // reads. For now, we just propagate the error untouched.
137 e342de7d 2015-11-11 adrien return err
138 e342de7d 2015-11-11 adrien }
139 e342de7d 2015-11-11 adrien
140 e342de7d 2015-11-11 adrien if n > len(ch.rdbuf) {
141 e342de7d 2015-11-11 adrien // TODO(stevvooe): Make this error detectable and respond with error
142 e342de7d 2015-11-11 adrien // message.
143 e342de7d 2015-11-11 adrien return fmt.Errorf("message large than buffer:", n)
144 e342de7d 2015-11-11 adrien }
145 e342de7d 2015-11-11 adrien
146 e342de7d 2015-11-11 adrien // clear out the fcall
147 e342de7d 2015-11-11 adrien *fcall = Fcall{}
148 e342de7d 2015-11-11 adrien if err := ch.codec.Unmarshal(ch.rdbuf[:n], fcall); err != nil {
149 e342de7d 2015-11-11 adrien return err
150 e342de7d 2015-11-11 adrien }
151 e342de7d 2015-11-11 adrien log.Println("channel: recv", fcall)
152 e342de7d 2015-11-11 adrien return nil
153 e342de7d 2015-11-11 adrien }
154 e342de7d 2015-11-11 adrien
155 e342de7d 2015-11-11 adrien func (ch *channel) WriteFcall(ctx context.Context, fcall *Fcall) error {
156 e342de7d 2015-11-11 adrien select {
157 e342de7d 2015-11-11 adrien case <-ctx.Done():
158 e342de7d 2015-11-11 adrien return ctx.Err()
159 e342de7d 2015-11-11 adrien case <-ch.closed:
160 e342de7d 2015-11-11 adrien return ErrClosed
161 e342de7d 2015-11-11 adrien default:
162 e342de7d 2015-11-11 adrien }
163 e342de7d 2015-11-11 adrien log.Println("channel: send", fcall)
164 e342de7d 2015-11-11 adrien
165 e342de7d 2015-11-11 adrien deadline, ok := ctx.Deadline()
166 e342de7d 2015-11-11 adrien if !ok {
167 e342de7d 2015-11-11 adrien deadline = time.Now().Add(defaultRWTimeout)
168 e342de7d 2015-11-11 adrien }
169 e342de7d 2015-11-11 adrien
170 e342de7d 2015-11-11 adrien if err := ch.conn.SetWriteDeadline(deadline); err != nil {
171 e342de7d 2015-11-11 adrien log.Printf("transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
172 e342de7d 2015-11-11 adrien }
173 e342de7d 2015-11-11 adrien
174 e342de7d 2015-11-11 adrien p, err := ch.codec.Marshal(fcall)
175 e342de7d 2015-11-11 adrien if err != nil {
176 e342de7d 2015-11-11 adrien return err
177 e342de7d 2015-11-11 adrien }
178 e342de7d 2015-11-11 adrien
179 e342de7d 2015-11-11 adrien if err := sendmsg(ch.bwr, p); err != nil {
180 e342de7d 2015-11-11 adrien return err
181 e342de7d 2015-11-11 adrien }
182 e342de7d 2015-11-11 adrien
183 e342de7d 2015-11-11 adrien return ch.bwr.Flush()
184 e342de7d 2015-11-11 adrien }
185 e342de7d 2015-11-11 adrien
186 e342de7d 2015-11-11 adrien // readmsg reads a 9p message into p from rd, ensuring that all bytes are
187 e342de7d 2015-11-11 adrien // consumed from the size header. If the size header indicates the message is
188 e342de7d 2015-11-11 adrien // larger than p, the entire message will be discarded, leaving a truncated
189 e342de7d 2015-11-11 adrien // portion in p. Any error should be treated as a framing error unless n is
190 e342de7d 2015-11-11 adrien // zero. The caller must check that n is less than or equal to len(p) to
191 e342de7d 2015-11-11 adrien // ensure that a valid message has been read.
192 e342de7d 2015-11-11 adrien func readmsg(rd io.Reader, p []byte) (n int, err error) {
193 e342de7d 2015-11-11 adrien var msize uint32
194 e342de7d 2015-11-11 adrien
195 e342de7d 2015-11-11 adrien if err := binary.Read(rd, binary.LittleEndian, &msize); err != nil {
196 e342de7d 2015-11-11 adrien return 0, err
197 e342de7d 2015-11-11 adrien }
198 e342de7d 2015-11-11 adrien
199 e342de7d 2015-11-11 adrien n += binary.Size(msize)
200 e342de7d 2015-11-11 adrien mbody := int(msize) - 4
201 e342de7d 2015-11-11 adrien
202 e342de7d 2015-11-11 adrien if mbody < len(p) {
203 e342de7d 2015-11-11 adrien p = p[:mbody]
204 e342de7d 2015-11-11 adrien }
205 e342de7d 2015-11-11 adrien
206 e342de7d 2015-11-11 adrien np, err := io.ReadFull(rd, p)
207 e342de7d 2015-11-11 adrien if err != nil {
208 e342de7d 2015-11-11 adrien return np + n, err
209 e342de7d 2015-11-11 adrien }
210 e342de7d 2015-11-11 adrien n += np
211 e342de7d 2015-11-11 adrien
212 e342de7d 2015-11-11 adrien if mbody > len(p) {
213 e342de7d 2015-11-11 adrien // message has been read up to len(p) but we must consume the entire
214 e342de7d 2015-11-11 adrien // message. This is an error condition but is non-fatal if we can
215 e342de7d 2015-11-11 adrien // consume msize bytes.
216 e342de7d 2015-11-11 adrien nn, err := io.CopyN(ioutil.Discard, rd, int64(mbody-len(p)))
217 e342de7d 2015-11-11 adrien n += int(nn)
218 e342de7d 2015-11-11 adrien if err != nil {
219 e342de7d 2015-11-11 adrien return n, err
220 e342de7d 2015-11-11 adrien }
221 e342de7d 2015-11-11 adrien }
222 e342de7d 2015-11-11 adrien
223 e342de7d 2015-11-11 adrien return n, nil
224 e342de7d 2015-11-11 adrien }
225 e342de7d 2015-11-11 adrien
226 e342de7d 2015-11-11 adrien // sendmsg writes a message of len(p) to wr with a 9p size header. All errors
227 e342de7d 2015-11-11 adrien // should be considered terminal.
228 e342de7d 2015-11-11 adrien func sendmsg(wr io.Writer, p []byte) error {
229 e342de7d 2015-11-11 adrien size := uint32(len(p) + 4) // message size plus 4-bytes for size.
230 e342de7d 2015-11-11 adrien if err := binary.Write(wr, binary.LittleEndian, size); err != nil {
231 e342de7d 2015-11-11 adrien return nil
232 e342de7d 2015-11-11 adrien }
233 e342de7d 2015-11-11 adrien
234 e342de7d 2015-11-11 adrien // This assume partial writes to wr aren't possible. Not sure if this
235 e342de7d 2015-11-11 adrien // valid. Matters during timeout retries.
236 e342de7d 2015-11-11 adrien if n, err := wr.Write(p); err != nil {
237 e342de7d 2015-11-11 adrien return err
238 e342de7d 2015-11-11 adrien } else if n < len(p) {
239 e342de7d 2015-11-11 adrien return io.ErrShortWrite
240 e342de7d 2015-11-11 adrien }
241 e342de7d 2015-11-11 adrien
242 e342de7d 2015-11-11 adrien return nil
243 e342de7d 2015-11-11 adrien }