mirror of
https://github.com/sourcegraph/jsonrpc2.git
synced 2026-06-16 04:04:56 +02:00
remove incomplete JSON-RPC 2.0 batch support
This commit is contained in:
parent
2e4214c77b
commit
0328ebe20e
3 changed files with 31 additions and 224 deletions
|
|
@ -8,4 +8,4 @@ This package is **experimental** until further notice.
|
|||
|
||||
## Known issues
|
||||
|
||||
* Batch requests and responses are not yet supported. A handler will panic if it receives a batch request. Because of this, you should not expose any server using this package to external, untrusted traffic (yet).
|
||||
* Batch requests and responses are not yet supported.
|
||||
|
|
|
|||
198
jsonrpc2.go
198
jsonrpc2.go
|
|
@ -4,7 +4,6 @@ package jsonrpc2
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
|
@ -274,26 +273,18 @@ func (c *Conn) send(ctx context.Context, m *anyMessage, wait bool) (*call, error
|
|||
var cc *call
|
||||
if m.request != nil && wait {
|
||||
cc = &call{request: m.request, seq: c.seq, done: make(chan error)}
|
||||
c.pending[c.seq] = cc // use first seq as call ID for batch
|
||||
for _, req := range m.requests() {
|
||||
req.ID = c.seq
|
||||
c.seq++
|
||||
}
|
||||
c.pending[c.seq] = cc // use next seq as call ID
|
||||
m.request.ID = c.seq
|
||||
c.seq++
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
if c.onSend != nil {
|
||||
switch {
|
||||
case m.request != nil:
|
||||
if m.request.batch != nil {
|
||||
panic("batching not yet implemented")
|
||||
}
|
||||
c.onSend(m.request.single, nil)
|
||||
c.onSend(m.request, nil)
|
||||
case m.response != nil:
|
||||
if m.response.batch != nil {
|
||||
panic("batching not yet implemented")
|
||||
}
|
||||
c.onSend(nil, m.response.single)
|
||||
c.onSend(nil, m.response)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -324,7 +315,7 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
|
|||
return err
|
||||
}
|
||||
}
|
||||
call, err := c.send(ctx, &anyMessage{request: &requestOrRequestBatch{single: req}}, true)
|
||||
call, err := c.send(ctx, &anyMessage{request: req}, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -336,9 +327,9 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if result != nil && call.response.single.Result != nil {
|
||||
if result != nil && call.response.Result != nil {
|
||||
// TODO(sqs): error handling
|
||||
if err := json.Unmarshal(*call.response.single.Result, result); err != nil {
|
||||
if err := json.Unmarshal(*call.response.Result, result); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -362,7 +353,7 @@ func (c *Conn) Notify(ctx context.Context, method string, params interface{}, op
|
|||
return err
|
||||
}
|
||||
}
|
||||
_, err := c.send(ctx, &anyMessage{request: &requestOrRequestBatch{single: req}}, false)
|
||||
_, err := c.send(ctx, &anyMessage{request: req}, false)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -372,19 +363,19 @@ func (c *Conn) Reply(ctx context.Context, id uint64, result interface{}) error {
|
|||
if err := resp.SetResult(result); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := c.send(ctx, &anyMessage{response: &responseOrResponseBatch{single: resp}}, false)
|
||||
_, err := c.send(ctx, &anyMessage{response: resp}, false)
|
||||
return err
|
||||
}
|
||||
|
||||
// ReplyWithError sends a response with an error.
|
||||
func (c *Conn) ReplyWithError(ctx context.Context, id uint64, respErr *Error) error {
|
||||
_, err := c.send(ctx, &anyMessage{response: &responseOrResponseBatch{single: &Response{ID: id, Error: respErr}}}, false)
|
||||
_, err := c.send(ctx, &anyMessage{response: &Response{ID: id, Error: respErr}}, false)
|
||||
return err
|
||||
}
|
||||
|
||||
// SendResponse sends resp to the peer. It is lower level than (*Conn).Reply.
|
||||
func (c *Conn) SendResponse(ctx context.Context, resp *Response) error {
|
||||
_, err := c.send(ctx, &anyMessage{response: &responseOrResponseBatch{single: resp}}, false)
|
||||
_, err := c.send(ctx, &anyMessage{response: resp}, false)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -410,20 +401,14 @@ func (c *Conn) readMessages(ctx context.Context, r *bufio.Reader) {
|
|||
|
||||
switch {
|
||||
case m.request != nil:
|
||||
switch {
|
||||
case m.request.batch != nil:
|
||||
panic("batching not yet implemented")
|
||||
|
||||
case m.request.single != nil:
|
||||
if c.onRecv != nil {
|
||||
c.onRecv(m.request.single, nil)
|
||||
}
|
||||
go c.h.Handle(ctx, c, m.request.single)
|
||||
if c.onRecv != nil {
|
||||
c.onRecv(m.request, nil)
|
||||
}
|
||||
go c.h.Handle(ctx, c, m.request)
|
||||
|
||||
case m.response != nil:
|
||||
resp := *m.response
|
||||
if resp := resp.single; resp != nil {
|
||||
resp := m.response
|
||||
if resp != nil {
|
||||
seq := resp.ID
|
||||
c.mu.Lock()
|
||||
call := c.pending[seq]
|
||||
|
|
@ -431,17 +416,14 @@ func (c *Conn) readMessages(ctx context.Context, r *bufio.Reader) {
|
|||
c.mu.Unlock()
|
||||
|
||||
if call != nil {
|
||||
call.response = &responseOrResponseBatch{single: resp}
|
||||
call.response = resp
|
||||
}
|
||||
|
||||
if c.onRecv != nil {
|
||||
var req *Request
|
||||
|
||||
if call != nil {
|
||||
if call.request.batch != nil {
|
||||
panic("batching not yet implemented")
|
||||
}
|
||||
req = call.request.single
|
||||
req = call.request
|
||||
}
|
||||
c.onRecv(req, resp)
|
||||
}
|
||||
|
|
@ -458,8 +440,6 @@ func (c *Conn) readMessages(ctx context.Context, r *bufio.Reader) {
|
|||
call.done <- nil
|
||||
close(call.done)
|
||||
}
|
||||
} else {
|
||||
panic("batches are not yet implemented") // TODO(sqs): support batches
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -501,70 +481,18 @@ func Serve(ctx context.Context, lis net.Listener, h Handler, opt ...ConnOpt) err
|
|||
}
|
||||
}
|
||||
|
||||
// mapRespsToReq returns a slice whose i'th element reports the index
|
||||
// in reqs of the i'th response in resps.
|
||||
//
|
||||
// It returns an error if a response's ID does not refer to that of a
|
||||
// request in reqs, or if two responses have the same ID, or if there
|
||||
// is a request (non-notification) that does not have a corresponding
|
||||
// response.
|
||||
func mapRespsToReq(reqs []*Request, resps []*Response) ([]int, error) {
|
||||
reqIndexByID := make(map[uint64]int, len(reqs))
|
||||
for i, req := range reqs {
|
||||
if !req.Notif {
|
||||
reqIndexByID[req.ID] = i
|
||||
}
|
||||
}
|
||||
|
||||
if len(resps) != len(reqIndexByID) {
|
||||
return nil, fmt.Errorf("jsonrpc2: response batch too small: %d responses for %d non-notification requests", len(resps), len(reqIndexByID))
|
||||
}
|
||||
|
||||
m := make([]int, len(resps))
|
||||
seenIDs := make(map[uint64]struct{}, len(resps))
|
||||
for i, resp := range resps {
|
||||
reqIndex, present := reqIndexByID[resp.ID]
|
||||
if !present {
|
||||
return nil, fmt.Errorf("jsonrpc2: response batch contains response with ID %d that doesn't match any IDs in request batch", resp.ID)
|
||||
}
|
||||
m[i] = reqIndex
|
||||
|
||||
if _, seen := seenIDs[resp.ID]; seen {
|
||||
return nil, fmt.Errorf("jsonrpc2: response batch contains multiple responses with same ID %d", resp.ID)
|
||||
}
|
||||
seenIDs[resp.ID] = struct{}{}
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// call represents a JSON-RPC call over its entire lifecycle.
|
||||
type call struct {
|
||||
request *requestOrRequestBatch
|
||||
response *responseOrResponseBatch
|
||||
seq uint64 // the seq of the request (or first request for a batch)
|
||||
request *Request
|
||||
response *Response
|
||||
seq uint64 // the seq of the request
|
||||
done chan error
|
||||
}
|
||||
|
||||
// anyMessage represents either a JSON Request or Response, or a batch
|
||||
// thereof.
|
||||
// anyMessage represents either a JSON Request or Response.
|
||||
type anyMessage struct {
|
||||
request *requestOrRequestBatch
|
||||
response *responseOrResponseBatch
|
||||
}
|
||||
|
||||
func (m *anyMessage) requests() []*Request {
|
||||
if m.request.single != nil {
|
||||
return []*Request{m.request.single}
|
||||
}
|
||||
return m.request.batch
|
||||
}
|
||||
|
||||
func (m *anyMessage) responses() []*Response {
|
||||
if m.response.single != nil {
|
||||
return []*Response{m.response.single}
|
||||
}
|
||||
return m.response.batch
|
||||
request *Request
|
||||
response *Response
|
||||
}
|
||||
|
||||
func (m *anyMessage) MarshalJSON() ([]byte, error) {
|
||||
|
|
@ -578,7 +506,7 @@ func (m *anyMessage) MarshalJSON() ([]byte, error) {
|
|||
if v != nil {
|
||||
return json.Marshal(v)
|
||||
}
|
||||
return nil, errors.New("jsonrpc2: message (or each message in batch) must have exactly one of the request or response fields set")
|
||||
return nil, errors.New("jsonrpc2: message must have exactly one of the request or response fields set")
|
||||
}
|
||||
|
||||
func (m *anyMessage) UnmarshalJSON(data []byte) error {
|
||||
|
|
@ -638,80 +566,6 @@ func (m *anyMessage) UnmarshalJSON(data []byte) error {
|
|||
return json.Unmarshal(data, v)
|
||||
}
|
||||
|
||||
type requestOrRequestBatch struct {
|
||||
batch []*Request
|
||||
single *Request
|
||||
}
|
||||
|
||||
func (v *requestOrRequestBatch) MarshalJSON() ([]byte, error) {
|
||||
if v.single != nil {
|
||||
return json.Marshal(v.single)
|
||||
}
|
||||
return json.Marshal(v.batch)
|
||||
}
|
||||
|
||||
func (v *requestOrRequestBatch) UnmarshalJSON(data []byte) error {
|
||||
data = bytes.TrimLeft(data, " \t\n\r")
|
||||
if len(data) == 0 {
|
||||
return errInvalidRequestJSON
|
||||
}
|
||||
switch data[0] {
|
||||
case '[':
|
||||
*v = requestOrRequestBatch{}
|
||||
if err := json.Unmarshal(data, &v.batch); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
case '{':
|
||||
*v = requestOrRequestBatch{}
|
||||
if err := json.Unmarshal(data, &v.single); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
default:
|
||||
return errInvalidRequestJSON
|
||||
}
|
||||
}
|
||||
|
||||
type responseOrResponseBatch struct {
|
||||
batch []*Response
|
||||
single *Response
|
||||
}
|
||||
|
||||
func (v *responseOrResponseBatch) MarshalJSON() ([]byte, error) {
|
||||
if v.single != nil {
|
||||
return json.Marshal(v.single)
|
||||
}
|
||||
return json.Marshal(v.batch)
|
||||
}
|
||||
|
||||
func (v *responseOrResponseBatch) UnmarshalJSON(data []byte) error {
|
||||
data = bytes.TrimLeft(data, " \t\n\r")
|
||||
if len(data) == 0 {
|
||||
return errInvalidResponseJSON
|
||||
}
|
||||
switch data[0] {
|
||||
case '[':
|
||||
*v = responseOrResponseBatch{}
|
||||
if err := json.Unmarshal(data, &v.batch); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
case '{':
|
||||
*v = responseOrResponseBatch{}
|
||||
if err := json.Unmarshal(data, &v.single); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
default:
|
||||
return errInvalidResponseJSON
|
||||
}
|
||||
}
|
||||
|
||||
func readHeaderContentLength(r *bufio.Reader) (contentLength uint32, err error) {
|
||||
for {
|
||||
line, err := r.ReadString('\r')
|
||||
|
|
|
|||
|
|
@ -255,15 +255,6 @@ func TestAnyMessage(t *testing.T) {
|
|||
`{"method":"m"}`: {request: true},
|
||||
`{"result":123}`: {response: true},
|
||||
`{"error":{"code":456,"message":"m"}}`: {response: true},
|
||||
|
||||
// Batches
|
||||
`[{"method":"m"}]`: {request: true},
|
||||
`[{"method":"m"},{"foo":"bar"}]`: {},
|
||||
`[{"method":"m"},{"result":123}]`: {},
|
||||
`[{"result":123},{"method":"foo"}]`: {},
|
||||
`[{"result":123}]`: {response: true},
|
||||
`[{"error":{"code":456,"message":"m"}}]`: {response: true},
|
||||
`[{"result":123},{"error":{"code":456,"message":"m"}}]`: {response: true},
|
||||
}
|
||||
for s, want := range tests {
|
||||
var m anyMessage
|
||||
|
|
@ -282,12 +273,12 @@ func TestMessageCodec(t *testing.T) {
|
|||
v, vempty interface{}
|
||||
}{
|
||||
{
|
||||
v: &requestOrRequestBatch{single: &Request{ID: 123}},
|
||||
vempty: &requestOrRequestBatch{single: &Request{ID: 123}},
|
||||
v: &Request{ID: 123},
|
||||
vempty: &Request{ID: 123},
|
||||
},
|
||||
{
|
||||
v: &responseOrResponseBatch{single: &Response{ID: 123}},
|
||||
vempty: &responseOrResponseBatch{},
|
||||
v: &Response{ID: 123},
|
||||
vempty: &Response{ID: 123},
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
|
|
@ -306,44 +297,6 @@ func TestMessageCodec(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMapRespsToReq(t *testing.T) {
|
||||
tests := []struct {
|
||||
reqs []*Request
|
||||
resps []*Response
|
||||
want []int
|
||||
wantError bool
|
||||
}{
|
||||
{
|
||||
reqs: nil, resps: nil, want: []int{}, wantError: false,
|
||||
},
|
||||
{
|
||||
reqs: []*Request{{ID: 1}}, resps: []*Response{{ID: 1}}, want: []int{0},
|
||||
},
|
||||
{
|
||||
reqs: []*Request{{ID: 2}}, resps: []*Response{}, wantError: true,
|
||||
},
|
||||
{
|
||||
reqs: []*Request{}, resps: []*Response{{ID: 3}}, wantError: true,
|
||||
},
|
||||
{
|
||||
reqs: []*Request{{ID: 4}}, resps: []*Response{{ID: 4}, {ID: 4}}, wantError: true,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
m, err := mapRespsToReq(test.reqs, test.resps)
|
||||
if (err != nil) != test.wantError {
|
||||
t.Errorf("got error %v, wantError %v", err, test.wantError)
|
||||
continue
|
||||
}
|
||||
if test.wantError {
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(m, test.want) {
|
||||
t.Errorf("got %v, want %v", m, test.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadHeaderContentLength(t *testing.T) {
|
||||
s := "Content-Type: foo\r\nContent-Length: 123\r\n\r\n{}"
|
||||
n, err := readHeaderContentLength(bufio.NewReader(strings.NewReader(s)))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue