diff --git a/async.go b/async.go new file mode 100644 index 0000000..4873a9e --- /dev/null +++ b/async.go @@ -0,0 +1,17 @@ +package jsonrpc2 + +import "context" + +// AsyncHandler wraps a Handler such that it each request is handled in its +// own goroutine. It is a convenience wrapper. +func AsyncHandler(h Handler) Handler { + return asyncHandler{h} +} + +type asyncHandler struct { + Handler +} + +func (h asyncHandler) Handle(ctx context.Context, conn *Conn, req *Request) { + go h.Handler.Handle(ctx, conn, req) +} diff --git a/jsonrpc2.go b/jsonrpc2.go index 45c367e..fb95f7a 100644 --- a/jsonrpc2.go +++ b/jsonrpc2.go @@ -226,7 +226,10 @@ const ( // Handler handles JSON-RPC requests and notifications. type Handler interface { - // Handle is called to handle a request. + // Handle is called to handle a request. No other requests are handled + // until it returns. If you do not require strict ordering behaviour + // of received RPCs, it is suggested to wrap your handler in + // AsyncHandler. Handle(context.Context, *Conn, *Request) } @@ -498,7 +501,7 @@ func (c *Conn) readMessages(ctx context.Context) { if c.onRecv != nil { c.onRecv(m.request, nil) } - go c.h.Handle(ctx, c, m.request) + c.h.Handle(ctx, c, m.request) case m.response != nil: resp := m.response diff --git a/jsonrpc2_test.go b/jsonrpc2_test.go index 6bcf2c2..795f5d3 100644 --- a/jsonrpc2_test.go +++ b/jsonrpc2_test.go @@ -202,10 +202,18 @@ func testClientServer(ctx context.Context, t *testing.T, stream jsonrpc2.ObjectS } time.Sleep(100 * time.Millisecond) hb.mu.Lock() - if len(hb.got) != n { + got := hb.got + hb.mu.Unlock() + if len(got) != n { t.Errorf("testHandlerB got %d notifications, want %d", len(hb.got), n) } - hb.mu.Unlock() + // Ensure messages are in order since we are not using the async handler. + for i, s := range got { + want := fmt.Sprintf(`"notif for #%d"`, i) + if s != want { + t.Fatalf("out of order response. got %q, want %q", s, want) + } + } } func inMemoryPeerConns() (io.ReadWriteCloser, io.ReadWriteCloser) { @@ -278,6 +286,49 @@ func TestPickID(t *testing.T) { } } +func TestHandlerBlocking(t *testing.T) { + // We send N notifications with an increasing parameter. Since the + // handler is blocking, we expect to process the notifications in the + // order they are sent. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + a, b := inMemoryPeerConns() + defer a.Close() + defer b.Close() + + var wg sync.WaitGroup + var params []int + handler := handlerFunc(func(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { + var i int + _ = json.Unmarshal(*req.Params, &i) + // don't need to synchronize access to ids since we should be blocking + params = append(params, i) + wg.Done() + }) + connA := jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(a, jsonrpc2.VSCodeObjectCodec{}), handler) + connB := jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(b, jsonrpc2.VSCodeObjectCodec{}), noopHandler{}) + defer connA.Close() + defer connB.Close() + + const n = 100 + for i := 0; i < n; i++ { + wg.Add(1) + if err := connB.Notify(ctx, "f", i); err != nil { + t.Fatal(err) + } + } + wg.Wait() + if len(params) < n { + t.Fatalf("want %d params, got %d", n, len(params)) + } + for want, got := range params { + if want != got { + t.Fatalf("want param %d, got %d", want, got) + } + } +} + type noopHandler struct{} func (noopHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {}