From 3a7c446248199a2abc2dff3cf97bb4f3c0028e5f Mon Sep 17 00:00:00 2001 From: Keegan Carruthers-Smith Date: Tue, 21 Feb 2017 14:25:50 +0200 Subject: [PATCH] Handle is blocking (#12) NOTE: This is a breaking change to the expected contract of Handler. Please update your implementation to use AsyncHandler if needs be. We have strict ordering requirements of how we handle FileSystem requests in LSP. As such relying on the ordering the goroutine scheduler runs requests in leads to potential out of order mutations to the FS. As such we update the jsonrpc2 implementation to by default block until Handle returns (note it can still respond to the request at a later stage). For more simple use cases we provide the AsyncHandler which will work like the previous implementation. * Ensure handle is blocking --- async.go | 17 +++++++++++++++ jsonrpc2.go | 7 ++++-- jsonrpc2_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 75 insertions(+), 4 deletions(-) create mode 100644 async.go diff --git a/async.go b/async.go new file mode 100644 index 0000000..4873a9e --- /dev/null +++ b/async.go @@ -0,0 +1,17 @@ +package jsonrpc2 + +import "context" + +// AsyncHandler wraps a Handler such that it each request is handled in its +// own goroutine. It is a convenience wrapper. +func AsyncHandler(h Handler) Handler { + return asyncHandler{h} +} + +type asyncHandler struct { + Handler +} + +func (h asyncHandler) Handle(ctx context.Context, conn *Conn, req *Request) { + go h.Handler.Handle(ctx, conn, req) +} diff --git a/jsonrpc2.go b/jsonrpc2.go index 45c367e..fb95f7a 100644 --- a/jsonrpc2.go +++ b/jsonrpc2.go @@ -226,7 +226,10 @@ const ( // Handler handles JSON-RPC requests and notifications. type Handler interface { - // Handle is called to handle a request. + // Handle is called to handle a request. No other requests are handled + // until it returns. If you do not require strict ordering behaviour + // of received RPCs, it is suggested to wrap your handler in + // AsyncHandler. Handle(context.Context, *Conn, *Request) } @@ -498,7 +501,7 @@ func (c *Conn) readMessages(ctx context.Context) { if c.onRecv != nil { c.onRecv(m.request, nil) } - go c.h.Handle(ctx, c, m.request) + c.h.Handle(ctx, c, m.request) case m.response != nil: resp := m.response diff --git a/jsonrpc2_test.go b/jsonrpc2_test.go index 6bcf2c2..795f5d3 100644 --- a/jsonrpc2_test.go +++ b/jsonrpc2_test.go @@ -202,10 +202,18 @@ func testClientServer(ctx context.Context, t *testing.T, stream jsonrpc2.ObjectS } time.Sleep(100 * time.Millisecond) hb.mu.Lock() - if len(hb.got) != n { + got := hb.got + hb.mu.Unlock() + if len(got) != n { t.Errorf("testHandlerB got %d notifications, want %d", len(hb.got), n) } - hb.mu.Unlock() + // Ensure messages are in order since we are not using the async handler. + for i, s := range got { + want := fmt.Sprintf(`"notif for #%d"`, i) + if s != want { + t.Fatalf("out of order response. got %q, want %q", s, want) + } + } } func inMemoryPeerConns() (io.ReadWriteCloser, io.ReadWriteCloser) { @@ -278,6 +286,49 @@ func TestPickID(t *testing.T) { } } +func TestHandlerBlocking(t *testing.T) { + // We send N notifications with an increasing parameter. Since the + // handler is blocking, we expect to process the notifications in the + // order they are sent. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + a, b := inMemoryPeerConns() + defer a.Close() + defer b.Close() + + var wg sync.WaitGroup + var params []int + handler := handlerFunc(func(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { + var i int + _ = json.Unmarshal(*req.Params, &i) + // don't need to synchronize access to ids since we should be blocking + params = append(params, i) + wg.Done() + }) + connA := jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(a, jsonrpc2.VSCodeObjectCodec{}), handler) + connB := jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(b, jsonrpc2.VSCodeObjectCodec{}), noopHandler{}) + defer connA.Close() + defer connB.Close() + + const n = 100 + for i := 0; i < n; i++ { + wg.Add(1) + if err := connB.Notify(ctx, "f", i); err != nil { + t.Fatal(err) + } + } + wg.Wait() + if len(params) < n { + t.Fatalf("want %d params, got %d", n, len(params)) + } + for want, got := range params { + if want != got { + t.Fatalf("want param %d, got %d", want, got) + } + } +} + type noopHandler struct{} func (noopHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {}