mirror of
https://github.com/sourcegraph/jsonrpc2.git
synced 2026-07-04 16:23:41 +02:00
synchronize writes in BufferedObjectStream (#9)
Running various applications that use jsonrpc2 with the Go race detector shows that there is a race condition where `WriteObject` can be called from concurrent goroutines (e.g., 1 sending a request, 1 writing a response).
This commit is contained in:
parent
4f06164e9a
commit
0ad9fd8995
1 changed files with 9 additions and 4 deletions
13
stream.go
13
stream.go
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// An ObjectStream is a bidirectional stream of JSON-RPC 2.0 objects.
|
// An ObjectStream is a bidirectional stream of JSON-RPC 2.0 objects.
|
||||||
|
|
@ -30,6 +31,8 @@ type bufferedObjectStream struct {
|
||||||
r *bufio.Reader
|
r *bufio.Reader
|
||||||
|
|
||||||
codec ObjectCodec
|
codec ObjectCodec
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBufferedStream creates a buffered stream from a network
|
// NewBufferedStream creates a buffered stream from a network
|
||||||
|
|
@ -37,7 +40,7 @@ type bufferedObjectStream struct {
|
||||||
// objectStream is used to produce the bytes to write to the stream
|
// objectStream is used to produce the bytes to write to the stream
|
||||||
// for the JSON-RPC 2.0 objects.
|
// for the JSON-RPC 2.0 objects.
|
||||||
func NewBufferedStream(conn io.ReadWriteCloser, codec ObjectCodec) ObjectStream {
|
func NewBufferedStream(conn io.ReadWriteCloser, codec ObjectCodec) ObjectStream {
|
||||||
return bufferedObjectStream{
|
return &bufferedObjectStream{
|
||||||
conn: conn,
|
conn: conn,
|
||||||
w: bufio.NewWriter(conn),
|
w: bufio.NewWriter(conn),
|
||||||
r: bufio.NewReader(conn),
|
r: bufio.NewReader(conn),
|
||||||
|
|
@ -46,7 +49,9 @@ func NewBufferedStream(conn io.ReadWriteCloser, codec ObjectCodec) ObjectStream
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteObject implements ObjectStream.
|
// WriteObject implements ObjectStream.
|
||||||
func (t bufferedObjectStream) WriteObject(obj interface{}) error {
|
func (t *bufferedObjectStream) WriteObject(obj interface{}) error {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
if err := t.codec.WriteObject(t.w, obj); err != nil {
|
if err := t.codec.WriteObject(t.w, obj); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -54,12 +59,12 @@ func (t bufferedObjectStream) WriteObject(obj interface{}) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadObject implements ObjectStream.
|
// ReadObject implements ObjectStream.
|
||||||
func (t bufferedObjectStream) ReadObject(v interface{}) error {
|
func (t *bufferedObjectStream) ReadObject(v interface{}) error {
|
||||||
return t.codec.ReadObject(t.r, v)
|
return t.codec.ReadObject(t.r, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close implements ObjectStream.
|
// Close implements ObjectStream.
|
||||||
func (t bufferedObjectStream) Close() error {
|
func (t *bufferedObjectStream) Close() error {
|
||||||
return t.conn.Close()
|
return t.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue