From 6e06d561ec88594028846aa2a4af17a9aa0c87c4 Mon Sep 17 00:00:00 2001 From: Quinn Slack Date: Sat, 17 Dec 2016 18:35:34 -0800 Subject: [PATCH] Add pluggable transport interface + WebSocket support simplify API by using interface{}, rename from transport -> stream add WebSocket transport in websocket subpackage do not buffer in ReadObject, rename GetObjectReader/ReadObject -> NextObjectReader use xtest (jsonrpc2_test) package to allow us to test subpackages that depend on us (in a future change) factor out vscode-specific transport code and allow pluggable transports remove Server (unused) and Serve (unnecessary): The Serve func had nothing specific to JSON-RPC; it was just a loop around (net.Listener).Accept. It added no value. --- codec_test.go | 34 ++++++++ jsonrpc2.go | 106 +++++------------------ jsonrpc2_test.go | 205 ++++++++++++++++++++------------------------ object_test.go | 59 +++++++++++++ stream.go | 162 ++++++++++++++++++++++++++++++++++ websocket/stream.go | 44 ++++++++++ 6 files changed, 416 insertions(+), 194 deletions(-) create mode 100644 codec_test.go create mode 100644 object_test.go create mode 100644 stream.go create mode 100644 websocket/stream.go diff --git a/codec_test.go b/codec_test.go new file mode 100644 index 0000000..6415415 --- /dev/null +++ b/codec_test.go @@ -0,0 +1,34 @@ +package jsonrpc2 + +import ( + "bufio" + "bytes" + "strings" + "testing" +) + +func TestVarintObjectCodec(t *testing.T) { + want := 789 + var buf bytes.Buffer + if err := (VarintObjectCodec{}).WriteObject(&buf, want); err != nil { + t.Fatal(err) + } + var v int + if err := (VarintObjectCodec{}).ReadObject(bufio.NewReader(&buf), &v); err != nil { + t.Fatal(err) + } + if want := want; v != want { + t.Errorf("got %v, want %v", v, want) + } +} + +func TestVSCodeObjectCodec_ReadObject(t *testing.T) { + s := "Content-Type: foo\r\nContent-Length: 123\r\n\r\n789" + var v int + if err := (VSCodeObjectCodec{}).ReadObject(bufio.NewReader(strings.NewReader(s)), &v); err != nil { + t.Fatal(err) + } + if want := 789; v != want { + t.Errorf("got %v, want %v", v, want) + } +} diff --git a/jsonrpc2.go b/jsonrpc2.go index 7ed2eb9..3c3fa76 100644 --- a/jsonrpc2.go +++ b/jsonrpc2.go @@ -3,16 +3,13 @@ package jsonrpc2 import ( - "bufio" "context" "encoding/json" "errors" "fmt" "io" "log" - "net" "strconv" - "strings" "sync" ) @@ -243,8 +240,7 @@ func (id *ID) UnmarshalJSON(data []byte) error { // is symmetric, so a Conn runs on both ends of a client-server // connection. type Conn struct { - conn io.Closer // all writes should go through w, all reads through readMessages - w *bufio.Writer + stream ObjectStream h Handler @@ -276,10 +272,9 @@ var ErrClosed = errors.New("jsonrpc2: connection is closed") // // NewClient consumes conn, so you should call Close on the returned // client not on the given conn. -func NewConn(ctx context.Context, conn io.ReadWriteCloser, h Handler, opt ...ConnOpt) *Conn { +func NewConn(ctx context.Context, stream ObjectStream, h Handler, opt ...ConnOpt) *Conn { c := &Conn{ - conn: conn, - w: bufio.NewWriter(conn), + stream: stream, h: h, pending: map[ID]*call{}, disconnect: make(chan struct{}), @@ -287,7 +282,7 @@ func NewConn(ctx context.Context, conn io.ReadWriteCloser, h Handler, opt ...Con for _, opt := range opt { opt(c) } - go c.readMessages(ctx, bufio.NewReader(conn)) + go c.readMessages(ctx) return c } @@ -301,10 +296,10 @@ func (c *Conn) Close() error { } c.closing = true c.mu.Unlock() - return c.conn.Close() + return c.stream.Close() } -func (c *Conn) send(ctx context.Context, m *anyMessage, wait bool) (*call, error) { +func (c *Conn) send(ctx context.Context, m *anyMessage, wait bool) (cc *call, err error) { c.sending.Lock() defer c.sending.Unlock() @@ -316,7 +311,6 @@ func (c *Conn) send(ctx context.Context, m *anyMessage, wait bool) (*call, error // Store requests so we can later associate them with incoming // responses. - var cc *call if m.request != nil && wait { cc = &call{request: m.request, seq: c.seq, done: make(chan error, 1)} c.pending[ID{Num: c.seq}] = cc // use next seq as call ID @@ -334,17 +328,23 @@ func (c *Conn) send(ctx context.Context, m *anyMessage, wait bool) (*call, error } } - err := marshalHeadersAndBody(c.w, m) - if err != nil { - c.w.Flush() - if cc != nil { - c.mu.Lock() - delete(c.pending, ID{Num: cc.seq}) - c.mu.Unlock() + // From here on, if we fail to send this, then we need to remove + // this from the pending map so we don't block on it or pile up + // pending entries for unsent messages. + defer func() { + if err != nil { + if cc != nil { + c.mu.Lock() + delete(c.pending, ID{Num: cc.seq}) + c.mu.Unlock() + } } + }() + + if err := c.stream.WriteObject(m); err != nil { return nil, err } - return cc, c.w.Flush() + return cc, nil } // Call initiates a JSON-RPC call using the specified method and @@ -431,16 +431,11 @@ func (c *Conn) DisconnectNotify() <-chan struct{} { return c.disconnect } -func (c *Conn) readMessages(ctx context.Context, r *bufio.Reader) { +func (c *Conn) readMessages(ctx context.Context) { var err error for err == nil { var m anyMessage - - var n uint32 - n, err = readHeaderContentLength(r) - if err == nil { - err = json.NewDecoder(io.LimitReader(r, int64(n))).Decode(&m) - } + err = c.stream.ReadObject(&m) if err != nil { break } @@ -513,20 +508,6 @@ func (c *Conn) readMessages(ctx context.Context, r *bufio.Reader) { close(c.disconnect) } -// Server is a JSON-RPC server. -type Server struct{} - -// Serve starts a new JSON-RPC server. -func Serve(ctx context.Context, lis net.Listener, h Handler, opt ...ConnOpt) error { - for { - conn, err := lis.Accept() - if err != nil { - return err - } - NewConn(ctx, conn, h, opt...) - } -} - // call represents a JSON-RPC call over its entire lifecycle. type call struct { request *Request @@ -612,49 +593,6 @@ func (m *anyMessage) UnmarshalJSON(data []byte) error { return json.Unmarshal(data, v) } -func readHeaderContentLength(r *bufio.Reader) (contentLength uint32, err error) { - for { - line, err := r.ReadString('\r') - if err != nil { - return 0, err - } - b, err := r.ReadByte() - if err != nil { - return 0, err - } - if b != '\n' { - return 0, fmt.Errorf(`jsonrpc2: line endings must be \r\n`) - } - if line == "\r" { - break - } - if strings.HasPrefix(line, "Content-Length: ") { - line = strings.TrimPrefix(line, "Content-Length: ") - line = strings.TrimSpace(line) - n, err := strconv.ParseUint(line, 10, 32) - if err != nil { - return 0, err - } - contentLength = uint32(n) - } - } - if contentLength == 0 { - err = fmt.Errorf("jsonrpc2: no Content-Length header found") - } - return -} - -func marshalHeadersAndBody(w io.Writer, v interface{}) error { - body, err := json.Marshal(v) - if err != nil { - return err - } - fmt.Fprintf(w, "Content-Length: %d\r\n", len(body)) - fmt.Fprint(w, "Content-Type: application/vscode-jsonrpc; charset=utf8\r\n\r\n") - _, err = w.Write(body) - return err -} - var ( errInvalidRequestJSON = errors.New("jsonrpc2: request must be either a JSON object or JSON array") errInvalidResponseJSON = errors.New("jsonrpc2: response must be either a JSON object or JSON array") diff --git a/jsonrpc2_test.go b/jsonrpc2_test.go index 167c2e1..ab36d8d 100644 --- a/jsonrpc2_test.go +++ b/jsonrpc2_test.go @@ -1,22 +1,26 @@ -package jsonrpc2 +package jsonrpc2_test import ( - "bufio" "bytes" "context" "encoding/json" "fmt" "io" "net" - "reflect" + "net/http" + "net/http/httptest" "strings" "sync" "testing" "time" + + "github.com/gorilla/websocket" + "github.com/sourcegraph/jsonrpc2" + websocketjsonrpc2 "github.com/sourcegraph/jsonrpc2/websocket" ) func TestRequest_MarshalJSON_jsonrpc(t *testing.T) { - b, err := json.Marshal(&Request{}) + b, err := json.Marshal(&jsonrpc2.Request{}) if err != nil { t.Fatal(err) } @@ -26,7 +30,7 @@ func TestRequest_MarshalJSON_jsonrpc(t *testing.T) { } func TestResponse_MarshalJSON_jsonrpc(t *testing.T) { - b, err := json.Marshal(&Response{}) + b, err := json.Marshal(&jsonrpc2.Response{}) if err != nil { t.Fatal(err) } @@ -36,12 +40,12 @@ func TestResponse_MarshalJSON_jsonrpc(t *testing.T) { } func TestResponseMarshalJSON_Notif(t *testing.T) { - tests := map[*Request]bool{ - &Request{ID: ID{Num: 0}}: true, - &Request{ID: ID{Num: 1}}: true, - &Request{ID: ID{Str: "", IsString: true}}: true, - &Request{ID: ID{Str: "a", IsString: true}}: true, - &Request{Notif: true}: false, + tests := map[*jsonrpc2.Request]bool{ + &jsonrpc2.Request{ID: jsonrpc2.ID{Num: 0}}: true, + &jsonrpc2.Request{ID: jsonrpc2.ID{Num: 1}}: true, + &jsonrpc2.Request{ID: jsonrpc2.ID{Str: "", IsString: true}}: true, + &jsonrpc2.Request{ID: jsonrpc2.ID{Str: "a", IsString: true}}: true, + &jsonrpc2.Request{Notif: true}: false, } for r, wantIDKey := range tests { b, err := json.Marshal(r) @@ -64,7 +68,7 @@ func TestResponseUnmarshalJSON_Notif(t *testing.T) { `{"method":"f"}`: true, } for s, want := range tests { - var r Request + var r jsonrpc2.Request if err := json.Unmarshal([]byte(s), &r); err != nil { t.Fatal(err) } @@ -77,7 +81,7 @@ func TestResponseUnmarshalJSON_Notif(t *testing.T) { // testHandlerA is the "server" handler. type testHandlerA struct{ t *testing.T } -func (h *testHandlerA) Handle(ctx context.Context, conn *Conn, req *Request) { +func (h *testHandlerA) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { if req.Notif { return // notification } @@ -97,7 +101,7 @@ type testHandlerB struct { got []string } -func (h *testHandlerB) Handle(ctx context.Context, conn *Conn, req *Request) { +func (h *testHandlerB) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { if req.Notif { h.mu.Lock() defer h.mu.Unlock() @@ -108,39 +112,76 @@ func (h *testHandlerB) Handle(ctx context.Context, conn *Conn, req *Request) { } func TestClientServer(t *testing.T) { - ctx := context.Background() - done := make(chan struct{}) - lis, err := net.Listen("tcp", "127.0.0.1:0") // any available address - if err != nil { - t.Fatal("Listen:", err) - } - defer func() { - if lis == nil { - return // already closed + t.Run("tcp", func(t *testing.T) { + ctx := context.Background() + done := make(chan struct{}) + + lis, err := net.Listen("tcp", "127.0.0.1:0") // any available address + if err != nil { + t.Fatal("Listen:", err) } - if err := lis.Close(); err != nil { - if !strings.HasSuffix(err.Error(), "use of closed network connection") { + defer func() { + if lis == nil { + return // already closed + } + if err := lis.Close(); err != nil { + if !strings.HasSuffix(err.Error(), "use of closed network connection") { + t.Fatal(err) + } + } + }() + + ha := testHandlerA{t: t} + go func() { + if err := serve(ctx, lis, &ha); err != nil { + if !strings.HasSuffix(err.Error(), "use of closed network connection") { + t.Error(err) + } + } + close(done) + }() + + conn, err := net.Dial("tcp", lis.Addr().String()) + if err != nil { + t.Fatal("Dial:", err) + } + testClientServer(ctx, t, jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{})) + + lis.Close() + <-done // ensure Serve's error return (if any) is caught by this test + }) + t.Run("websocket", func(t *testing.T) { + ctx := context.Background() + done := make(chan struct{}) + + ha := testHandlerA{t: t} + upgrader := websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024} + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { t.Fatal(err) } - } - }() + defer c.Close() + jc := jsonrpc2.NewConn(r.Context(), websocketjsonrpc2.NewObjectStream(c), &ha) + <-jc.DisconnectNotify() + close(done) + })) + defer s.Close() - ha := testHandlerA{t: t} - go func() { - if err := Serve(ctx, lis, &ha); err != nil { - if !strings.HasSuffix(err.Error(), "use of closed network connection") { - t.Error(err) - } + c, _, err := websocket.DefaultDialer.Dial(strings.Replace(s.URL, "http:", "ws:", 1), nil) + if err != nil { + t.Fatal(err) } - close(done) - }() + defer c.Close() + testClientServer(ctx, t, websocketjsonrpc2.NewObjectStream(c)) - conn, err := net.Dial("tcp", lis.Addr().String()) - if err != nil { - t.Fatal("Dial:", err) - } + <-done // keep the test running until the WebSocket disconnects (to avoid missing errors) + }) +} + +func testClientServer(ctx context.Context, t *testing.T, stream jsonrpc2.ObjectStream) { hb := testHandlerB{t: t} - cc := NewConn(ctx, conn, &hb) + cc := jsonrpc2.NewConn(ctx, stream, &hb) defer func() { if err := cc.Close(); err != nil { t.Fatal(err) @@ -164,14 +205,11 @@ func TestClientServer(t *testing.T) { t.Errorf("testHandlerB got %d notifications, want %d", len(hb.got), n) } hb.mu.Unlock() - - lis.Close() - <-done // ensure Serve's error return (if any) is caught by this test } type noopHandler struct{} -func (noopHandler) Handle(ctx context.Context, conn *Conn, req *Request) {} +func (noopHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {} type readWriteCloser struct { read, write func(p []byte) (n int, err error) @@ -192,7 +230,7 @@ func eof(p []byte) (n int, err error) { } func TestConn_DisconnectNotify_EOF(t *testing.T) { - c := NewConn(context.Background(), &readWriteCloser{eof, eof}, nil) + c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(&readWriteCloser{eof, eof}, jsonrpc2.VarintObjectCodec{}), nil) select { case <-c.DisconnectNotify(): case <-time.After(200 * time.Millisecond): @@ -201,7 +239,7 @@ func TestConn_DisconnectNotify_EOF(t *testing.T) { } func TestConn_DisconnectNotify_Close(t *testing.T) { - c := NewConn(context.Background(), &readWriteCloser{eof, eof}, nil) + c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(&readWriteCloser{eof, eof}, jsonrpc2.VarintObjectCodec{}), nil) if err := c.Close(); err != nil { t.Error(err) } @@ -214,9 +252,9 @@ func TestConn_DisconnectNotify_Close(t *testing.T) { func TestConn_DisconnectNotify_Close_async(t *testing.T) { done := make(chan struct{}) - c := NewConn(context.Background(), &readWriteCloser{eof, eof}, nil) + c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(&readWriteCloser{eof, eof}, jsonrpc2.VarintObjectCodec{}), nil) go func() { - if err := c.Close(); err != nil && err != ErrClosed { + if err := c.Close(); err != nil && err != jsonrpc2.ErrClosed { t.Error(err) } close(done) @@ -230,15 +268,15 @@ func TestConn_DisconnectNotify_Close_async(t *testing.T) { } func TestConn_Close_waitingForResponse(t *testing.T) { - c := NewConn(context.Background(), &readWriteCloser{eof, eof}, noopHandler{}) + c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(&readWriteCloser{eof, eof}, jsonrpc2.VarintObjectCodec{}), noopHandler{}) done := make(chan struct{}) go func() { - if err := c.Call(context.Background(), "m", nil, nil); err != ErrClosed { - t.Errorf("got error %v, want %v", err, ErrClosed) + if err := c.Call(context.Background(), "m", nil, nil); err != jsonrpc2.ErrClosed { + t.Errorf("got error %v, want %v", err, jsonrpc2.ErrClosed) } close(done) }() - if err := c.Close(); err != nil && err != ErrClosed { + if err := c.Close(); err != nil && err != jsonrpc2.ErrClosed { t.Error(err) } select { @@ -249,65 +287,12 @@ func TestConn_Close_waitingForResponse(t *testing.T) { <-done } -func TestAnyMessage(t *testing.T) { - tests := map[string]struct { - request, response bool - }{ - // Single messages - `{}`: {}, - `{"foo":"bar"}`: {}, - `{"method":"m"}`: {request: true}, - `{"result":123}`: {response: true}, - `{"error":{"code":456,"message":"m"}}`: {response: true}, - } - for s, want := range tests { - var m anyMessage - json.Unmarshal([]byte(s), &m) - if (m.request != nil) != want.request { - t.Errorf("%s: got request %v, want %v", s, m.request != nil, want.request) - } - if (m.response != nil) != want.response { - t.Errorf("%s: got response %v, want %v", s, m.response != nil, want.response) - } - } -} - -func TestMessageCodec(t *testing.T) { - tests := []struct { - v, vempty interface{} - }{ - { - v: &Request{ID: ID{Num: 123}}, - vempty: &Request{ID: ID{Num: 123}}, - }, - { - v: &Response{ID: ID{Num: 123}}, - vempty: &Response{ID: ID{Num: 123}}, - }, - } - for _, test := range tests { - b, err := json.Marshal(test.v) +func serve(ctx context.Context, lis net.Listener, h jsonrpc2.Handler, opt ...jsonrpc2.ConnOpt) error { + for { + conn, err := lis.Accept() if err != nil { - t.Fatal(err) - } - - if err := json.Unmarshal(b, test.vempty); err != nil { - t.Fatal(err) - } - - if !reflect.DeepEqual(test.vempty, test.v) { - t.Errorf("got %+v, want %+v", test.vempty, test.v) + return err } - } -} - -func TestReadHeaderContentLength(t *testing.T) { - s := "Content-Type: foo\r\nContent-Length: 123\r\n\r\n{}" - n, err := readHeaderContentLength(bufio.NewReader(strings.NewReader(s))) - if err != nil { - t.Fatal(err) - } - if want := uint32(123); n != want { - t.Errorf("got %d, want %d", n, want) + jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{}), h, opt...) } } diff --git a/object_test.go b/object_test.go new file mode 100644 index 0000000..593ceb2 --- /dev/null +++ b/object_test.go @@ -0,0 +1,59 @@ +package jsonrpc2 + +import ( + "encoding/json" + "reflect" + "testing" +) + +func TestAnyMessage(t *testing.T) { + tests := map[string]struct { + request, response bool + }{ + // Single messages + `{}`: {}, + `{"foo":"bar"}`: {}, + `{"method":"m"}`: {request: true}, + `{"result":123}`: {response: true}, + `{"error":{"code":456,"message":"m"}}`: {response: true}, + } + for s, want := range tests { + var m anyMessage + json.Unmarshal([]byte(s), &m) + if (m.request != nil) != want.request { + t.Errorf("%s: got request %v, want %v", s, m.request != nil, want.request) + } + if (m.response != nil) != want.response { + t.Errorf("%s: got response %v, want %v", s, m.response != nil, want.response) + } + } +} + +func TestMessageCodec(t *testing.T) { + tests := []struct { + v, vempty interface{} + }{ + { + v: &Request{ID: ID{Num: 123}}, + vempty: &Request{ID: ID{Num: 123}}, + }, + { + v: &Response{ID: ID{Num: 123}}, + vempty: &Response{ID: ID{Num: 123}}, + }, + } + for _, test := range tests { + b, err := json.Marshal(test.v) + if err != nil { + t.Fatal(err) + } + + if err := json.Unmarshal(b, test.vempty); err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(test.vempty, test.v) { + t.Errorf("got %+v, want %+v", test.vempty, test.v) + } + } +} diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..3095592 --- /dev/null +++ b/stream.go @@ -0,0 +1,162 @@ +package jsonrpc2 + +import ( + "bufio" + "encoding/binary" + "encoding/json" + "fmt" + "io" + "strconv" + "strings" +) + +// An ObjectStream is a bidirectional stream of JSON-RPC 2.0 objects. +type ObjectStream interface { + // WriteObject writes a JSON-RPC 2.0 object to the stream. + WriteObject(obj interface{}) error + + // ReadObject reads the next JSON-RPC 2.0 object from the stream + // and stores it in the value pointed to by v. + ReadObject(v interface{}) error + + io.Closer +} + +// A bufferedObjectStream is an ObjectStream that uses a buffered +// io.ReadWriteCloser to send and receive objects. +type bufferedObjectStream struct { + conn io.Closer // all writes should go through w, all reads through r + w *bufio.Writer + r *bufio.Reader + + codec ObjectCodec +} + +// NewBufferedStream creates a buffered stream from a network +// connection (or other similar interface). The underlying +// objectStream is used to produce the bytes to write to the stream +// for the JSON-RPC 2.0 objects. +func NewBufferedStream(conn io.ReadWriteCloser, codec ObjectCodec) ObjectStream { + return bufferedObjectStream{ + conn: conn, + w: bufio.NewWriter(conn), + r: bufio.NewReader(conn), + codec: codec, + } +} + +// WriteObject implements ObjectStream. +func (t bufferedObjectStream) WriteObject(obj interface{}) error { + if err := t.codec.WriteObject(t.w, obj); err != nil { + return err + } + return t.w.Flush() +} + +// ReadObject implements ObjectStream. +func (t bufferedObjectStream) ReadObject(v interface{}) error { + return t.codec.ReadObject(t.r, v) +} + +// Close implements ObjectStream. +func (t bufferedObjectStream) Close() error { + return t.conn.Close() +} + +// An ObjectCodec specifies how to encoed and decode a JSON-RPC 2.0 +// object in a stream. +type ObjectCodec interface { + // WriteObject writes a JSON-RPC 2.0 object to the stream. + WriteObject(stream io.Writer, obj interface{}) error + + // ReadObject reads the next JSON-RPC 2.0 object from the stream + // and stores it in the value pointed to by v. + ReadObject(stream *bufio.Reader, v interface{}) error +} + +// VarintObjectCodec reads/writes JSON-RPC 2.0 objects with a varint +// header that encodes the byte length. +type VarintObjectCodec struct{} + +// WriteObject implements ObjectCodec. +func (VarintObjectCodec) WriteObject(stream io.Writer, obj interface{}) error { + data, err := json.Marshal(obj) + if err != nil { + return err + } + var buf [binary.MaxVarintLen64]byte + b := binary.PutUvarint(buf[:], uint64(len(data))) + if _, err := stream.Write(buf[:b]); err != nil { + return err + } + if _, err := stream.Write(data); err != nil { + return err + } + return nil +} + +// ReadObject implements ObjectCodec. +func (VarintObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error { + b, err := binary.ReadUvarint(stream) + if err != nil { + return err + } + return json.NewDecoder(io.LimitReader(stream, int64(b))).Decode(v) +} + +// VSCodeObjectCodec reads/writes JSON-RPC 2.0 objects with +// Content-Length and Content-Type headers, as specified by +// https://github.com/Microsoft/language-server-protocol/blob/master/protocol.md#base-protocol. +type VSCodeObjectCodec struct{} + +// WriteObject implements ObjectCodec. +func (VSCodeObjectCodec) WriteObject(stream io.Writer, obj interface{}) error { + data, err := json.Marshal(obj) + if err != nil { + return err + } + if _, err := fmt.Fprintf(stream, "Content-Length: %d\r\n", len(data)); err != nil { + return err + } + if _, err := fmt.Fprint(stream, "Content-Type: application/vscode-jsonrpc; charset=utf8\r\n\r\n"); err != nil { + return err + } + if _, err := stream.Write(data); err != nil { + return err + } + return nil +} + +// ReadObject implements ObjectCodec. +func (VSCodeObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error { + var contentLength uint64 + for { + line, err := stream.ReadString('\r') + if err != nil { + return err + } + b, err := stream.ReadByte() + if err != nil { + return err + } + if b != '\n' { + return fmt.Errorf(`jsonrpc2: line endings must be \r\n`) + } + if line == "\r" { + break + } + if strings.HasPrefix(line, "Content-Length: ") { + line = strings.TrimPrefix(line, "Content-Length: ") + line = strings.TrimSpace(line) + var err error + contentLength, err = strconv.ParseUint(line, 10, 32) + if err != nil { + return err + } + } + } + if contentLength == 0 { + return fmt.Errorf("jsonrpc2: no Content-Length header found") + } + return json.NewDecoder(io.LimitReader(stream, int64(contentLength))).Decode(v) +} diff --git a/websocket/stream.go b/websocket/stream.go new file mode 100644 index 0000000..26313a0 --- /dev/null +++ b/websocket/stream.go @@ -0,0 +1,44 @@ +// Package websocket provides WebSocket transport support for JSON-RPC +// 2.0. +package websocket + +import ( + "io" + + "github.com/gorilla/websocket" +) + +// A ObjectStream is a jsonrpc2.ObjectStream that uses a WebSocket to +// send and receive JSON-RPC 2.0 objects. +type ObjectStream struct { + conn *websocket.Conn +} + +// NewObjectStream creates a new jsonrpc2.ObjectStream for sending and +// receiving JSON-RPC 2.0 objects over a WebSocket. +func NewObjectStream(conn *websocket.Conn) ObjectStream { + return ObjectStream{conn: conn} +} + +// WriteObject implements jsonrpc2.ObjectStream. +func (t ObjectStream) WriteObject(obj interface{}) error { + return t.conn.WriteJSON(obj) +} + +// ReadObject implements jsonrpc2.ObjectStream. +func (t ObjectStream) ReadObject(v interface{}) error { + err := t.conn.ReadJSON(v) + if e, ok := err.(*websocket.CloseError); ok { + if e.Code == websocket.CloseAbnormalClosure && e.Text == io.ErrUnexpectedEOF.Error() { + // Suppress a noisy (but harmless) log message by + // unwrapping this error. + err = io.ErrUnexpectedEOF + } + } + return err +} + +// Close implements jsonrpc2.ObjectStream. +func (t ObjectStream) Close() error { + return t.conn.Close() +}