mirror of
https://github.com/sourcegraph/jsonrpc2.git
synced 2026-06-16 04:04:56 +02:00
transparently simplify control flow (#83)
This commit is contained in:
parent
534fd43609
commit
2cc94179e1
4 changed files with 72 additions and 89 deletions
105
conn.go
105
conn.go
|
|
@ -187,15 +187,17 @@ func (c *Conn) close(cause error) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) readMessages(ctx context.Context) {
|
func (c *Conn) readMessages(ctx context.Context) {
|
||||||
var err error
|
for {
|
||||||
for err == nil {
|
|
||||||
var m anyMessage
|
var m anyMessage
|
||||||
err = c.stream.ReadObject(&m)
|
err := c.stream.ReadObject(&m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
c.close(err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
|
// TODO: handle the case where both request and response are nil.
|
||||||
|
|
||||||
case m.request != nil:
|
case m.request != nil:
|
||||||
for _, onRecv := range c.onRecv {
|
for _, onRecv := range c.onRecv {
|
||||||
onRecv(m.request, nil)
|
onRecv(m.request, nil)
|
||||||
|
|
@ -204,43 +206,36 @@ func (c *Conn) readMessages(ctx context.Context) {
|
||||||
|
|
||||||
case m.response != nil:
|
case m.response != nil:
|
||||||
resp := m.response
|
resp := m.response
|
||||||
if resp != nil {
|
id := resp.ID
|
||||||
id := resp.ID
|
c.mu.Lock()
|
||||||
c.mu.Lock()
|
call := c.pending[id]
|
||||||
call := c.pending[id]
|
delete(c.pending, id)
|
||||||
delete(c.pending, id)
|
c.mu.Unlock()
|
||||||
c.mu.Unlock()
|
|
||||||
|
|
||||||
if call != nil {
|
var req *Request
|
||||||
call.response = resp
|
if call != nil {
|
||||||
}
|
call.response = resp
|
||||||
|
req = call.request
|
||||||
if len(c.onRecv) > 0 {
|
|
||||||
var req *Request
|
|
||||||
if call != nil {
|
|
||||||
req = call.request
|
|
||||||
}
|
|
||||||
for _, onRecv := range c.onRecv {
|
|
||||||
onRecv(req, resp)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
switch {
|
|
||||||
case call == nil:
|
|
||||||
c.logger.Printf("jsonrpc2: ignoring response #%s with no corresponding request\n", id)
|
|
||||||
|
|
||||||
case resp.Error != nil:
|
|
||||||
call.done <- resp.Error
|
|
||||||
close(call.done)
|
|
||||||
|
|
||||||
default:
|
|
||||||
call.done <- nil
|
|
||||||
close(call.done)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, onRecv := range c.onRecv {
|
||||||
|
onRecv(req, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
if call == nil {
|
||||||
|
c.logger.Printf("jsonrpc2: ignoring response #%s with no corresponding request\n", id)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
if resp.Error != nil {
|
||||||
|
err = resp.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
call.done <- err
|
||||||
|
close(call.done)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.close(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) send(_ context.Context, m *anyMessage, wait bool) (cc *call, err error) {
|
func (c *Conn) send(_ context.Context, m *anyMessage, wait bool) (cc *call, err error) {
|
||||||
|
|
@ -339,25 +334,20 @@ type Waiter struct {
|
||||||
// error is returned.
|
// error is returned.
|
||||||
func (w Waiter) Wait(ctx context.Context, result interface{}) error {
|
func (w Waiter) Wait(ctx context.Context, result interface{}) error {
|
||||||
select {
|
select {
|
||||||
case err, ok := <-w.call.done:
|
|
||||||
if !ok {
|
|
||||||
err = ErrClosed
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if result != nil {
|
|
||||||
if w.call.response.Result == nil {
|
|
||||||
w.call.response.Result = &jsonNull
|
|
||||||
}
|
|
||||||
if err := json.Unmarshal(*w.call.response.Result, result); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
|
|
||||||
|
case err, ok := <-w.call.done:
|
||||||
|
if !ok {
|
||||||
|
return ErrClosed
|
||||||
|
}
|
||||||
|
if err != nil || result == nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if w.call.response.Result == nil {
|
||||||
|
w.call.response.Result = &jsonNull
|
||||||
|
}
|
||||||
|
return json.Unmarshal(*w.call.response.Result, result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -423,12 +413,7 @@ func (m *anyMessage) UnmarshalJSON(data []byte) error {
|
||||||
return errors.New("jsonrpc2: invalid empty batch")
|
return errors.New("jsonrpc2: invalid empty batch")
|
||||||
}
|
}
|
||||||
for i := range msgs {
|
for i := range msgs {
|
||||||
if err := checkType(&msg{
|
if err := checkType(&msgs[i]); err != nil {
|
||||||
ID: msgs[i].ID,
|
|
||||||
Method: msgs[i].Method,
|
|
||||||
Result: msgs[i].Result,
|
|
||||||
Error: msgs[i].Error,
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,11 +44,9 @@ func LogMessages(logger Logger) ConnOpt {
|
||||||
OnRecv(func(req *Request, resp *Response) {
|
OnRecv(func(req *Request, resp *Response) {
|
||||||
switch {
|
switch {
|
||||||
case resp != nil:
|
case resp != nil:
|
||||||
var method string
|
method := "(no matching request)"
|
||||||
if req != nil {
|
if req != nil {
|
||||||
method = req.Method
|
method = req.Method
|
||||||
} else {
|
|
||||||
method = "(no matching request)"
|
|
||||||
}
|
}
|
||||||
switch {
|
switch {
|
||||||
case resp.Result != nil:
|
case resp.Result != nil:
|
||||||
|
|
|
||||||
|
|
@ -30,20 +30,16 @@ func (h *HandlerWithErrorConfigurer) Handle(ctx context.Context, conn *Conn, req
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = resp.SetResult(result)
|
err = resp.SetResult(result)
|
||||||
}
|
}
|
||||||
if err != nil {
|
|
||||||
if e, ok := err.(*Error); ok {
|
if e, ok := err.(*Error); ok {
|
||||||
resp.Error = e
|
resp.Error = e
|
||||||
} else {
|
} else if err != nil {
|
||||||
resp.Error = &Error{Message: err.Error()}
|
resp.Error = &Error{Message: err.Error()}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !req.Notif {
|
err = conn.SendResponse(ctx, resp)
|
||||||
if err := conn.SendResponse(ctx, resp); err != nil {
|
if err != nil && (err != ErrClosed || !h.suppressErrClosed) {
|
||||||
if err != ErrClosed || !h.suppressErrClosed {
|
conn.logger.Printf("jsonrpc2 handler: sending response %s: %v\n", resp.ID, err)
|
||||||
conn.logger.Printf("jsonrpc2 handler: sending response %s: %v\n", resp.ID, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
32
request.go
32
request.go
|
|
@ -55,6 +55,10 @@ func (r Request) MarshalJSON() ([]byte, error) {
|
||||||
// UnmarshalJSON implements json.Unmarshaler.
|
// UnmarshalJSON implements json.Unmarshaler.
|
||||||
func (r *Request) UnmarshalJSON(data []byte) error {
|
func (r *Request) UnmarshalJSON(data []byte) error {
|
||||||
r2 := make(map[string]interface{})
|
r2 := make(map[string]interface{})
|
||||||
|
pop := func(key string) interface{} {
|
||||||
|
defer delete(r2, key)
|
||||||
|
return r2[key]
|
||||||
|
}
|
||||||
|
|
||||||
// Detect if the "params" or "meta" fields are JSON "null" or just not
|
// Detect if the "params" or "meta" fields are JSON "null" or just not
|
||||||
// present by seeing if the field gets overwritten to nil.
|
// present by seeing if the field gets overwritten to nil.
|
||||||
|
|
@ -68,36 +72,37 @@ func (r *Request) UnmarshalJSON(data []byte) error {
|
||||||
if err := decoder.Decode(&r2); err != nil {
|
if err := decoder.Decode(&r2); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var ok bool
|
var ok bool
|
||||||
r.Method, ok = r2["method"].(string)
|
r.Method, ok = pop("method").(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("missing method field")
|
return errors.New("missing method field")
|
||||||
}
|
}
|
||||||
switch {
|
switch params := pop("params"); params {
|
||||||
case r2["params"] == nil:
|
case nil:
|
||||||
r.Params = &jsonNull
|
r.Params = &jsonNull
|
||||||
case r2["params"] == emptyParams:
|
case emptyParams:
|
||||||
r.Params = nil
|
r.Params = nil
|
||||||
default:
|
default:
|
||||||
b, err := json.Marshal(r2["params"])
|
b, err := json.Marshal(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to marshal params: %w", err)
|
return fmt.Errorf("failed to marshal params: %w", err)
|
||||||
}
|
}
|
||||||
r.Params = (*json.RawMessage)(&b)
|
r.Params = (*json.RawMessage)(&b)
|
||||||
}
|
}
|
||||||
switch {
|
switch meta := pop("meta"); meta {
|
||||||
case r2["meta"] == nil:
|
case nil:
|
||||||
r.Meta = &jsonNull
|
r.Meta = &jsonNull
|
||||||
case r2["meta"] == emptyMeta:
|
case emptyMeta:
|
||||||
r.Meta = nil
|
r.Meta = nil
|
||||||
default:
|
default:
|
||||||
b, err := json.Marshal(r2["meta"])
|
b, err := json.Marshal(meta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to marshal Meta: %w", err)
|
return fmt.Errorf("failed to marshal Meta: %w", err)
|
||||||
}
|
}
|
||||||
r.Meta = (*json.RawMessage)(&b)
|
r.Meta = (*json.RawMessage)(&b)
|
||||||
}
|
}
|
||||||
switch rawID := r2["id"].(type) {
|
switch rawID := pop("id").(type) {
|
||||||
case nil:
|
case nil:
|
||||||
r.ID = ID{}
|
r.ID = ID{}
|
||||||
r.Notif = true
|
r.Notif = true
|
||||||
|
|
@ -115,13 +120,12 @@ func (r *Request) UnmarshalJSON(data []byte) error {
|
||||||
return fmt.Errorf("unexpected ID type: %T", rawID)
|
return fmt.Errorf("unexpected ID type: %T", rawID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The jsonrpc field should not be added to ExtraFields.
|
||||||
|
delete(r2, "jsonrpc")
|
||||||
|
|
||||||
// Clear the extra fields before populating them again.
|
// Clear the extra fields before populating them again.
|
||||||
r.ExtraFields = nil
|
r.ExtraFields = nil
|
||||||
for name, value := range r2 {
|
for name, value := range r2 {
|
||||||
switch name {
|
|
||||||
case "id", "jsonrpc", "meta", "method", "params":
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
r.ExtraFields = append(r.ExtraFields, RequestField{
|
r.ExtraFields = append(r.ExtraFields, RequestField{
|
||||||
Name: name,
|
Name: name,
|
||||||
Value: value,
|
Value: value,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue