1
0
Fork 0
mirror of https://github.com/sourcegraph/jsonrpc2.git synced 2026-06-16 04:04:56 +02:00

[#57] Fix and deprecate PlainObjectCodec

This change fixes a bug that causes PlainObjectCodec to
lose additional messages from stream. json.Decoder has
an internal buffer that reads more than one message, but
is discarded after every use. Now PlainObjectCodec reuses
encoder and decoder within a buffered stream, however
using it directly in your code retains the old, incorrect
behaviour.

A user should now use plainObjectStream if he needs
plain JSON-RPC 2.0 stream without headers.
`NewPlainObjectStream` method has been added for this reason.

fixes #57
This commit is contained in:
Michał Nowotnik 2022-07-08 13:38:31 +02:00
parent c9c77b6bb9
commit f498bfa550
2 changed files with 110 additions and 42 deletions

View file

@ -112,44 +112,67 @@ func (h *testHandlerB) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jso
h.t.Errorf("testHandlerB got unexpected request %+v", req)
}
type streamMaker func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream
func testClientServerForCodec(t *testing.T, streamMaker streamMaker) {
ctx := context.Background()
done := make(chan struct{})
lis, err := net.Listen("tcp", "127.0.0.1:0") // any available address
if err != nil {
t.Fatal("Listen:", err)
}
defer func() {
if lis == nil {
return // already closed
}
if err = lis.Close(); err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Fatal(err)
}
}
}()
ha := testHandlerA{t: t}
go func() {
if err = serve(ctx, lis, &ha, streamMaker); err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Error(err)
}
}
close(done)
}()
conn, err := net.Dial("tcp", lis.Addr().String())
if err != nil {
t.Fatal("Dial:", err)
}
testClientServer(ctx, t, streamMaker(conn))
lis.Close()
<-done // ensure Serve's error return (if any) is caught by this test
}
func TestClientServer(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
ctx := context.Background()
done := make(chan struct{})
lis, err := net.Listen("tcp", "127.0.0.1:0") // any available address
if err != nil {
t.Fatal("Listen:", err)
}
defer func() {
if lis == nil {
return // already closed
}
if err = lis.Close(); err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Fatal(err)
}
}
}()
ha := testHandlerA{t: t}
go func() {
if err = serve(ctx, lis, &ha); err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Error(err)
}
}
close(done)
}()
conn, err := net.Dial("tcp", lis.Addr().String())
if err != nil {
t.Fatal("Dial:", err)
}
testClientServer(ctx, t, jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{}))
lis.Close()
<-done // ensure Serve's error return (if any) is caught by this test
t.Run("tcp-varint-object-codec", func(t *testing.T) {
testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream {
return jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{})
})
})
t.Run("tcp-vscode-object-codec", func(t *testing.T) {
testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream {
return jsonrpc2.NewBufferedStream(conn, jsonrpc2.VSCodeObjectCodec{})
})
})
t.Run("tcp-plain-object-codec", func(t *testing.T) {
testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream {
return jsonrpc2.NewBufferedStream(conn, jsonrpc2.PlainObjectCodec{})
})
})
t.Run("tcp-plain-object-stream", func(t *testing.T) {
testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream {
return jsonrpc2.NewPlainObjectStream(conn)
})
})
t.Run("websocket", func(t *testing.T) {
ctx := context.Background()
@ -367,12 +390,12 @@ func TestConn_Close_waitingForResponse(t *testing.T) {
<-done
}
func serve(ctx context.Context, lis net.Listener, h jsonrpc2.Handler, opts ...jsonrpc2.ConnOpt) error {
func serve(ctx context.Context, lis net.Listener, h jsonrpc2.Handler, streamMaker streamMaker, opts ...jsonrpc2.ConnOpt) error {
for {
conn, err := lis.Accept()
if err != nil {
return err
}
jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{}), h, opts...)
jsonrpc2.NewConn(ctx, streamMaker(conn), h, opts...)
}
}

View file

@ -40,6 +40,12 @@ type bufferedObjectStream struct {
// objectStream is used to produce the bytes to write to the stream
// for the JSON-RPC 2.0 objects.
func NewBufferedStream(conn io.ReadWriteCloser, codec ObjectCodec) ObjectStream {
switch v := codec.(type) {
case PlainObjectCodec:
v.decoder = json.NewDecoder(conn)
v.encoder = json.NewEncoder(conn)
codec = v
}
return &bufferedObjectStream{
conn: conn,
w: bufio.NewWriter(conn),
@ -163,15 +169,54 @@ func (VSCodeObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error {
return json.NewDecoder(io.LimitReader(stream, int64(contentLength))).Decode(v)
}
// DEPRECATED: use NewPlainObjectStream
// PlainObjectCodec reads/writes plain JSON-RPC 2.0 objects without a header.
type PlainObjectCodec struct{}
type PlainObjectCodec struct {
decoder *json.Decoder
encoder *json.Encoder
}
// WriteObject implements ObjectCodec.
func (PlainObjectCodec) WriteObject(stream io.Writer, v interface{}) error {
func (c PlainObjectCodec) WriteObject(stream io.Writer, v interface{}) error {
if c.encoder != nil {
return c.encoder.Encode(v)
}
return json.NewEncoder(stream).Encode(v)
}
// ReadObject implements ObjectCodec.
func (PlainObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error {
func (c PlainObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error {
if c.decoder != nil {
return c.decoder.Decode(v)
}
return json.NewDecoder(stream).Decode(v)
}
type plainObjectStream struct {
conn io.Closer
decoder *json.Decoder
encoder *json.Encoder
mu sync.Mutex
}
// plainObjectStream reads/writes plain JSON-RPC 2.0 objects without a header.
func NewPlainObjectStream(conn io.ReadWriteCloser) ObjectStream {
os := &plainObjectStream{conn: conn}
os.encoder = json.NewEncoder(conn)
os.decoder = json.NewDecoder(conn)
return os
}
func (os *plainObjectStream) ReadObject(v interface{}) error {
return os.decoder.Decode(v)
}
func (os *plainObjectStream) WriteObject(v interface{}) error {
os.mu.Lock()
defer os.mu.Unlock()
return os.encoder.Encode(v)
}
func (os *plainObjectStream) Close() error {
return os.conn.Close()
}