Skip to content

Commit

Permalink
Implement more of the API for WASM
Browse files Browse the repository at this point in the history
Realized I can at least make the Reader/Writer/SetReadLimit methods
work as expected even if they're not perfect.
  • Loading branch information
nhooyr committed Sep 23, 2019
1 parent 8c54bd9 commit c381928
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 157 deletions.
22 changes: 4 additions & 18 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Conn struct {
msgReadLimit int64

// Used to ensure a previous writer is not used after being closed.
activeWriter *messageWriter
activeWriter atomic.Value
// messageWriter state.
writeMsgOpcode opcode
writeMsgCtx context.Context
Expand Down Expand Up @@ -526,16 +526,6 @@ func (c *Conn) readFramePayload(ctx context.Context, p []byte) (int, error) {
return n, err
}

// SetReadLimit sets the max number of bytes to read for a single message.
// It applies to the Reader and Read methods.
//
// By default, the connection has a message read limit of 32768 bytes.
//
// When the limit is hit, the connection will be closed with StatusMessageTooBig.
func (c *Conn) SetReadLimit(n int64) {
c.msgReadLimit = n
}

// Read is a convenience method to read a single message from the connection.
//
// See the Reader method if you want to be able to reuse buffers or want to stream a message.
Expand Down Expand Up @@ -575,7 +565,7 @@ func (c *Conn) writer(ctx context.Context, typ MessageType) (io.WriteCloser, err
w := &messageWriter{
c: c,
}
c.activeWriter = w
c.activeWriter.Store(w)
return w, nil
}

Expand Down Expand Up @@ -607,7 +597,7 @@ type messageWriter struct {
}

func (w *messageWriter) closed() bool {
return w != w.c.activeWriter
return w != w.c.activeWriter.Load()
}

// Write writes the given bytes to the WebSocket connection.
Expand Down Expand Up @@ -645,7 +635,7 @@ func (w *messageWriter) close() error {
if w.closed() {
return fmt.Errorf("cannot use closed writer")
}
w.c.activeWriter = nil
w.c.activeWriter.Store((*messageWriter)(nil))

_, err := w.c.writeFrame(w.c.writeMsgCtx, true, w.c.writeMsgOpcode, nil)
if err != nil {
Expand Down Expand Up @@ -925,7 +915,3 @@ func (c *Conn) extractBufioWriterBuf(w io.Writer) {

c.bw.Reset(w)
}

func (c *netConn) netConnReader(ctx context.Context) (MessageType, io.Reader, error) {
return c.c.Reader(c.readContext)
}
15 changes: 14 additions & 1 deletion netconn.go → conn_common.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// This file contains *Conn symbols relevant to both
// WASM and non WASM builds.

package websocket

import (
Expand Down Expand Up @@ -99,7 +102,7 @@ func (c *netConn) Read(p []byte) (int, error) {
}

if c.reader == nil {
typ, r, err := c.netConnReader(c.readContext)
typ, r, err := c.c.Reader(c.readContext)
if err != nil {
var ce CloseError
if errors.As(err, &ce) && (ce.Code == StatusNormalClosure) || (ce.Code == StatusGoingAway) {
Expand Down Expand Up @@ -189,3 +192,13 @@ func (c *Conn) CloseRead(ctx context.Context) context.Context {
}()
return ctx
}

// SetReadLimit sets the max number of bytes to read for a single message.
// It applies to the Reader and Read methods.
//
// By default, the connection has a message read limit of 32768 bytes.
//
// When the limit is hit, the connection will be closed with StatusMessageTooBig.
func (c *Conn) SetReadLimit(n int64) {
c.msgReadLimit = n
}
12 changes: 11 additions & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,23 @@
// See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
//
// Thus the unsupported features (not compiled in) for WASM are:
//
// - Accept and AcceptOptions
// - Conn's Reader, Writer, SetReadLimit and Ping methods
// - Conn.Ping
// - HTTPClient and HTTPHeader fields in DialOptions
//
// The *http.Response returned by Dial will always either be nil or &http.Response{} as
// we do not have access to the handshake response in the browser.
//
// The Writer method on the Conn buffers everything in memory and then sends it as a message
// when the writer is closed.
//
// The Reader method also reads the entire response and then returns a reader that
// reads from the byte slice.
//
// SetReadLimit cannot actually limit the number of bytes read from the connection so instead
// when a message beyond the limit is fully read, it throws an error.
//
// Writes are also always async so the passed context is no-op.
//
// Everything else is fully supported. This includes the wsjson and wspb helper packages.
Expand Down
66 changes: 64 additions & 2 deletions websocket_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import (
"sync/atomic"
"syscall/js"

"nhooyr.io/websocket/internal/bpool"
"nhooyr.io/websocket/internal/wsjs"
)

// Conn provides a wrapper around the browser WebSocket API.
type Conn struct {
ws wsjs.WebSocket

msgReadLimit int64

readClosed int64
closeOnce sync.Once
closed chan struct{}
Expand All @@ -43,6 +46,7 @@ func (c *Conn) close(err error) {
func (c *Conn) init() {
c.closed = make(chan struct{})
c.readch = make(chan wsjs.MessageEvent, 1)
c.msgReadLimit = 32768

c.releaseOnClose = c.ws.OnClose(func(e wsjs.CloseEvent) {
cerr := CloseError{
Expand Down Expand Up @@ -77,6 +81,10 @@ func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) {
if err != nil {
return 0, nil, fmt.Errorf("failed to read: %w", err)
}
if int64(len(p)) > c.msgReadLimit {
c.Close(StatusMessageTooBig, fmt.Sprintf("read limited at %v bytes", c.msgReadLimit))
return 0, nil, c.closeErr
}
return typ, p, nil
}

Expand Down Expand Up @@ -106,6 +114,11 @@ func (c *Conn) read(ctx context.Context) (MessageType, []byte, error) {
func (c *Conn) Write(ctx context.Context, typ MessageType, p []byte) error {
err := c.write(ctx, typ, p)
if err != nil {
// Have to ensure the WebSocket is closed after a write error
// to match the Go API. It can only error if the message type
// is unexpected or the passed bytes contain invalid UTF-8 for
// MessageText.
c.Close(StatusInternalError, "something went wrong")
return fmt.Errorf("failed to write: %w", err)
}
return nil
Expand Down Expand Up @@ -216,8 +229,10 @@ func dial(ctx context.Context, url string, opts *DialOptions) (*Conn, *http.Resp
return c, &http.Response{}, nil
}

func (c *netConn) netConnReader(ctx context.Context) (MessageType, io.Reader, error) {
typ, p, err := c.c.Read(ctx)
// Reader attempts to read a message from the connection.
// The maximum time spent waiting is bounded by the context.
func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) {
typ, p, err := c.Read(ctx)
if err != nil {
return 0, nil, err
}
Expand All @@ -228,3 +243,50 @@ func (c *netConn) netConnReader(ctx context.Context) (MessageType, io.Reader, er
func (c *Conn) reader(ctx context.Context) {
c.read(ctx)
}

// Writer returns a writer to write a WebSocket data message to the connection.
// It buffers the entire message in memory and then sends it when the writer
// is closed.
func (c *Conn) Writer(ctx context.Context, typ MessageType) (io.WriteCloser, error) {
return writer{
c: c,
ctx: ctx,
typ: typ,
b: bpool.Get(),
}, nil
}

type writer struct {
closed bool

c *Conn
ctx context.Context
typ MessageType

b *bytes.Buffer
}

func (w writer) Write(p []byte) (int, error) {
if w.closed {
return 0, errors.New("cannot write to closed writer")
}
n, err := w.b.Write(p)
if err != nil {
return n, fmt.Errorf("failed to write message: %w", err)
}
return n, nil
}

func (w writer) Close() error {
if w.closed {
return errors.New("cannot close closed writer")
}
w.closed = true
defer bpool.Put(w.b)

err := w.c.Write(w.ctx, w.typ, w.b.Bytes())
if err != nil {
return fmt.Errorf("failed to close writer: %w", err)
}
return nil
}
6 changes: 1 addition & 5 deletions wsjson/wsjson.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build !js

// Package wsjson provides websocket helpers for JSON messages.
package wsjson // import "nhooyr.io/websocket/wsjson"

Expand Down Expand Up @@ -34,9 +32,7 @@ func read(ctx context.Context, c *websocket.Conn, v interface{}) error {
}

b := bpool.Get()
defer func() {
bpool.Put(b)
}()
defer bpool.Put(b)

_, err = b.ReadFrom(r)
if err != nil {
Expand Down
58 changes: 0 additions & 58 deletions wsjson/wsjson_js.go

This file was deleted.

6 changes: 1 addition & 5 deletions wspb/wspb.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build !js

// Package wspb provides websocket helpers for protobuf messages.
package wspb // import "nhooyr.io/websocket/wspb"

Expand Down Expand Up @@ -36,9 +34,7 @@ func read(ctx context.Context, c *websocket.Conn, v proto.Message) error {
}

b := bpool.Get()
defer func() {
bpool.Put(b)
}()
defer bpool.Put(b)

_, err = b.ReadFrom(r)
if err != nil {
Expand Down
67 changes: 0 additions & 67 deletions wspb/wspb_js.go

This file was deleted.

0 comments on commit c381928

Please sign in to comment.