mirror of
https://github.com/sourcegraph/jsonrpc2.git
synced 2026-06-23 03:03:40 +02:00
conn: do not lock sending when closing (#70)
The sending mutex may be blocked due to the underlying conn blocking. If that happens then we can't call close since close also attempts to hold the sending mutex. Sending mutex is only used for serializing sends and doesn't protect the fields close reads/writes. I believe we introduced locking it so we would return ErrClosed. This commit instead introduces a check in send which rechecks if we have since closed while attempting to send. Test Plan: expanded the test coverage
This commit is contained in:
parent
040dc22f8a
commit
5d80b29f44
2 changed files with 81 additions and 33 deletions
13
conn.go
13
conn.go
|
|
@ -166,9 +166,7 @@ func (c *Conn) SendResponse(ctx context.Context, resp *Response) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) close(cause error) error {
|
func (c *Conn) close(cause error) error {
|
||||||
c.sending.Lock()
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.sending.Unlock()
|
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
if c.closed {
|
if c.closed {
|
||||||
|
|
@ -249,6 +247,17 @@ func (c *Conn) send(_ context.Context, m *anyMessage, wait bool) (cc *call, err
|
||||||
c.sending.Lock()
|
c.sending.Lock()
|
||||||
defer c.sending.Unlock()
|
defer c.sending.Unlock()
|
||||||
|
|
||||||
|
// double check the error isn't due to being closed while sending.
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
c.mu.Lock()
|
||||||
|
if c.closed {
|
||||||
|
err = ErrClosed
|
||||||
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// m.request.ID could be changed, so we store a copy to correctly
|
// m.request.ID could be changed, so we store a copy to correctly
|
||||||
// clean up pending
|
// clean up pending
|
||||||
var id ID
|
var id ID
|
||||||
|
|
|
||||||
101
conn_test.go
101
conn_test.go
|
|
@ -118,38 +118,77 @@ func TestConn_DisconnectNotify(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConn_Close(t *testing.T) {
|
func TestConn_Close(t *testing.T) {
|
||||||
t.Run("waiting for response", func(t *testing.T) {
|
cases := []struct {
|
||||||
connA, connB := net.Pipe()
|
name string
|
||||||
nodeA := jsonrpc2.NewConn(
|
run func(*testing.T, context.Context, *jsonrpc2.Conn)
|
||||||
context.Background(),
|
}{{
|
||||||
jsonrpc2.NewPlainObjectStream(connA), noopHandler{},
|
name: "during Call",
|
||||||
)
|
run: func(t *testing.T, ctx context.Context, conn *jsonrpc2.Conn) {
|
||||||
defer nodeA.Close()
|
ready := make(chan struct{})
|
||||||
nodeB := jsonrpc2.NewConn(
|
done := make(chan struct{})
|
||||||
context.Background(),
|
go func() {
|
||||||
jsonrpc2.NewPlainObjectStream(connB),
|
close(ready)
|
||||||
noopHandler{},
|
err := conn.Call(ctx, "m", nil, nil)
|
||||||
)
|
if err != jsonrpc2.ErrClosed {
|
||||||
defer nodeB.Close()
|
t.Errorf("got error %v, want %v", err, jsonrpc2.ErrClosed)
|
||||||
|
}
|
||||||
ready := make(chan struct{})
|
close(done)
|
||||||
done := make(chan struct{})
|
}()
|
||||||
go func() {
|
// Wait for the request to be sent before we close the connection.
|
||||||
close(ready)
|
<-ready
|
||||||
err := nodeB.Call(context.Background(), "m", nil, nil)
|
if err := conn.Close(); err != nil && err != jsonrpc2.ErrClosed {
|
||||||
if err != jsonrpc2.ErrClosed {
|
t.Error(err)
|
||||||
t.Errorf("got error %v, want %v", err, jsonrpc2.ErrClosed)
|
|
||||||
}
|
}
|
||||||
close(done)
|
<-done
|
||||||
}()
|
},
|
||||||
// Wait for the request to be sent before we close the connection.
|
}, {
|
||||||
<-ready
|
name: "during Wait",
|
||||||
if err := nodeB.Close(); err != nil && err != jsonrpc2.ErrClosed {
|
run: func(t *testing.T, ctx context.Context, conn *jsonrpc2.Conn) {
|
||||||
t.Error(err)
|
call, err := conn.DispatchCall(ctx, "m", nil, nil)
|
||||||
}
|
if err != nil {
|
||||||
assertDisconnect(t, nodeB, connB)
|
t.Fatal(err)
|
||||||
<-done
|
}
|
||||||
})
|
if err := conn.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := call.Wait(ctx, nil); err != jsonrpc2.ErrClosed {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
name: "during Dispatch",
|
||||||
|
run: func(t *testing.T, ctx context.Context, conn *jsonrpc2.Conn) {
|
||||||
|
if err := conn.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if _, err := conn.DispatchCall(ctx, "m", nil, nil); err != jsonrpc2.ErrClosed {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
connA, connB := net.Pipe()
|
||||||
|
nodeA := jsonrpc2.NewConn(
|
||||||
|
ctx,
|
||||||
|
jsonrpc2.NewPlainObjectStream(connA), noopHandler{},
|
||||||
|
)
|
||||||
|
defer nodeA.Close()
|
||||||
|
nodeB := jsonrpc2.NewConn(
|
||||||
|
ctx,
|
||||||
|
jsonrpc2.NewPlainObjectStream(connB),
|
||||||
|
noopHandler{},
|
||||||
|
)
|
||||||
|
defer nodeB.Close()
|
||||||
|
|
||||||
|
tc.run(t, ctx, nodeB)
|
||||||
|
|
||||||
|
assertDisconnect(t, nodeB, connB)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testParams(t *testing.T, want *json.RawMessage, fn func(c *jsonrpc2.Conn) error) {
|
func testParams(t *testing.T, want *json.RawMessage, fn func(c *jsonrpc2.Conn) error) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue