mirror of
https://github.com/sourcegraph/jsonrpc2.git
synced 2026-06-16 04:04:56 +02:00
Handle is blocking (#12)
NOTE: This is a breaking change to the expected contract of Handler. Please update your implementation to use AsyncHandler if needs be. We have strict ordering requirements of how we handle FileSystem requests in LSP. As such relying on the ordering the goroutine scheduler runs requests in leads to potential out of order mutations to the FS. As such we update the jsonrpc2 implementation to by default block until Handle returns (note it can still respond to the request at a later stage). For more simple use cases we provide the AsyncHandler which will work like the previous implementation. * Ensure handle is blocking
This commit is contained in:
parent
277d2464cf
commit
3a7c446248
3 changed files with 75 additions and 4 deletions
17
async.go
Normal file
17
async.go
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
package jsonrpc2
|
||||
|
||||
import "context"
|
||||
|
||||
// AsyncHandler wraps a Handler such that it each request is handled in its
|
||||
// own goroutine. It is a convenience wrapper.
|
||||
func AsyncHandler(h Handler) Handler {
|
||||
return asyncHandler{h}
|
||||
}
|
||||
|
||||
type asyncHandler struct {
|
||||
Handler
|
||||
}
|
||||
|
||||
func (h asyncHandler) Handle(ctx context.Context, conn *Conn, req *Request) {
|
||||
go h.Handler.Handle(ctx, conn, req)
|
||||
}
|
||||
|
|
@ -226,7 +226,10 @@ const (
|
|||
|
||||
// Handler handles JSON-RPC requests and notifications.
|
||||
type Handler interface {
|
||||
// Handle is called to handle a request.
|
||||
// Handle is called to handle a request. No other requests are handled
|
||||
// until it returns. If you do not require strict ordering behaviour
|
||||
// of received RPCs, it is suggested to wrap your handler in
|
||||
// AsyncHandler.
|
||||
Handle(context.Context, *Conn, *Request)
|
||||
}
|
||||
|
||||
|
|
@ -498,7 +501,7 @@ func (c *Conn) readMessages(ctx context.Context) {
|
|||
if c.onRecv != nil {
|
||||
c.onRecv(m.request, nil)
|
||||
}
|
||||
go c.h.Handle(ctx, c, m.request)
|
||||
c.h.Handle(ctx, c, m.request)
|
||||
|
||||
case m.response != nil:
|
||||
resp := m.response
|
||||
|
|
|
|||
|
|
@ -202,10 +202,18 @@ func testClientServer(ctx context.Context, t *testing.T, stream jsonrpc2.ObjectS
|
|||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
hb.mu.Lock()
|
||||
if len(hb.got) != n {
|
||||
got := hb.got
|
||||
hb.mu.Unlock()
|
||||
if len(got) != n {
|
||||
t.Errorf("testHandlerB got %d notifications, want %d", len(hb.got), n)
|
||||
}
|
||||
hb.mu.Unlock()
|
||||
// Ensure messages are in order since we are not using the async handler.
|
||||
for i, s := range got {
|
||||
want := fmt.Sprintf(`"notif for #%d"`, i)
|
||||
if s != want {
|
||||
t.Fatalf("out of order response. got %q, want %q", s, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func inMemoryPeerConns() (io.ReadWriteCloser, io.ReadWriteCloser) {
|
||||
|
|
@ -278,6 +286,49 @@ func TestPickID(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHandlerBlocking(t *testing.T) {
|
||||
// We send N notifications with an increasing parameter. Since the
|
||||
// handler is blocking, we expect to process the notifications in the
|
||||
// order they are sent.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
a, b := inMemoryPeerConns()
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var params []int
|
||||
handler := handlerFunc(func(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
|
||||
var i int
|
||||
_ = json.Unmarshal(*req.Params, &i)
|
||||
// don't need to synchronize access to ids since we should be blocking
|
||||
params = append(params, i)
|
||||
wg.Done()
|
||||
})
|
||||
connA := jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(a, jsonrpc2.VSCodeObjectCodec{}), handler)
|
||||
connB := jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(b, jsonrpc2.VSCodeObjectCodec{}), noopHandler{})
|
||||
defer connA.Close()
|
||||
defer connB.Close()
|
||||
|
||||
const n = 100
|
||||
for i := 0; i < n; i++ {
|
||||
wg.Add(1)
|
||||
if err := connB.Notify(ctx, "f", i); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
if len(params) < n {
|
||||
t.Fatalf("want %d params, got %d", n, len(params))
|
||||
}
|
||||
for want, got := range params {
|
||||
if want != got {
|
||||
t.Fatalf("want param %d, got %d", want, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type noopHandler struct{}
|
||||
|
||||
func (noopHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue