1
0
Fork 0
mirror of https://github.com/sourcegraph/jsonrpc2.git synced 2026-06-21 10:18:20 +02:00

Add pluggable transport interface + WebSocket support

simplify API by using interface{}, rename from transport -> stream

add WebSocket transport in websocket subpackage

do not buffer in ReadObject, rename GetObjectReader/ReadObject -> NextObjectReader

use xtest (jsonrpc2_test) package to allow us to test subpackages that depend on us (in a future change)

factor out vscode-specific transport code and allow pluggable transports

remove Server (unused) and Serve (unnecessary):
  The Serve func had nothing specific to JSON-RPC; it was just a loop
  around (net.Listener).Accept. It added no value.
This commit is contained in:
Quinn Slack 2016-12-17 18:35:34 -08:00
parent 9fdd802ab4
commit 6e06d561ec
6 changed files with 416 additions and 194 deletions

View file

@ -3,16 +3,13 @@
package jsonrpc2
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"strconv"
"strings"
"sync"
)
@ -243,8 +240,7 @@ func (id *ID) UnmarshalJSON(data []byte) error {
// is symmetric, so a Conn runs on both ends of a client-server
// connection.
type Conn struct {
conn io.Closer // all writes should go through w, all reads through readMessages
w *bufio.Writer
stream ObjectStream
h Handler
@ -276,10 +272,9 @@ var ErrClosed = errors.New("jsonrpc2: connection is closed")
//
// NewClient consumes conn, so you should call Close on the returned
// client not on the given conn.
func NewConn(ctx context.Context, conn io.ReadWriteCloser, h Handler, opt ...ConnOpt) *Conn {
func NewConn(ctx context.Context, stream ObjectStream, h Handler, opt ...ConnOpt) *Conn {
c := &Conn{
conn: conn,
w: bufio.NewWriter(conn),
stream: stream,
h: h,
pending: map[ID]*call{},
disconnect: make(chan struct{}),
@ -287,7 +282,7 @@ func NewConn(ctx context.Context, conn io.ReadWriteCloser, h Handler, opt ...Con
for _, opt := range opt {
opt(c)
}
go c.readMessages(ctx, bufio.NewReader(conn))
go c.readMessages(ctx)
return c
}
@ -301,10 +296,10 @@ func (c *Conn) Close() error {
}
c.closing = true
c.mu.Unlock()
return c.conn.Close()
return c.stream.Close()
}
func (c *Conn) send(ctx context.Context, m *anyMessage, wait bool) (*call, error) {
func (c *Conn) send(ctx context.Context, m *anyMessage, wait bool) (cc *call, err error) {
c.sending.Lock()
defer c.sending.Unlock()
@ -316,7 +311,6 @@ func (c *Conn) send(ctx context.Context, m *anyMessage, wait bool) (*call, error
// Store requests so we can later associate them with incoming
// responses.
var cc *call
if m.request != nil && wait {
cc = &call{request: m.request, seq: c.seq, done: make(chan error, 1)}
c.pending[ID{Num: c.seq}] = cc // use next seq as call ID
@ -334,17 +328,23 @@ func (c *Conn) send(ctx context.Context, m *anyMessage, wait bool) (*call, error
}
}
err := marshalHeadersAndBody(c.w, m)
if err != nil {
c.w.Flush()
if cc != nil {
c.mu.Lock()
delete(c.pending, ID{Num: cc.seq})
c.mu.Unlock()
// From here on, if we fail to send this, then we need to remove
// this from the pending map so we don't block on it or pile up
// pending entries for unsent messages.
defer func() {
if err != nil {
if cc != nil {
c.mu.Lock()
delete(c.pending, ID{Num: cc.seq})
c.mu.Unlock()
}
}
}()
if err := c.stream.WriteObject(m); err != nil {
return nil, err
}
return cc, c.w.Flush()
return cc, nil
}
// Call initiates a JSON-RPC call using the specified method and
@ -431,16 +431,11 @@ func (c *Conn) DisconnectNotify() <-chan struct{} {
return c.disconnect
}
func (c *Conn) readMessages(ctx context.Context, r *bufio.Reader) {
func (c *Conn) readMessages(ctx context.Context) {
var err error
for err == nil {
var m anyMessage
var n uint32
n, err = readHeaderContentLength(r)
if err == nil {
err = json.NewDecoder(io.LimitReader(r, int64(n))).Decode(&m)
}
err = c.stream.ReadObject(&m)
if err != nil {
break
}
@ -513,20 +508,6 @@ func (c *Conn) readMessages(ctx context.Context, r *bufio.Reader) {
close(c.disconnect)
}
// Server is a JSON-RPC server.
type Server struct{}
// Serve starts a new JSON-RPC server.
func Serve(ctx context.Context, lis net.Listener, h Handler, opt ...ConnOpt) error {
for {
conn, err := lis.Accept()
if err != nil {
return err
}
NewConn(ctx, conn, h, opt...)
}
}
// call represents a JSON-RPC call over its entire lifecycle.
type call struct {
request *Request
@ -612,49 +593,6 @@ func (m *anyMessage) UnmarshalJSON(data []byte) error {
return json.Unmarshal(data, v)
}
func readHeaderContentLength(r *bufio.Reader) (contentLength uint32, err error) {
for {
line, err := r.ReadString('\r')
if err != nil {
return 0, err
}
b, err := r.ReadByte()
if err != nil {
return 0, err
}
if b != '\n' {
return 0, fmt.Errorf(`jsonrpc2: line endings must be \r\n`)
}
if line == "\r" {
break
}
if strings.HasPrefix(line, "Content-Length: ") {
line = strings.TrimPrefix(line, "Content-Length: ")
line = strings.TrimSpace(line)
n, err := strconv.ParseUint(line, 10, 32)
if err != nil {
return 0, err
}
contentLength = uint32(n)
}
}
if contentLength == 0 {
err = fmt.Errorf("jsonrpc2: no Content-Length header found")
}
return
}
func marshalHeadersAndBody(w io.Writer, v interface{}) error {
body, err := json.Marshal(v)
if err != nil {
return err
}
fmt.Fprintf(w, "Content-Length: %d\r\n", len(body))
fmt.Fprint(w, "Content-Type: application/vscode-jsonrpc; charset=utf8\r\n\r\n")
_, err = w.Write(body)
return err
}
var (
errInvalidRequestJSON = errors.New("jsonrpc2: request must be either a JSON object or JSON array")
errInvalidResponseJSON = errors.New("jsonrpc2: response must be either a JSON object or JSON array")