From a896fc3eac98195e9f144050320d8d485564de45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Nowotnik?= Date: Mon, 11 Jul 2022 15:43:39 +0200 Subject: [PATCH] [#57] Fix and deprecate PlainObjectCodec (#58) This change fixes a bug that causes PlainObjectCodec to lose additional messages from stream. json.Decoder has an internal buffer that reads more than one message, but is discarded after every use. Now PlainObjectCodec reuses encoder and decoder within a buffered stream, however using it directly in your code retains the old, incorrect behaviour. A user should now use plainObjectStream if he needs plain JSON-RPC 2.0 stream without headers. `NewPlainObjectStream` method has been added for this reason. --- jsonrpc2_test.go | 101 +++++++++++++++++++++++++++++------------------ stream.go | 55 ++++++++++++++++++++++++-- 2 files changed, 114 insertions(+), 42 deletions(-) diff --git a/jsonrpc2_test.go b/jsonrpc2_test.go index f9dd950..ca600db 100644 --- a/jsonrpc2_test.go +++ b/jsonrpc2_test.go @@ -112,44 +112,67 @@ func (h *testHandlerB) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jso h.t.Errorf("testHandlerB got unexpected request %+v", req) } +type streamMaker func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream + +func testClientServerForCodec(t *testing.T, streamMaker streamMaker) { + ctx := context.Background() + done := make(chan struct{}) + + lis, err := net.Listen("tcp", "127.0.0.1:0") // any available address + if err != nil { + t.Fatal("Listen:", err) + } + defer func() { + if lis == nil { + return // already closed + } + if err = lis.Close(); err != nil { + if !strings.HasSuffix(err.Error(), "use of closed network connection") { + t.Fatal(err) + } + } + }() + + ha := testHandlerA{t: t} + go func() { + if err = serve(ctx, lis, &ha, streamMaker); err != nil { + if !strings.HasSuffix(err.Error(), "use of closed network connection") { + t.Error(err) + } + } + close(done) + }() + + conn, err := net.Dial("tcp", lis.Addr().String()) + if err != nil { + t.Fatal("Dial:", err) + } + testClientServer(ctx, t, streamMaker(conn)) + + lis.Close() + <-done // ensure Serve's error return (if any) is caught by this test +} + func TestClientServer(t *testing.T) { - t.Run("tcp", func(t *testing.T) { - ctx := context.Background() - done := make(chan struct{}) - - lis, err := net.Listen("tcp", "127.0.0.1:0") // any available address - if err != nil { - t.Fatal("Listen:", err) - } - defer func() { - if lis == nil { - return // already closed - } - if err = lis.Close(); err != nil { - if !strings.HasSuffix(err.Error(), "use of closed network connection") { - t.Fatal(err) - } - } - }() - - ha := testHandlerA{t: t} - go func() { - if err = serve(ctx, lis, &ha); err != nil { - if !strings.HasSuffix(err.Error(), "use of closed network connection") { - t.Error(err) - } - } - close(done) - }() - - conn, err := net.Dial("tcp", lis.Addr().String()) - if err != nil { - t.Fatal("Dial:", err) - } - testClientServer(ctx, t, jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{})) - - lis.Close() - <-done // ensure Serve's error return (if any) is caught by this test + t.Run("tcp-varint-object-codec", func(t *testing.T) { + testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream { + return jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{}) + }) + }) + t.Run("tcp-vscode-object-codec", func(t *testing.T) { + testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream { + return jsonrpc2.NewBufferedStream(conn, jsonrpc2.VSCodeObjectCodec{}) + }) + }) + t.Run("tcp-plain-object-codec", func(t *testing.T) { + testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream { + return jsonrpc2.NewBufferedStream(conn, jsonrpc2.PlainObjectCodec{}) + }) + }) + t.Run("tcp-plain-object-stream", func(t *testing.T) { + testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream { + return jsonrpc2.NewPlainObjectStream(conn) + }) }) t.Run("websocket", func(t *testing.T) { ctx := context.Background() @@ -367,12 +390,12 @@ func TestConn_Close_waitingForResponse(t *testing.T) { <-done } -func serve(ctx context.Context, lis net.Listener, h jsonrpc2.Handler, opts ...jsonrpc2.ConnOpt) error { +func serve(ctx context.Context, lis net.Listener, h jsonrpc2.Handler, streamMaker streamMaker, opts ...jsonrpc2.ConnOpt) error { for { conn, err := lis.Accept() if err != nil { return err } - jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{}), h, opts...) + jsonrpc2.NewConn(ctx, streamMaker(conn), h, opts...) } } diff --git a/stream.go b/stream.go index 7023c65..ff24d0f 100644 --- a/stream.go +++ b/stream.go @@ -40,6 +40,12 @@ type bufferedObjectStream struct { // objectStream is used to produce the bytes to write to the stream // for the JSON-RPC 2.0 objects. func NewBufferedStream(conn io.ReadWriteCloser, codec ObjectCodec) ObjectStream { + switch v := codec.(type) { + case PlainObjectCodec: + v.decoder = json.NewDecoder(conn) + v.encoder = json.NewEncoder(conn) + codec = v + } return &bufferedObjectStream{ conn: conn, w: bufio.NewWriter(conn), @@ -164,14 +170,57 @@ func (VSCodeObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error { } // PlainObjectCodec reads/writes plain JSON-RPC 2.0 objects without a header. -type PlainObjectCodec struct{} +// +// Deprecated: use NewPlainObjectStream +type PlainObjectCodec struct { + decoder *json.Decoder + encoder *json.Encoder +} // WriteObject implements ObjectCodec. -func (PlainObjectCodec) WriteObject(stream io.Writer, v interface{}) error { +func (c PlainObjectCodec) WriteObject(stream io.Writer, v interface{}) error { + if c.encoder != nil { + return c.encoder.Encode(v) + } return json.NewEncoder(stream).Encode(v) } // ReadObject implements ObjectCodec. -func (PlainObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error { +func (c PlainObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error { + if c.decoder != nil { + return c.decoder.Decode(v) + } return json.NewDecoder(stream).Decode(v) } + +// plainObjectStream reads/writes plain JSON-RPC 2.0 objects without a header. +type plainObjectStream struct { + conn io.Closer + decoder *json.Decoder + encoder *json.Encoder +} + +// NewPlainObjectStream creates a buffered stream from a network +// connection (or other similar interface). The underlying +// objectStream produces plain JSON-RPC 2.0 objects without a header. +func NewPlainObjectStream(conn io.ReadWriteCloser) ObjectStream { + return &plainObjectStream{ + conn: conn, + encoder: json.NewEncoder(conn), + decoder: json.NewDecoder(conn), + } +} + +func (os *plainObjectStream) ReadObject(v interface{}) error { + return os.decoder.Decode(v) +} + +// WriteObject serializes a value to JSON and writes it to a stream. +// Not thread-safe, a user must synchronize writes in a multithreaded environment. +func (os *plainObjectStream) WriteObject(v interface{}) error { + return os.encoder.Encode(v) +} + +func (os *plainObjectStream) Close() error { + return os.conn.Close() +}