1
0
Fork 0
mirror of https://github.com/sourcegraph/jsonrpc2.git synced 2026-06-16 20:20:03 +02:00

Compare commits

..

32 commits

Author SHA1 Message Date
dependabot[bot]
4756698b1e
Bump actions/checkout from 5 to 6 in the github-actions group (#93) 2025-12-02 09:01:38 +02:00
dependabot[bot]
ef3ea8b2ea
Bump actions/setup-go from 5 to 6 in the github-actions group (#92)
Bumps the github-actions group with 1 update: [actions/setup-go](https://github.com/actions/setup-go).


Updates `actions/setup-go` from 5 to 6
- [Release notes](https://github.com/actions/setup-go/releases)
- [Commits](https://github.com/actions/setup-go/compare/v5...v6)

---
updated-dependencies:
- dependency-name: actions/setup-go
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
  dependency-group: github-actions
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-02 20:55:33 +02:00
dependabot[bot]
3c4c92ad61
Bump actions/checkout from 4 to 5 in the github-actions group (#91)
Bumps the github-actions group with 1 update: [actions/checkout](https://github.com/actions/checkout).


Updates `actions/checkout` from 4 to 5
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
  dependency-group: github-actions
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-02 13:36:38 +02:00
Sam Herrmann
ddb146fd0d
Cancel Handler context when connection closes (#90) 2025-08-19 16:19:52 +02:00
Kevin Gillette
2cc94179e1
transparently simplify control flow (#83) 2025-02-17 16:55:54 +02:00
Noah S-C
534fd43609
Merge pull request #80 from sourcegraph/nsc/lsifgo-to-scipgo
chore: use scip-go instead of lsif-go for precise indexing in CI
2024-02-23 16:31:37 +00:00
Noah S-C
4963d1c241 chore: use scip-go instead of lsif-go for precise indexing in CI 2024-02-23 15:34:49 +00:00
dependabot[bot]
dd69e185fa
Bump the github-actions group with 2 updates (#79)
Updates `actions/checkout` from 1 to 4

Updates `actions/setup-go` from 2 to 5

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-01-10 12:16:40 +02:00
Joyce
bf47ec21a6
ci: Create dependabot.yml (#78)
Signed-off-by: Joyce <joycebrum@google.com>
2024-01-10 12:06:38 +02:00
Will Dollman
cd64a673da
Merge pull request #75 from diogoteles08/patch-1
Docs: Create Security Policy
2023-09-25 12:04:20 +01:00
Will Dollman
943e53c8e9
Update SECURITY.md
Co-authored-by: Vincent <evict@users.noreply.github.com>
2023-09-25 11:53:41 +01:00
Will Dollman
e4e2e6324c Update security policy to use email for reporting 2023-09-25 11:50:00 +01:00
Diogo Teles Sant'Anna
510183e882
Create Security Policy 2023-09-11 14:51:22 -03:00
Diogo Teles Sant'Anna
8a0bf06edf
ci: set minimal permissions to GitHub workflows (#73)
Signed-off-by: Diogo Teles Sant'Anna <diogoteles@google.com>
2023-08-30 09:07:57 +02:00
Fazlul Shahriar
b9c1fbdb96
Fix logging of received response messages (#71)
As documented: OnRecv causes all requests received on conn to invoke
f(req, nil) and all responses to invoke f(req, resp).

Since OnRecv is called with both *Request and *Response being non-nil
when we're handling a response, we need to check that *Response is
non-nil before we check *Request is non-nil. This change just swaps the
two cases in the switch statement to fix the issue. For consistency,
I've swapped the cases for OnSend also, even when it's not needed.
2023-07-14 13:00:57 +02:00
Keegan Carruthers-Smith
5d80b29f44
conn: do not lock sending when closing (#70)
The sending mutex may be blocked due to the underlying conn blocking. If
that happens then we can't call close since close also attempts to hold
the sending mutex. Sending mutex is only used for serializing sends and
doesn't protect the fields close reads/writes. I believe we introduced
locking it so we would return ErrClosed. This commit instead introduces
a check in send which rechecks if we have since closed while attempting
to send.

Test Plan: expanded the test coverage
2023-06-07 08:40:20 +02:00
Sam Herrmann
040dc22f8a
Add package example test (#68) 2023-03-01 06:46:15 +02:00
Sam Herrmann
ae88a5e7c0
Always omit params member from request when empty (#67)
With this commit, the JSON encoding of `Request` always omits the params
member when calling `Conn.Call`, `Conn.DispatchCall`, or `Conn.Notify`
with the `params` argument set to `nil`. This change also removes the
`OmitNilParams` call option that was added in commit 8012d496 (#62).

As of this commit, if users desire to send a JSON-RPC request with a
`params` value of `null`, then they may do so by explicitly setting the
`params` argument of `Conn.Call`/`Conn.DispatchCall`/`Conn.Notify` to
`json.RawMessage("null")`.
2023-02-22 10:53:44 +02:00
Sam Herrmann
6864d8cc6d
Omit data field from error when empty (#66)
The JSON-RPC 2.0 specification allows the data member of an error object
to be omitted. Before this commit, if Error.Data was nil then the JSON
encoding was "null". That means that the data member was included in
every JSON-RPC error object, even when the data member was not
explicitly set.

This commit adds the omitempty struct tag to the Error.Data field,
meaning that the data member is omitted from the JSON encoding unless
explicitly set with the Error.SetError() method. Beware that this is a
breaking change for clients that may have strict null checks on the data
member.
2023-02-12 14:52:17 +02:00
Sam Herrmann
846c29e96d
Split jsonrpc2.go file into multiple files (#65)
This merge request moves some of the contents from the jsonrpc2.go file
into their own designated file. The new files being introduced
(excluding test files) are as follows:

* conn.go
* request.go
* response.go

The motive of this change is to make it easier to navigate the code.
Without this change, the jsonrpc2.go file is 813 lines of code.
2023-02-09 07:56:42 +01:00
Sam Herrmann
028a50bb39
Fix underlying connection not being closed on protocol error (#64)
Before this commit, the underlying connection of `Conn` was not being
closed when a protocol error was encountered. This behavior contradicted
with `Conn.DisconnectNotify()` because it reported that the underlying
connection was being closed. Additionally, the underlying connection was
now orphaned because `Conn` was no longer processing any of the
subsequent requests.

With this commit, the underlying connection is now being closed when a
protocol error is encountered, matching what `Conn.DisconnectNotify()`
has already been reporting.
2023-02-07 21:46:05 +01:00
Sam Herrmann
78a3d790f3
Pin staticcheck version to v0.2.2 (#63)
The CI pipeline has been broken in this project for some time because
the latest version of staticcheck is not compatible with every version
of Go. Through trial and error, it was discovered that staticcheck
v0.2.2 is the latest version that is compatible with Go 1.16.

The authors of staticcheck also recommend pinning the version in CI
pipelines to prevent unintentional breakage of the build [1].

References
[1]: https://staticcheck.io/docs/running-staticcheck/ci/github-actions/#version
2023-02-03 10:51:22 +02:00
Sam Herrmann
8012d49686
Add ability to omit params member from request (#62)
The JSON-RPC 2.0 specification allows the params member of a request to
be omitted [1]. Before this commit, this library did not allow the
params member to be omitted when sending a request. When the params
argument of the Conn.Call/Conn.DispatchCall or Conn.Notify method was
set to nil, then Request.Params was set to the JSON encoding of nil
which is null.

This commit adds a CallOption named OmitNilParams. If OmitNilParams is
used when sending a request with params set to nil, then the params
member in the JSON encoding of Request is omitted. If the OmitNilParams
option is not used then the previous behavior is maintained. In other
words, the changes in this commit are backwards compatible.

References
[1]: https://www.jsonrpc.org/specification#request_object
2023-01-24 08:47:36 +02:00
Semesse
e1f9fdf1bb
fix no corresponding request if req.ID is modified by onSend (#60) 2023-01-13 20:33:58 +02:00
Dax McDonald
065a868115
Merge pull request #59 from sourcegraph/batch-changes/dax/update-checkout-v3
ci: update to actions/checkout@v3
2023-01-05 07:22:44 -08:00
Sourcegraph
7f448843ac batch changes - update checkout v2 to v3 2023-01-03 22:03:16 +00:00
Michał Nowotnik
a896fc3eac
[#57] Fix and deprecate PlainObjectCodec (#58)
This change fixes a bug that causes PlainObjectCodec to
lose additional messages from stream. json.Decoder has
an internal buffer that reads more than one message, but
is discarded after every use. Now PlainObjectCodec reuses
encoder and decoder within a buffered stream, however
using it directly in your code retains the old, incorrect
behaviour.

A user should now use plainObjectStream if he needs
plain JSON-RPC 2.0 stream without headers.
`NewPlainObjectStream` method has been added for this reason.
2022-07-11 15:43:39 +02:00
Sam Herrmann
c9c77b6bb9
Add ability to set custom logger (#48)
Before this commit, a custom logger could be set with the LogMessages
function. However, by using LogMessages not only is a custom logger set
but also all received and sent messages are logged. Use cases exist
where a custom logger is desired to log errors but not all messages.
2021-11-19 10:30:03 +02:00
lhchavez
5f298fe6a1
Homogenize treatment of params and meta in UnmarshalJSON (#52)
This change makes the treatment of params and meta the same, by
assigning a well-known pointer at first to detect if the unmarshaling
process overwrites it with an explicit nil, or it stays the same in
which it means that it was unset from the beginning.
2021-08-04 18:55:02 +02:00
lhchavez
120d461fd1
Add GitHub actions (#51)
That extra green checkmark does wonders for peace of mind.
2021-08-04 14:46:32 +02:00
lhchavez
d6ac66e24f
Add a way to specify more non-standard-compliant fields to Request (#50)
This change introduces `ExtraField`, a `CallOption` that can add
arbitrary fields to the top-level JSON-RPC Request message.
2021-08-04 14:45:59 +02:00
amyxia
5cdc7d6ccd
fix typo in jsonrpc2/stream.go (#47)
Co-authored-by: xiarui.xr <xiarui.xr@antfin.com>
2021-05-26 14:52:51 +02:00
24 changed files with 1865 additions and 972 deletions

10
.github/dependabot.yml vendored Normal file
View file

@ -0,0 +1,10 @@
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "monthly"
groups:
github-actions:
patterns:
- "*"

34
.github/workflows/ci.yml vendored Normal file
View file

@ -0,0 +1,34 @@
name: CI
on:
pull_request: {}
push:
branches:
- master
permissions:
contents: read
jobs:
test:
strategy:
fail-fast: false
matrix:
go:
- 1.16
name: Go ${{ matrix.go }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version: ${{ matrix.go }}
id: go
- name: Get dependencies
run: go get -t -v ./...
- name: Install staticcheck
run: go install honnef.co/go/tools/cmd/staticcheck@v0.2.2
- name: Lint
run: staticcheck -checks=all ./...
- name: Test
run: go test -v -race ./...

View file

@ -1,13 +0,0 @@
name: LSIF
on:
- push
jobs:
lsif-go:
runs-on: ubuntu-latest
container: sourcegraph/lsif-go
steps:
- uses: actions/checkout@v1
- name: Generate LSIF data
run: lsif-go
- name: Upload LSIF data
run: src lsif upload -github-token=${{ secrets.GITHUB_TOKEN }}

20
.github/workflows/scip.yml vendored Normal file
View file

@ -0,0 +1,20 @@
name: SCIP
'on':
- push
permissions:
contents: read
jobs:
scip-go:
runs-on: ubuntu-latest
container: sourcegraph/scip-go
steps:
- uses: actions/checkout@v6
- name: Get src-cli
run: curl -L https://sourcegraph.com/.api/src-cli/src_linux_amd64 -o /usr/local/bin/src;
chmod +x /usr/local/bin/src
- name: Set directory to safe for git
run: git config --global --add safe.directory $GITHUB_WORKSPACE
- name: Generate SCIP data
run: scip-go
- name: Upload SCIP data
run: src code-intel upload -github-token=${{ secrets.GITHUB_TOKEN }}

View file

@ -3,9 +3,8 @@
Package jsonrpc2 provides a [Go](https://golang.org) implementation of [JSON-RPC 2.0](http://www.jsonrpc.org/specification).
This package is **experimental** until further notice.
[**Open the code in Sourcegraph**](https://sourcegraph.com/github.com/sourcegraph/jsonrpc2)
* [Documentation](https://pkg.go.dev/github.com/sourcegraph/jsonrpc2)
* [Open the code in Sourcegraph](https://sourcegraph.com/github.com/sourcegraph/jsonrpc2)
## Known issues

12
SECURITY.md Normal file
View file

@ -0,0 +1,12 @@
# Security Policy
## Supported Versions
Security updates are applied only to the latest release.
## Reporting a Vulnerability
If you have discovered a security vulnerability in this project, please report it privately. **Do not disclose it as a public issue.** This gives us time to work with you to evaluate and fix the issue before public exposure, reducing the chance that the exploit will be used before a patch is released.
Please disclose it privately via email to security@sourcegraph.com. We will work with you to understand and resolve the issue promptly.

View file

@ -19,6 +19,15 @@ func Meta(meta interface{}) CallOption {
})
}
// ExtraField returns a call option which attaches the given name/value pair to
// the JSON-RPC 2.0 request. This can be used to add arbitrary extensions to
// JSON RPC 2.0.
func ExtraField(name string, value interface{}) CallOption {
return callOptionFunc(func(r *Request) error {
return r.SetExtraField(name, value)
})
}
// PickID returns a call option which sets the ID on a request. Care must be
// taken to ensure there are no conflicts with any previously picked ID, nor
// with the default sequence ID.

View file

@ -90,3 +90,53 @@ func TestStringID(t *testing.T) {
t.Fatal(err)
}
}
func TestExtraField(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a, b := inMemoryPeerConns()
defer a.Close()
defer b.Close()
handler := handlerFunc(func(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
replyWithError := func(msg string) {
respErr := &jsonrpc2.Error{Code: jsonrpc2.CodeInvalidRequest, Message: msg}
if err := conn.ReplyWithError(ctx, req.ID, respErr); err != nil {
t.Error(err)
}
}
var sessionID string
for _, field := range req.ExtraFields {
if field.Name != "sessionId" {
continue
}
var ok bool
sessionID, ok = field.Value.(string)
if !ok {
t.Errorf("\"sessionId\" is not a string: %v", field.Value)
}
}
if sessionID == "" {
replyWithError("sessionId must be set")
return
}
if sessionID != "session" {
replyWithError("sessionId has the wrong value")
return
}
if err := conn.Reply(ctx, req.ID, "ok"); err != nil {
t.Error(err)
}
})
connA := jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(a, jsonrpc2.VSCodeObjectCodec{}), handler)
connB := jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(b, jsonrpc2.VSCodeObjectCodec{}), noopHandler{})
defer connA.Close()
defer connB.Close()
var res string
if err := connB.Call(ctx, "f", nil, &res, jsonrpc2.ExtraField("sessionId", "session")); err != nil {
t.Fatal(err)
}
}

479
conn.go Normal file
View file

@ -0,0 +1,479 @@
package jsonrpc2
import (
"bytes"
"context"
"encoding/json"
"errors"
"io"
"log"
"os"
"strconv"
"sync"
)
// Conn is a JSON-RPC client/server connection. The JSON-RPC protocol
// is symmetric, so a Conn runs on both ends of a client-server
// connection.
type Conn struct {
stream ObjectStream
h Handler
mu sync.Mutex
closed bool
seq uint64
pending map[ID]*call
sending sync.Mutex
cancelCtx context.CancelFunc
disconnect chan struct{}
logger Logger
// Set by ConnOpt funcs.
onRecv []func(*Request, *Response)
onSend []func(*Request, *Response)
}
var _ JSONRPC2 = (*Conn)(nil)
// NewConn creates a new JSON-RPC client/server connection using the
// given ReadWriteCloser (typically a TCP connection or stdio). The
// JSON-RPC protocol is symmetric, so a Conn runs on both ends of a
// client-server connection.
//
// NewConn consumes stream, so you should call Close on the returned
// Conn not on the given stream or its underlying connection.
//
// Conn is closed when the given context's Done channel is closed.
func NewConn(ctx context.Context, stream ObjectStream, h Handler, opts ...ConnOpt) *Conn {
ctx, cancel := context.WithCancel(ctx)
c := &Conn{
stream: stream,
h: h,
pending: map[ID]*call{},
cancelCtx: cancel,
disconnect: make(chan struct{}),
logger: log.New(os.Stderr, "", log.LstdFlags),
}
for _, opt := range opts {
if opt == nil {
continue
}
opt(c)
}
go c.readMessages(ctx)
go func() {
<-ctx.Done()
c.close(nil)
}()
return c
}
// Close closes the JSON-RPC connection. The connection may not be
// used after it has been closed.
func (c *Conn) Close() error {
return c.close(nil)
}
// Call initiates a JSON-RPC call using the specified method and params, and
// waits for the response. If the response is successful, its result is stored
// in result (a pointer to a value that can be JSON-unmarshaled into);
// otherwise, a non-nil error is returned. See DispatchCall for more details.
func (c *Conn) Call(ctx context.Context, method string, params, result interface{}, opts ...CallOption) error {
call, err := c.DispatchCall(ctx, method, params, opts...)
if err != nil {
return err
}
return call.Wait(ctx, result)
}
// DisconnectNotify returns a channel that is closed when the
// underlying connection is disconnected.
func (c *Conn) DisconnectNotify() <-chan struct{} {
return c.disconnect
}
// DispatchCall dispatches a JSON-RPC call using the specified method and
// params, and returns a call proxy or an error. Call Wait() on the returned
// proxy to receive the response. Only use this function if you need to do work
// after dispatching the request, otherwise use Call.
//
// The params member is omitted from the JSON-RPC request if the given params is
// nil. Use json.RawMessage("null") to send a JSON-RPC request with its params
// member set to null.
func (c *Conn) DispatchCall(ctx context.Context, method string, params interface{}, opts ...CallOption) (Waiter, error) {
req := &Request{Method: method}
for _, opt := range opts {
if opt == nil {
continue
}
if err := opt.apply(req); err != nil {
return Waiter{}, err
}
}
if params != nil {
if err := req.SetParams(params); err != nil {
return Waiter{}, err
}
}
call, err := c.send(ctx, &anyMessage{request: req}, true)
if err != nil {
return Waiter{}, err
}
return Waiter{call: call}, nil
}
// Notify is like Call, but it returns when the notification request is sent
// (without waiting for a response, because JSON-RPC notifications do not have
// responses).
//
// The params member is omitted from the JSON-RPC request if the given params is
// nil. Use json.RawMessage("null") to send a JSON-RPC request with its params
// member set to null.
func (c *Conn) Notify(ctx context.Context, method string, params interface{}, opts ...CallOption) error {
req := &Request{Method: method, Notif: true}
for _, opt := range opts {
if opt == nil {
continue
}
if err := opt.apply(req); err != nil {
return err
}
}
if params != nil {
if err := req.SetParams(params); err != nil {
return err
}
}
_, err := c.send(ctx, &anyMessage{request: req}, false)
return err
}
// Reply sends a successful response with a result.
func (c *Conn) Reply(ctx context.Context, id ID, result interface{}) error {
resp := &Response{ID: id}
if err := resp.SetResult(result); err != nil {
return err
}
_, err := c.send(ctx, &anyMessage{response: resp}, false)
return err
}
// ReplyWithError sends a response with an error.
func (c *Conn) ReplyWithError(ctx context.Context, id ID, respErr *Error) error {
_, err := c.send(ctx, &anyMessage{response: &Response{ID: id, Error: respErr}}, false)
return err
}
// SendResponse sends resp to the peer. It is lower level than (*Conn).Reply.
func (c *Conn) SendResponse(ctx context.Context, resp *Response) error {
_, err := c.send(ctx, &anyMessage{response: resp}, false)
return err
}
func (c *Conn) close(cause error) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return ErrClosed
}
for _, call := range c.pending {
close(call.done)
}
if cause != nil && cause != io.EOF && cause != io.ErrUnexpectedEOF {
c.logger.Printf("jsonrpc2: protocol error: %v\n", cause)
}
close(c.disconnect)
c.cancelCtx()
c.closed = true
return c.stream.Close()
}
func (c *Conn) readMessages(ctx context.Context) {
for {
var m anyMessage
err := c.stream.ReadObject(&m)
if err != nil {
c.close(err)
return
}
switch {
// TODO: handle the case where both request and response are nil.
case m.request != nil:
for _, onRecv := range c.onRecv {
onRecv(m.request, nil)
}
c.h.Handle(ctx, c, m.request)
case m.response != nil:
resp := m.response
id := resp.ID
c.mu.Lock()
call := c.pending[id]
delete(c.pending, id)
c.mu.Unlock()
var req *Request
if call != nil {
call.response = resp
req = call.request
}
for _, onRecv := range c.onRecv {
onRecv(req, resp)
}
if call == nil {
c.logger.Printf("jsonrpc2: ignoring response #%s with no corresponding request\n", id)
continue
}
var err error
if resp.Error != nil {
err = resp.Error
}
call.done <- err
close(call.done)
}
}
}
func (c *Conn) send(_ context.Context, m *anyMessage, wait bool) (cc *call, err error) {
c.sending.Lock()
defer c.sending.Unlock()
// double check the error isn't due to being closed while sending.
defer func() {
if err != nil {
c.mu.Lock()
if c.closed {
err = ErrClosed
}
c.mu.Unlock()
}
}()
// m.request.ID could be changed, so we store a copy to correctly
// clean up pending
var id ID
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil, ErrClosed
}
// Assign a default id if not set
if m.request != nil && wait {
cc = &call{request: m.request, seq: c.seq, done: make(chan error, 1)}
isIDUnset := len(m.request.ID.Str) == 0 && m.request.ID.Num == 0
if isIDUnset {
if m.request.ID.IsString {
m.request.ID.Str = strconv.FormatUint(c.seq, 10)
} else {
m.request.ID.Num = c.seq
}
}
c.seq++
}
c.mu.Unlock()
if len(c.onSend) > 0 {
var (
req *Request
resp *Response
)
switch {
case m.request != nil:
req = m.request
case m.response != nil:
resp = m.response
}
for _, onSend := range c.onSend {
onSend(req, resp)
}
}
// Store requests so we can later associate them with incoming
// responses.
if m.request != nil && wait {
c.mu.Lock()
id = m.request.ID
c.pending[id] = cc
c.mu.Unlock()
}
// From here on, if we fail to send this, then we need to remove
// this from the pending map so we don't block on it or pile up
// pending entries for unsent messages.
defer func() {
if err != nil {
if cc != nil {
c.mu.Lock()
delete(c.pending, id)
c.mu.Unlock()
}
}
}()
if err := c.stream.WriteObject(m); err != nil {
return nil, err
}
return cc, nil
}
// Waiter proxies an ongoing JSON-RPC call.
type Waiter struct {
*call
}
// Wait for the result of an ongoing JSON-RPC call. If the response
// is successful, its result is stored in result (a pointer to a
// value that can be JSON-unmarshaled into); otherwise, a non-nil
// error is returned.
func (w Waiter) Wait(ctx context.Context, result interface{}) error {
select {
case <-ctx.Done():
return ctx.Err()
case err, ok := <-w.call.done:
if !ok {
return ErrClosed
}
if err != nil || result == nil {
return err
}
if w.call.response.Result == nil {
w.call.response.Result = &jsonNull
}
return json.Unmarshal(*w.call.response.Result, result)
}
}
// call represents a JSON-RPC call over its entire lifecycle.
type call struct {
request *Request
response *Response
seq uint64 // the seq of the request
done chan error
}
// anyMessage represents either a JSON Request or Response.
type anyMessage struct {
request *Request
response *Response
}
func (m anyMessage) MarshalJSON() ([]byte, error) {
var v interface{}
switch {
case m.request != nil && m.response == nil:
v = m.request
case m.request == nil && m.response != nil:
v = m.response
}
if v != nil {
return json.Marshal(v)
}
return nil, errors.New("jsonrpc2: message must have exactly one of the request or response fields set")
}
func (m *anyMessage) UnmarshalJSON(data []byte) error {
// The presence of these fields distinguishes between the 2
// message types.
type msg struct {
ID interface{} `json:"id"`
Method *string `json:"method"`
Result anyValueWithExplicitNull `json:"result"`
Error interface{} `json:"error"`
}
var isRequest, isResponse bool
checkType := func(m *msg) error {
mIsRequest := m.Method != nil
mIsResponse := m.Result.null || m.Result.value != nil || m.Error != nil
if (!mIsRequest && !mIsResponse) || (mIsRequest && mIsResponse) {
return errors.New("jsonrpc2: unable to determine message type (request or response)")
}
if (mIsRequest && isResponse) || (mIsResponse && isRequest) {
return errors.New("jsonrpc2: batch message type mismatch (must be all requests or all responses)")
}
isRequest = mIsRequest
isResponse = mIsResponse
return nil
}
if isArray := len(data) > 0 && data[0] == '['; isArray {
var msgs []msg
if err := json.Unmarshal(data, &msgs); err != nil {
return err
}
if len(msgs) == 0 {
return errors.New("jsonrpc2: invalid empty batch")
}
for i := range msgs {
if err := checkType(&msgs[i]); err != nil {
return err
}
}
} else {
var m msg
if err := json.Unmarshal(data, &m); err != nil {
return err
}
if err := checkType(&m); err != nil {
return err
}
}
var v interface{}
switch {
case isRequest && !isResponse:
v = &m.request
case !isRequest && isResponse:
v = &m.response
}
if err := json.Unmarshal(data, v); err != nil {
return err
}
if !isRequest && isResponse && m.response.Error == nil && m.response.Result == nil {
m.response.Result = &jsonNull
}
return nil
}
// anyValueWithExplicitNull is used to distinguish {} from
// {"result":null} by anyMessage's JSON unmarshaler.
type anyValueWithExplicitNull struct {
null bool // JSON "null"
value interface{}
}
func (v anyValueWithExplicitNull) MarshalJSON() ([]byte, error) {
return json.Marshal(v.value)
}
func (v *anyValueWithExplicitNull) UnmarshalJSON(data []byte) error {
data = bytes.TrimSpace(data)
if string(data) == "null" {
*v = anyValueWithExplicitNull{null: true}
return nil
}
*v = anyValueWithExplicitNull{}
return json.Unmarshal(data, &v.value)
}

View file

@ -43,6 +43,20 @@ func LogMessages(logger Logger) ConnOpt {
OnRecv(func(req *Request, resp *Response) {
switch {
case resp != nil:
method := "(no matching request)"
if req != nil {
method = req.Method
}
switch {
case resp.Result != nil:
result, _ := json.Marshal(resp.Result)
logger.Printf("jsonrpc2: --> result #%s: %s: %s\n", resp.ID, method, result)
case resp.Error != nil:
err, _ := json.Marshal(resp.Error)
logger.Printf("jsonrpc2: --> error #%s: %s: %s\n", resp.ID, method, err)
}
case req != nil:
mu.Lock()
reqMethods[req.ID] = req.Method
@ -54,34 +68,10 @@ func LogMessages(logger Logger) ConnOpt {
} else {
logger.Printf("jsonrpc2: --> request #%s: %s: %s\n", req.ID, req.Method, params)
}
case resp != nil:
var method string
if req != nil {
method = req.Method
} else {
method = "(no matching request)"
}
switch {
case resp.Result != nil:
result, _ := json.Marshal(resp.Result)
logger.Printf("jsonrpc2: --> result #%s: %s: %s\n", resp.ID, method, result)
case resp.Error != nil:
err, _ := json.Marshal(resp.Error)
logger.Printf("jsonrpc2: --> error #%s: %s: %s\n", resp.ID, method, err)
}
}
})(c)
OnSend(func(req *Request, resp *Response) {
switch {
case req != nil:
params, _ := json.Marshal(req.Params)
if req.Notif {
logger.Printf("jsonrpc2: <-- notif: %s: %s\n", req.Method, params)
} else {
logger.Printf("jsonrpc2: <-- request #%s: %s: %s\n", req.ID, req.Method, params)
}
case resp != nil:
mu.Lock()
method := reqMethods[resp.ID]
@ -98,7 +88,22 @@ func LogMessages(logger Logger) ConnOpt {
err, _ := json.Marshal(resp.Error)
logger.Printf("jsonrpc2: <-- error #%s: %s: %s\n", resp.ID, method, err)
}
case req != nil:
params, _ := json.Marshal(req.Params)
if req.Notif {
logger.Printf("jsonrpc2: <-- notif: %s: %s\n", req.Method, params)
} else {
logger.Printf("jsonrpc2: <-- request #%s: %s: %s\n", req.ID, req.Method, params)
}
}
})(c)
}
}
// SetLogger sets the logger for the connection.
func SetLogger(logger Logger) ConnOpt {
return func(c *Conn) {
c.logger = logger
}
}

130
conn_opt_test.go Normal file
View file

@ -0,0 +1,130 @@
package jsonrpc2_test
import (
"bufio"
"context"
"io"
"log"
"net"
"testing"
"github.com/sourcegraph/jsonrpc2"
)
func TestSetLogger(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rd, wr := io.Pipe()
defer rd.Close()
defer wr.Close()
buf := bufio.NewReader(rd)
logger := log.New(wr, "", log.Lmsgprefix)
a, b := net.Pipe()
connA := jsonrpc2.NewConn(
ctx,
jsonrpc2.NewBufferedStream(a, jsonrpc2.VSCodeObjectCodec{}),
noopHandler{},
jsonrpc2.SetLogger(logger),
)
connB := jsonrpc2.NewConn(
ctx,
jsonrpc2.NewBufferedStream(b, jsonrpc2.VSCodeObjectCodec{}),
noopHandler{},
)
defer connA.Close()
defer connB.Close()
// Write a response with no corresponding request.
if err := connB.Reply(ctx, jsonrpc2.ID{Num: 0}, nil); err != nil {
t.Fatal(err)
}
want := "jsonrpc2: ignoring response #0 with no corresponding request\n"
got, err := buf.ReadString('\n')
if err != nil {
t.Fatal(err)
}
if got != want {
t.Fatalf("got %q, want %q", got, want)
}
}
type dummyHandler struct {
t *testing.T
}
func (h *dummyHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
if !req.Notif {
err := conn.Reply(ctx, req.ID, nil)
if err != nil {
h.t.Error(err)
return
}
}
}
func TestLogMessages(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rd, wr := io.Pipe()
defer rd.Close()
defer wr.Close()
buf := bufio.NewReader(rd)
logger := log.New(wr, "", log.Lmsgprefix)
a, b := net.Pipe()
connA := jsonrpc2.NewConn(
ctx,
jsonrpc2.NewBufferedStream(a, jsonrpc2.VSCodeObjectCodec{}),
&dummyHandler{t},
jsonrpc2.LogMessages(logger),
)
connB := jsonrpc2.NewConn(
ctx,
jsonrpc2.NewBufferedStream(b, jsonrpc2.VSCodeObjectCodec{}),
&dummyHandler{t},
)
defer connA.Close()
defer connB.Close()
go func() {
if err := connA.Call(ctx, "method1", nil, nil); err != nil {
t.Error(err)
return
}
if err := connB.Call(ctx, "method2", nil, nil); err != nil {
t.Error(err)
return
}
if err := connA.Notify(ctx, "notification1", nil); err != nil {
t.Error(err)
return
}
if err := connB.Notify(ctx, "notification2", nil); err != nil {
t.Error(err)
return
}
}()
for i, want := range []string{
"jsonrpc2: <-- request #0: method1: null\n",
"jsonrpc2: --> result #0: method1: null\n",
"jsonrpc2: --> request #0: method2: null\n",
"jsonrpc2: <-- result #0: method2: null\n",
"jsonrpc2: <-- notif: notification1: null\n",
"jsonrpc2: --> notif: notification2: null\n",
} {
got, err := buf.ReadString('\n')
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("message %v: got %q, want %q", i, got, want)
}
}
}

304
conn_test.go Normal file
View file

@ -0,0 +1,304 @@
package jsonrpc2_test
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net"
"sync"
"testing"
"time"
"github.com/sourcegraph/jsonrpc2"
)
func TestConn(t *testing.T) {
t.Run("closes when context is done", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
connA, connB := Pipe(ctx, noopHandler{}, noopHandler{})
defer connA.Close()
defer connB.Close()
cancel()
<-connA.DisconnectNotify()
got := connA.Close()
want := jsonrpc2.ErrClosed
if got != want {
t.Fatalf("got %v, want %v", got, want)
}
})
t.Run("cancels context when closed", func(t *testing.T) {
ctxCanceled := make(chan struct{})
handler := handlerFunc(func(ctx context.Context, c *jsonrpc2.Conn, r *jsonrpc2.Request) {
// Block until the context is canceled.
<-ctx.Done()
close(ctxCanceled)
})
connA, connB := Pipe(context.Background(), noopHandler{}, jsonrpc2.AsyncHandler(handler))
defer connA.Close()
defer connB.Close()
// Send a notification from connA to connB to trigger connB's handler
// function.
if err := connA.Notify(context.Background(), "foo", nil, nil); err != nil {
t.Fatal(err)
}
// Disconnect connA from connB.
if err := connA.Close(); err != nil {
t.Fatal(err)
}
select {
case <-ctxCanceled:
// Test passed, the handler's context was canceled.
case <-time.After(time.Second):
t.Fatal("context not canceled")
}
})
}
var paramsTests = []struct {
sendParams interface{}
wantParams *json.RawMessage
}{
{
sendParams: nil,
wantParams: nil,
},
{
sendParams: jsonNull,
wantParams: &jsonNull,
},
{
sendParams: false,
wantParams: rawJSONMessage("false"),
},
{
sendParams: 0,
wantParams: rawJSONMessage("0"),
},
{
sendParams: "",
wantParams: rawJSONMessage(`""`),
},
{
sendParams: rawJSONMessage(`{"foo":"bar"}`),
wantParams: rawJSONMessage(`{"foo":"bar"}`),
},
}
func TestConn_DispatchCall(t *testing.T) {
for _, test := range paramsTests {
t.Run(fmt.Sprintf("%s", test.sendParams), func(t *testing.T) {
testParams(t, test.wantParams, func(c *jsonrpc2.Conn) error {
_, err := c.DispatchCall(context.Background(), "f", test.sendParams)
return err
})
})
}
}
func TestConn_Notify(t *testing.T) {
for _, test := range paramsTests {
t.Run(fmt.Sprintf("%s", test.sendParams), func(t *testing.T) {
testParams(t, test.wantParams, func(c *jsonrpc2.Conn) error {
return c.Notify(context.Background(), "f", test.sendParams)
})
})
}
}
func TestConn_DisconnectNotify(t *testing.T) {
t.Run("EOF", func(t *testing.T) {
connA, connB := net.Pipe()
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewPlainObjectStream(connB), nil)
// By closing connA, connB receives io.EOF
if err := connA.Close(); err != nil {
t.Error(err)
}
assertDisconnect(t, c, connB)
})
t.Run("Close", func(t *testing.T) {
_, connB := net.Pipe()
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewPlainObjectStream(connB), nil)
if err := c.Close(); err != nil {
t.Error(err)
}
assertDisconnect(t, c, connB)
})
t.Run("Close async", func(t *testing.T) {
done := make(chan struct{})
_, connB := net.Pipe()
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewPlainObjectStream(connB), nil)
go func() {
if err := c.Close(); err != nil && err != jsonrpc2.ErrClosed {
t.Error(err)
}
close(done)
}()
assertDisconnect(t, c, connB)
<-done
})
t.Run("protocol error", func(t *testing.T) {
connA, connB := net.Pipe()
c := jsonrpc2.NewConn(
context.Background(),
jsonrpc2.NewPlainObjectStream(connB),
noopHandler{},
// Suppress log message. This connection receives an invalid JSON
// message that causes an error to be written to the logger. We
// don't want this expected error to appear in os.Stderr though when
// running tests in verbose mode or when other tests fail.
jsonrpc2.SetLogger(log.New(io.Discard, "", 0)),
)
connA.Write([]byte("invalid json"))
assertDisconnect(t, c, connB)
})
}
func TestConn_Close(t *testing.T) {
cases := []struct {
name string
run func(*testing.T, context.Context, *jsonrpc2.Conn)
}{{
name: "during Call",
run: func(t *testing.T, ctx context.Context, conn *jsonrpc2.Conn) {
ready := make(chan struct{})
done := make(chan struct{})
go func() {
close(ready)
err := conn.Call(ctx, "m", nil, nil)
if err != jsonrpc2.ErrClosed {
t.Errorf("got error %v, want %v", err, jsonrpc2.ErrClosed)
}
close(done)
}()
// Wait for the request to be sent before we close the connection.
<-ready
if err := conn.Close(); err != nil && err != jsonrpc2.ErrClosed {
t.Error(err)
}
<-done
},
}, {
name: "during Wait",
run: func(t *testing.T, ctx context.Context, conn *jsonrpc2.Conn) {
call, err := conn.DispatchCall(ctx, "m", nil, nil)
if err != nil {
t.Fatal(err)
}
if err := conn.Close(); err != nil {
t.Fatal(err)
}
if err := call.Wait(ctx, nil); err != jsonrpc2.ErrClosed {
t.Fatal(err)
}
},
}, {
name: "during Dispatch",
run: func(t *testing.T, ctx context.Context, conn *jsonrpc2.Conn) {
if err := conn.Close(); err != nil {
t.Fatal(err)
}
if _, err := conn.DispatchCall(ctx, "m", nil, nil); err != jsonrpc2.ErrClosed {
t.Fatal(err)
}
},
}}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
connA, connB := net.Pipe()
nodeA := jsonrpc2.NewConn(
ctx,
jsonrpc2.NewPlainObjectStream(connA), noopHandler{},
)
defer nodeA.Close()
nodeB := jsonrpc2.NewConn(
ctx,
jsonrpc2.NewPlainObjectStream(connB),
noopHandler{},
)
defer nodeB.Close()
tc.run(t, ctx, nodeB)
assertDisconnect(t, nodeB, connB)
})
}
}
func testParams(t *testing.T, want *json.RawMessage, fn func(c *jsonrpc2.Conn) error) {
wg := &sync.WaitGroup{}
handler := handlerFunc(func(ctx context.Context, conn *jsonrpc2.Conn, r *jsonrpc2.Request) {
assertRawJSONMessage(t, r.Params, want)
wg.Done()
})
connA, connB := Pipe(context.Background(), noopHandler{}, handler)
defer connA.Close()
defer connB.Close()
wg.Add(1)
if err := fn(connA); err != nil {
t.Error(err)
}
wg.Wait()
}
func assertDisconnect(t *testing.T, c *jsonrpc2.Conn, conn io.Writer) {
select {
case <-c.DisconnectNotify():
case <-time.After(200 * time.Millisecond):
t.Error("no disconnect notification")
return
}
// Assert that conn is closed by trying to write to it.
_, got := conn.Write(nil)
want := io.ErrClosedPipe
if got != want {
t.Errorf("got %s, want %s", got, want)
}
}
func assertRawJSONMessage(t *testing.T, got *json.RawMessage, want *json.RawMessage) {
// Assert pointers.
if got == nil || want == nil {
if got != want {
t.Errorf("pointer: got %s, want %s", got, want)
}
return
}
{
// If pointers are not nil, then assert values.
got := string(*got)
want := string(*want)
if got != want {
t.Errorf("value: got %q, want %q", got, want)
}
}
}
// Pipe returns two jsonrpc2.Conn, connected via a synchronous, in-memory, full
// duplex network connection.
func Pipe(ctx context.Context, handlerA, handlerB jsonrpc2.Handler) (connA *jsonrpc2.Conn, connB *jsonrpc2.Conn) {
a, b := net.Pipe()
connA = jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(a), handlerA)
connB = jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(b), handlerB)
return connA, connB
}

78
example_params_test.go Normal file
View file

@ -0,0 +1,78 @@
package jsonrpc2_test
import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"github.com/sourcegraph/jsonrpc2"
)
// Send a JSON-RPC notification with its params member omitted.
func ExampleConn_Notify_paramsOmitted() {
ctx := context.Background()
connA, connB := net.Pipe()
defer connA.Close()
defer connB.Close()
rpcConn := jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(connA), nil)
// Send the JSON-RPC notification.
go func() {
// Set params to nil.
if err := rpcConn.Notify(ctx, "foo", nil); err != nil {
fmt.Fprintln(os.Stderr, "notify:", err)
}
}()
// Read the raw JSON-RPC notification on connB.
//
// Reading the raw JSON-RPC request is for the purpose of this example only.
// Use a jsonrpc2.Handler to read parsed requests.
buf := make([]byte, 64)
n, err := connB.Read(buf)
if err != nil {
fmt.Fprintln(os.Stderr, "read:", err)
}
fmt.Printf("%s\n", buf[:n])
// Output: {"jsonrpc":"2.0","method":"foo"}
}
// Send a JSON-RPC notification with its params member set to null.
func ExampleConn_Notify_nullParams() {
ctx := context.Background()
connA, connB := net.Pipe()
defer connA.Close()
defer connB.Close()
rpcConn := jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(connA), nil)
// Send the JSON-RPC notification.
go func() {
// Set params to the JSON null value.
params := json.RawMessage("null")
if err := rpcConn.Notify(ctx, "foo", params); err != nil {
fmt.Fprintln(os.Stderr, "notify:", err)
}
}()
// Read the raw JSON-RPC notification on connB.
//
// Reading the raw JSON-RPC request is for the purpose of this example only.
// Use a jsonrpc2.Handler to read parsed requests.
buf := make([]byte, 64)
n, err := connB.Read(buf)
if err != nil {
fmt.Fprintln(os.Stderr, "read:", err)
}
fmt.Printf("%s\n", buf[:n])
// Output: {"jsonrpc":"2.0","method":"foo","params":null}
}

64
example_test.go Normal file
View file

@ -0,0 +1,64 @@
package jsonrpc2_test
import (
"context"
"fmt"
"log"
"net"
"os"
"github.com/sourcegraph/jsonrpc2"
)
func Example() {
ctx := context.Background()
// Create an in-memory network connection. This connection is used below to
// transport the JSON-RPC messages. However, any io.ReadWriteCloser may be
// used to send/receive JSON-RPC messages.
connA, connB := net.Pipe()
// The following JSON-RPC connection is both a client and a server. It can
// send requests as well as receive requests. The incoming requests are
// handled by myHandler.
jsonrpcConnA := jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(connA), &myHandler{})
defer jsonrpcConnA.Close()
// The following JSON-RPC connection has no handler, meaning that it is
// configured to only be a client. It can send requests and receive the
// responses to those requests, but it will ignore any incoming requests.
jsonrpcConnB := jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(connB), nil)
defer jsonrpcConnB.Close()
// Send a request from jsonrpcConnB to jsonrpcConnA. The result of a
// successful call is stored in the result variable.
var result string
if err := jsonrpcConnB.Call(ctx, "sayHello", nil, &result); err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
fmt.Println(result)
// Output: hello world
}
// myHandler is the jsonrpc2.Handler used by jsonrpcConnA.
type myHandler struct{}
// Handle implements the jsonrpc2.Handler interface.
func (h *myHandler) Handle(ctx context.Context, c *jsonrpc2.Conn, r *jsonrpc2.Request) {
switch r.Method {
case "sayHello":
if err := c.Reply(ctx, r.ID, "hello world"); err != nil {
log.Println(err)
return
}
default:
err := &jsonrpc2.Error{Code: jsonrpc2.CodeMethodNotFound, Message: "Method not found"}
if err := c.ReplyWithError(ctx, r.ID, err); err != nil {
log.Println(err)
return
}
}
}

View file

@ -30,20 +30,16 @@ func (h *HandlerWithErrorConfigurer) Handle(ctx context.Context, conn *Conn, req
if err == nil {
err = resp.SetResult(result)
}
if err != nil {
if e, ok := err.(*Error); ok {
resp.Error = e
} else {
resp.Error = &Error{Message: err.Error()}
}
if e, ok := err.(*Error); ok {
resp.Error = e
} else if err != nil {
resp.Error = &Error{Message: err.Error()}
}
if !req.Notif {
if err := conn.SendResponse(ctx, resp); err != nil {
if err != ErrClosed || !h.suppressErrClosed {
conn.logger.Printf("jsonrpc2 handler: sending response %s: %v\n", resp.ID, err)
}
}
err = conn.SendResponse(ctx, resp)
if err != nil && (err != ErrClosed || !h.suppressErrClosed) {
conn.logger.Printf("jsonrpc2 handler: sending response %s: %v\n", resp.ID, err)
}
}

35
internal_test.go Normal file
View file

@ -0,0 +1,35 @@
package jsonrpc2
import (
"encoding/json"
"testing"
)
func TestAnyMessage(t *testing.T) {
tests := map[string]struct {
request, response, invalid bool
}{
// Single messages
`{}`: {invalid: true},
`{"foo":"bar"}`: {invalid: true},
`{"method":"m"}`: {request: true},
`{"result":123}`: {response: true},
`{"result":null}`: {response: true},
`{"error":{"code":456,"message":"m"}}`: {response: true},
}
for s, want := range tests {
var m anyMessage
if err := json.Unmarshal([]byte(s), &m); err != nil {
if !want.invalid {
t.Errorf("%s: error: %v", s, err)
}
continue
}
if (m.request != nil) != want.request {
t.Errorf("%s: got request %v, want %v", s, m.request != nil, want.request)
}
if (m.response != nil) != want.response {
t.Errorf("%s: got response %v, want %v", s, m.response != nil, want.response)
}
}
}

View file

@ -3,16 +3,11 @@
package jsonrpc2
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
"strconv"
"sync"
)
// JSONRPC2 describes an interface for issuing requests that speak the
@ -30,176 +25,14 @@ type JSONRPC2 interface {
Close() error
}
// Request represents a JSON-RPC request or
// notification. See
// http://www.jsonrpc.org/specification#request_object and
// http://www.jsonrpc.org/specification#notification.
type Request struct {
Method string `json:"method"`
Params *json.RawMessage `json:"params,omitempty"`
ID ID `json:"id"`
Notif bool `json:"-"`
// Meta optionally provides metadata to include in the request.
//
// NOTE: It is not part of spec. However, it is useful for propogating
// tracing context, etc.
Meta *json.RawMessage `json:"meta,omitempty"`
}
// MarshalJSON implements json.Marshaler and adds the "jsonrpc":"2.0"
// property.
func (r Request) MarshalJSON() ([]byte, error) {
r2 := struct {
Method string `json:"method"`
Params *json.RawMessage `json:"params,omitempty"`
ID *ID `json:"id,omitempty"`
Meta *json.RawMessage `json:"meta,omitempty"`
JSONRPC string `json:"jsonrpc"`
}{
Method: r.Method,
Params: r.Params,
Meta: r.Meta,
JSONRPC: "2.0",
}
if !r.Notif {
r2.ID = &r.ID
}
return json.Marshal(r2)
}
// UnmarshalJSON implements json.Unmarshaler.
func (r *Request) UnmarshalJSON(data []byte) error {
var r2 struct {
Method string `json:"method"`
Params *json.RawMessage `json:"params,omitempty"`
Meta *json.RawMessage `json:"meta,omitempty"`
ID *ID `json:"id"`
}
// Detect if the "params" field is JSON "null" or just not present
// by seeing if the field gets overwritten to nil.
r2.Params = &json.RawMessage{}
if err := json.Unmarshal(data, &r2); err != nil {
return err
}
r.Method = r2.Method
switch {
case r2.Params == nil:
r.Params = &jsonNull
case len(*r2.Params) == 0:
r.Params = nil
default:
r.Params = r2.Params
}
r.Meta = r2.Meta
if r2.ID == nil {
r.ID = ID{}
r.Notif = true
} else {
r.ID = *r2.ID
r.Notif = false
}
return nil
}
// SetParams sets r.Params to the JSON representation of v. If JSON
// marshaling fails, it returns an error.
func (r *Request) SetParams(v interface{}) error {
b, err := json.Marshal(v)
if err != nil {
return err
}
r.Params = (*json.RawMessage)(&b)
return nil
}
// SetMeta sets r.Meta to the JSON representation of v. If JSON
// marshaling fails, it returns an error.
func (r *Request) SetMeta(v interface{}) error {
b, err := json.Marshal(v)
if err != nil {
return err
}
r.Meta = (*json.RawMessage)(&b)
return nil
}
// Response represents a JSON-RPC response. See
// http://www.jsonrpc.org/specification#response_object.
type Response struct {
ID ID `json:"id"`
Result *json.RawMessage `json:"result,omitempty"`
Error *Error `json:"error,omitempty"`
// Meta optionally provides metadata to include in the response.
//
// NOTE: It is not part of spec. However, it is useful for propogating
// tracing context, etc.
Meta *json.RawMessage `json:"meta,omitempty"`
// SPEC NOTE: The spec says "If there was an error in detecting
// the id in the Request object (e.g. Parse error/Invalid
// Request), it MUST be Null." If we made the ID field nullable,
// then we'd have to make it a pointer type. For simplicity, we're
// ignoring the case where there was an error in detecting the ID
// in the Request object.
}
// MarshalJSON implements json.Marshaler and adds the "jsonrpc":"2.0"
// property.
func (r Response) MarshalJSON() ([]byte, error) {
if (r.Result == nil || len(*r.Result) == 0) && r.Error == nil {
return nil, errors.New("can't marshal *jsonrpc2.Response (must have result or error)")
}
type tmpType Response // avoid infinite MarshalJSON recursion
b, err := json.Marshal(tmpType(r))
if err != nil {
return nil, err
}
b = append(b[:len(b)-1], []byte(`,"jsonrpc":"2.0"}`)...)
return b, nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (r *Response) UnmarshalJSON(data []byte) error {
type tmpType Response
// Detect if the "result" field is JSON "null" or just not present
// by seeing if the field gets overwritten to nil.
*r = Response{Result: &json.RawMessage{}}
if err := json.Unmarshal(data, (*tmpType)(r)); err != nil {
return err
}
if r.Result == nil { // JSON "null"
r.Result = &jsonNull
} else if len(*r.Result) == 0 {
r.Result = nil
}
return nil
}
// SetResult sets r.Result to the JSON representation of v. If JSON
// marshaling fails, it returns an error.
func (r *Response) SetResult(v interface{}) error {
b, err := json.Marshal(v)
if err != nil {
return err
}
r.Result = (*json.RawMessage)(&b)
return nil
}
// Error represents a JSON-RPC response error.
type Error struct {
Code int64 `json:"code"`
Message string `json:"message"`
Data *json.RawMessage `json:"data"`
Data *json.RawMessage `json:"data,omitempty"`
}
// SetError sets e.Error to the JSON representation of v. If JSON
// SetError sets e.Data to the JSON encoding of v. If JSON
// marshaling fails, it panics.
func (e *Error) SetError(v interface{}) {
b, err := json.Marshal(v)
@ -226,10 +59,10 @@ const (
// Handler handles JSON-RPC requests and notifications.
type Handler interface {
// Handle is called to handle a request. No other requests are handled
// until it returns. If you do not require strict ordering behavior
// of received RPCs, it is suggested to wrap your handler in
// AsyncHandler.
// Handle is called to handle a request. No other requests are handled until
// it returns. If you do not require strict ordering behavior of received
// RPCs, it is suggested to wrap your handler in AsyncHandler. The context
// is automatically canceled when the connection closes.
Handle(context.Context, *Conn, *Request)
}
@ -279,455 +112,8 @@ func (id *ID) UnmarshalJSON(data []byte) error {
return nil
}
// Conn is a JSON-RPC client/server connection. The JSON-RPC protocol
// is symmetric, so a Conn runs on both ends of a client-server
// connection.
type Conn struct {
stream ObjectStream
h Handler
mu sync.Mutex
shutdown bool
closing bool
seq uint64
pending map[ID]*call
sending sync.Mutex
disconnect chan struct{}
logger Logger
// Set by ConnOpt funcs.
onRecv []func(*Request, *Response)
onSend []func(*Request, *Response)
}
var _ JSONRPC2 = (*Conn)(nil)
// ErrClosed indicates that the JSON-RPC connection is closed (or in
// the process of closing).
var ErrClosed = errors.New("jsonrpc2: connection is closed")
// NewConn creates a new JSON-RPC client/server connection using the
// given ReadWriteCloser (typically a TCP connection or stdio). The
// JSON-RPC protocol is symmetric, so a Conn runs on both ends of a
// client-server connection.
//
// NewClient consumes conn, so you should call Close on the returned
// client not on the given conn.
func NewConn(ctx context.Context, stream ObjectStream, h Handler, opts ...ConnOpt) *Conn {
c := &Conn{
stream: stream,
h: h,
pending: map[ID]*call{},
disconnect: make(chan struct{}),
logger: log.New(os.Stderr, "", log.LstdFlags),
}
for _, opt := range opts {
if opt == nil {
continue
}
opt(c)
}
go c.readMessages(ctx)
return c
}
// Close closes the JSON-RPC connection. The connection may not be
// used after it has been closed.
func (c *Conn) Close() error {
c.mu.Lock()
if c.shutdown || c.closing {
c.mu.Unlock()
return ErrClosed
}
c.closing = true
c.mu.Unlock()
return c.stream.Close()
}
func (c *Conn) send(_ context.Context, m *anyMessage, wait bool) (cc *call, err error) {
c.sending.Lock()
defer c.sending.Unlock()
// m.request.ID could be changed, so we store a copy to correctly
// clean up pending
var id ID
c.mu.Lock()
if c.shutdown || c.closing {
c.mu.Unlock()
return nil, ErrClosed
}
// Store requests so we can later associate them with incoming
// responses.
if m.request != nil && wait {
cc = &call{request: m.request, seq: c.seq, done: make(chan error, 1)}
isIDUnset := len(m.request.ID.Str) == 0 && m.request.ID.Num == 0
if isIDUnset {
if m.request.ID.IsString {
m.request.ID.Str = strconv.FormatUint(c.seq, 10)
} else {
m.request.ID.Num = c.seq
}
}
id = m.request.ID
c.pending[id] = cc
c.seq++
}
c.mu.Unlock()
if len(c.onSend) > 0 {
var (
req *Request
resp *Response
)
switch {
case m.request != nil:
req = m.request
case m.response != nil:
resp = m.response
}
for _, onSend := range c.onSend {
onSend(req, resp)
}
}
// From here on, if we fail to send this, then we need to remove
// this from the pending map so we don't block on it or pile up
// pending entries for unsent messages.
defer func() {
if err != nil {
if cc != nil {
c.mu.Lock()
delete(c.pending, id)
c.mu.Unlock()
}
}
}()
if err := c.stream.WriteObject(m); err != nil {
return nil, err
}
return cc, nil
}
// Call initiates a JSON-RPC call using the specified method and
// params, and waits for the response. If the response is successful,
// its result is stored in result (a pointer to a value that can be
// JSON-unmarshaled into); otherwise, a non-nil error is returned.
func (c *Conn) Call(ctx context.Context, method string, params, result interface{}, opts ...CallOption) error {
call, err := c.DispatchCall(ctx, method, params, opts...)
if err != nil {
return err
}
return call.Wait(ctx, result)
}
// DispatchCall dispatches a JSON-RPC call using the specified method
// and params, and returns a call proxy or an error. Call Wait()
// on the returned proxy to receive the response. Only use this
// function if you need to do work after dispatching the request,
// otherwise use Call.
func (c *Conn) DispatchCall(ctx context.Context, method string, params interface{}, opts ...CallOption) (Waiter, error) {
req := &Request{Method: method}
if err := req.SetParams(params); err != nil {
return Waiter{}, err
}
for _, opt := range opts {
if opt == nil {
continue
}
if err := opt.apply(req); err != nil {
return Waiter{}, err
}
}
call, err := c.send(ctx, &anyMessage{request: req}, true)
if err != nil {
return Waiter{}, err
}
return Waiter{call: call}, nil
}
// Waiter proxies an ongoing JSON-RPC call.
type Waiter struct {
*call
}
// Wait for the result of an ongoing JSON-RPC call. If the response
// is successful, its result is stored in result (a pointer to a
// value that can be JSON-unmarshaled into); otherwise, a non-nil
// error is returned.
func (w Waiter) Wait(ctx context.Context, result interface{}) error {
select {
case err, ok := <-w.call.done:
if !ok {
err = ErrClosed
}
if err != nil {
return err
}
if result != nil {
if w.call.response.Result == nil {
w.call.response.Result = &jsonNull
}
if err := json.Unmarshal(*w.call.response.Result, result); err != nil {
return err
}
}
return nil
case <-ctx.Done():
return ctx.Err()
}
}
var jsonNull = json.RawMessage("null")
// Notify is like Call, but it returns when the notification request
// is sent (without waiting for a response, because JSON-RPC
// notifications do not have responses).
func (c *Conn) Notify(ctx context.Context, method string, params interface{}, opts ...CallOption) error {
req := &Request{Method: method, Notif: true}
if err := req.SetParams(params); err != nil {
return err
}
for _, opt := range opts {
if opt == nil {
continue
}
if err := opt.apply(req); err != nil {
return err
}
}
_, err := c.send(ctx, &anyMessage{request: req}, false)
return err
}
// Reply sends a successful response with a result.
func (c *Conn) Reply(ctx context.Context, id ID, result interface{}) error {
resp := &Response{ID: id}
if err := resp.SetResult(result); err != nil {
return err
}
_, err := c.send(ctx, &anyMessage{response: resp}, false)
return err
}
// ReplyWithError sends a response with an error.
func (c *Conn) ReplyWithError(ctx context.Context, id ID, respErr *Error) error {
_, err := c.send(ctx, &anyMessage{response: &Response{ID: id, Error: respErr}}, false)
return err
}
// SendResponse sends resp to the peer. It is lower level than (*Conn).Reply.
func (c *Conn) SendResponse(ctx context.Context, resp *Response) error {
_, err := c.send(ctx, &anyMessage{response: resp}, false)
return err
}
// DisconnectNotify returns a channel that is closed when the
// underlying connection is disconnected.
func (c *Conn) DisconnectNotify() <-chan struct{} {
return c.disconnect
}
func (c *Conn) readMessages(ctx context.Context) {
var err error
for err == nil {
var m anyMessage
err = c.stream.ReadObject(&m)
if err != nil {
break
}
switch {
case m.request != nil:
for _, onRecv := range c.onRecv {
onRecv(m.request, nil)
}
c.h.Handle(ctx, c, m.request)
case m.response != nil:
resp := m.response
if resp != nil {
id := resp.ID
c.mu.Lock()
call := c.pending[id]
delete(c.pending, id)
c.mu.Unlock()
if call != nil {
call.response = resp
}
if len(c.onRecv) > 0 {
var req *Request
if call != nil {
req = call.request
}
for _, onRecv := range c.onRecv {
onRecv(req, resp)
}
}
switch {
case call == nil:
c.logger.Printf("jsonrpc2: ignoring response #%s with no corresponding request\n", id)
case resp.Error != nil:
call.done <- resp.Error
close(call.done)
default:
call.done <- nil
close(call.done)
}
}
}
}
c.sending.Lock()
c.mu.Lock()
c.shutdown = true
closing := c.closing
if err == io.EOF {
if closing {
err = ErrClosed
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range c.pending {
call.done <- err
close(call.done)
}
c.mu.Unlock()
c.sending.Unlock()
if err != io.ErrUnexpectedEOF && !closing {
c.logger.Printf("jsonrpc2: protocol error: %v\n", err)
}
close(c.disconnect)
}
// call represents a JSON-RPC call over its entire lifecycle.
type call struct {
request *Request
response *Response
seq uint64 // the seq of the request
done chan error
}
// anyMessage represents either a JSON Request or Response.
type anyMessage struct {
request *Request
response *Response
}
func (m anyMessage) MarshalJSON() ([]byte, error) {
var v interface{}
switch {
case m.request != nil && m.response == nil:
v = m.request
case m.request == nil && m.response != nil:
v = m.response
}
if v != nil {
return json.Marshal(v)
}
return nil, errors.New("jsonrpc2: message must have exactly one of the request or response fields set")
}
func (m *anyMessage) UnmarshalJSON(data []byte) error {
// The presence of these fields distinguishes between the 2
// message types.
type msg struct {
ID interface{} `json:"id"`
Method *string `json:"method"`
Result anyValueWithExplicitNull `json:"result"`
Error interface{} `json:"error"`
}
var isRequest, isResponse bool
checkType := func(m *msg) error {
mIsRequest := m.Method != nil
mIsResponse := m.Result.null || m.Result.value != nil || m.Error != nil
if (!mIsRequest && !mIsResponse) || (mIsRequest && mIsResponse) {
return errors.New("jsonrpc2: unable to determine message type (request or response)")
}
if (mIsRequest && isResponse) || (mIsResponse && isRequest) {
return errors.New("jsonrpc2: batch message type mismatch (must be all requests or all responses)")
}
isRequest = mIsRequest
isResponse = mIsResponse
return nil
}
if isArray := len(data) > 0 && data[0] == '['; isArray {
var msgs []msg
if err := json.Unmarshal(data, &msgs); err != nil {
return err
}
if len(msgs) == 0 {
return errors.New("jsonrpc2: invalid empty batch")
}
for i := range msgs {
if err := checkType(&msg{
ID: msgs[i].ID,
Method: msgs[i].Method,
Result: msgs[i].Result,
Error: msgs[i].Error,
}); err != nil {
return err
}
}
} else {
var m msg
if err := json.Unmarshal(data, &m); err != nil {
return err
}
if err := checkType(&m); err != nil {
return err
}
}
var v interface{}
switch {
case isRequest && !isResponse:
v = &m.request
case !isRequest && isResponse:
v = &m.response
}
if err := json.Unmarshal(data, v); err != nil {
return err
}
if !isRequest && isResponse && m.response.Error == nil && m.response.Result == nil {
m.response.Result = &jsonNull
}
return nil
}
// anyValueWithExplicitNull is used to distinguish {} from
// {"result":null} by anyMessage's JSON unmarshaler.
type anyValueWithExplicitNull struct {
null bool // JSON "null"
value interface{}
}
func (v anyValueWithExplicitNull) MarshalJSON() ([]byte, error) {
return json.Marshal(v.value)
}
func (v *anyValueWithExplicitNull) UnmarshalJSON(data []byte) error {
data = bytes.TrimSpace(data)
if string(data) == "null" {
*v = anyValueWithExplicitNull{null: true}
return nil
}
*v = anyValueWithExplicitNull{}
return json.Unmarshal(data, &v.value)
}

View file

@ -1,7 +1,6 @@
package jsonrpc2_test
import (
"bytes"
"context"
"encoding/json"
"fmt"
@ -19,62 +18,61 @@ import (
websocketjsonrpc2 "github.com/sourcegraph/jsonrpc2/websocket"
)
func TestRequest_MarshalJSON_jsonrpc(t *testing.T) {
b, err := json.Marshal(&jsonrpc2.Request{})
if err != nil {
t.Fatal(err)
func TestError_MarshalJSON(t *testing.T) {
tests := []struct {
name string
setError func(err *jsonrpc2.Error)
want string
}{
{
name: "Data == nil",
want: `{"code":-32603,"message":"Internal error"}`,
},
{
name: "Error.SetError(nil)",
setError: func(err *jsonrpc2.Error) {
err.SetError(nil)
},
want: `{"code":-32603,"message":"Internal error","data":null}`,
},
{
name: "Error.SetError(0)",
setError: func(err *jsonrpc2.Error) {
err.SetError(0)
},
want: `{"code":-32603,"message":"Internal error","data":0}`,
},
{
name: `Error.SetError("")`,
setError: func(err *jsonrpc2.Error) {
err.SetError("")
},
want: `{"code":-32603,"message":"Internal error","data":""}`,
},
{
name: `Error.SetError(false)`,
setError: func(err *jsonrpc2.Error) {
err.SetError(false)
},
want: `{"code":-32603,"message":"Internal error","data":false}`,
},
}
if want := `{"method":"","id":0,"jsonrpc":"2.0"}`; string(b) != want {
t.Errorf("got %q, want %q", b, want)
}
}
func TestResponse_MarshalJSON_jsonrpc(t *testing.T) {
null := json.RawMessage("null")
b, err := json.Marshal(&jsonrpc2.Response{Result: &null})
if err != nil {
t.Fatal(err)
}
if want := `{"id":0,"result":null,"jsonrpc":"2.0"}`; string(b) != want {
t.Errorf("got %q, want %q", b, want)
}
}
func TestResponseMarshalJSON_Notif(t *testing.T) {
tests := map[*jsonrpc2.Request]bool{
{ID: jsonrpc2.ID{Num: 0}}: true,
{ID: jsonrpc2.ID{Num: 1}}: true,
{ID: jsonrpc2.ID{Str: "", IsString: true}}: true,
{ID: jsonrpc2.ID{Str: "a", IsString: true}}: true,
{Notif: true}: false,
}
for r, wantIDKey := range tests {
b, err := json.Marshal(r)
for _, test := range tests {
e := &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
}
if test.setError != nil {
test.setError(e)
}
b, err := json.Marshal(e)
if err != nil {
t.Fatal(err)
t.Error(err)
}
hasIDKey := bytes.Contains(b, []byte(`"id"`))
if hasIDKey != wantIDKey {
t.Errorf("got %s, want contain id key: %v", b, wantIDKey)
}
}
}
func TestResponseUnmarshalJSON_Notif(t *testing.T) {
tests := map[string]bool{
`{"method":"f","id":0}`: false,
`{"method":"f","id":1}`: false,
`{"method":"f","id":"a"}`: false,
`{"method":"f","id":""}`: false,
`{"method":"f"}`: true,
}
for s, want := range tests {
var r jsonrpc2.Request
if err := json.Unmarshal([]byte(s), &r); err != nil {
t.Fatal(err)
}
if r.Notif != want {
t.Errorf("%s: got %v, want %v", s, r.Notif, want)
got := string(b)
if got != test.want {
t.Fatalf("%s: got %q, want %q", test.name, got, test.want)
}
}
}
@ -112,44 +110,67 @@ func (h *testHandlerB) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jso
h.t.Errorf("testHandlerB got unexpected request %+v", req)
}
type streamMaker func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream
func testClientServerForCodec(t *testing.T, streamMaker streamMaker) {
ctx := context.Background()
done := make(chan struct{})
lis, err := net.Listen("tcp", "127.0.0.1:0") // any available address
if err != nil {
t.Fatal("Listen:", err)
}
defer func() {
if lis == nil {
return // already closed
}
if err = lis.Close(); err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Fatal(err)
}
}
}()
ha := testHandlerA{t: t}
go func() {
if err = serve(ctx, lis, &ha, streamMaker); err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Error(err)
}
}
close(done)
}()
conn, err := net.Dial("tcp", lis.Addr().String())
if err != nil {
t.Fatal("Dial:", err)
}
testClientServer(ctx, t, streamMaker(conn))
lis.Close()
<-done // ensure Serve's error return (if any) is caught by this test
}
func TestClientServer(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
ctx := context.Background()
done := make(chan struct{})
lis, err := net.Listen("tcp", "127.0.0.1:0") // any available address
if err != nil {
t.Fatal("Listen:", err)
}
defer func() {
if lis == nil {
return // already closed
}
if err = lis.Close(); err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Fatal(err)
}
}
}()
ha := testHandlerA{t: t}
go func() {
if err = serve(ctx, lis, &ha); err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Error(err)
}
}
close(done)
}()
conn, err := net.Dial("tcp", lis.Addr().String())
if err != nil {
t.Fatal("Dial:", err)
}
testClientServer(ctx, t, jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{}))
lis.Close()
<-done // ensure Serve's error return (if any) is caught by this test
t.Run("tcp-varint-object-codec", func(t *testing.T) {
testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream {
return jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{})
})
})
t.Run("tcp-vscode-object-codec", func(t *testing.T) {
testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream {
return jsonrpc2.NewBufferedStream(conn, jsonrpc2.VSCodeObjectCodec{})
})
})
t.Run("tcp-plain-object-codec", func(t *testing.T) {
testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream {
return jsonrpc2.NewBufferedStream(conn, jsonrpc2.PlainObjectCodec{})
})
})
t.Run("tcp-plain-object-stream", func(t *testing.T) {
testClientServerForCodec(t, func(conn io.ReadWriteCloser) jsonrpc2.ObjectStream {
return jsonrpc2.NewPlainObjectStream(conn)
})
})
t.Run("websocket", func(t *testing.T) {
ctx := context.Background()
@ -291,88 +312,19 @@ type noopHandler struct{}
func (noopHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {}
type readWriteCloser struct {
read, write func(p []byte) (n int, err error)
}
func (x readWriteCloser) Read(p []byte) (n int, err error) {
return x.read(p)
}
func (x readWriteCloser) Write(p []byte) (n int, err error) {
return x.write(p)
}
func (readWriteCloser) Close() error { return nil }
func eof(p []byte) (n int, err error) {
return 0, io.EOF
}
func TestConn_DisconnectNotify_EOF(t *testing.T) {
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(&readWriteCloser{eof, eof}, jsonrpc2.VarintObjectCodec{}), nil)
select {
case <-c.DisconnectNotify():
case <-time.After(200 * time.Millisecond):
t.Fatal("no disconnect notification")
}
}
func TestConn_DisconnectNotify_Close(t *testing.T) {
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(&readWriteCloser{eof, eof}, jsonrpc2.VarintObjectCodec{}), nil)
if err := c.Close(); err != nil {
t.Error(err)
}
select {
case <-c.DisconnectNotify():
case <-time.After(200 * time.Millisecond):
t.Fatal("no disconnect notification")
}
}
func TestConn_DisconnectNotify_Close_async(t *testing.T) {
done := make(chan struct{})
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(&readWriteCloser{eof, eof}, jsonrpc2.VarintObjectCodec{}), nil)
go func() {
if err := c.Close(); err != nil && err != jsonrpc2.ErrClosed {
t.Error(err)
}
close(done)
}()
select {
case <-c.DisconnectNotify():
case <-time.After(200 * time.Millisecond):
t.Fatal("no disconnect notification")
}
<-done
}
func TestConn_Close_waitingForResponse(t *testing.T) {
c := jsonrpc2.NewConn(context.Background(), jsonrpc2.NewBufferedStream(&readWriteCloser{eof, eof}, jsonrpc2.VarintObjectCodec{}), noopHandler{})
done := make(chan struct{})
go func() {
if err := c.Call(context.Background(), "m", nil, nil); err != jsonrpc2.ErrClosed {
t.Errorf("got error %v, want %v", err, jsonrpc2.ErrClosed)
}
close(done)
}()
if err := c.Close(); err != nil && err != jsonrpc2.ErrClosed {
t.Error(err)
}
select {
case <-c.DisconnectNotify():
case <-time.After(200 * time.Millisecond):
t.Fatal("no disconnect notification")
}
<-done
}
func serve(ctx context.Context, lis net.Listener, h jsonrpc2.Handler, opts ...jsonrpc2.ConnOpt) error {
func serve(ctx context.Context, lis net.Listener, h jsonrpc2.Handler, streamMaker streamMaker, opts ...jsonrpc2.ConnOpt) error {
for {
conn, err := lis.Accept()
if err != nil {
return err
}
jsonrpc2.NewConn(ctx, jsonrpc2.NewBufferedStream(conn, jsonrpc2.VarintObjectCodec{}), h, opts...)
jsonrpc2.NewConn(ctx, streamMaker(conn), h, opts...)
}
}
func rawJSONMessage(v string) *json.RawMessage {
b := []byte(v)
return (*json.RawMessage)(&b)
}
var jsonNull = json.RawMessage("null")

View file

@ -1,128 +0,0 @@
package jsonrpc2
import (
"bytes"
"encoding/json"
"reflect"
"testing"
)
func TestAnyMessage(t *testing.T) {
tests := map[string]struct {
request, response, invalid bool
}{
// Single messages
`{}`: {invalid: true},
`{"foo":"bar"}`: {invalid: true},
`{"method":"m"}`: {request: true},
`{"result":123}`: {response: true},
`{"result":null}`: {response: true},
`{"error":{"code":456,"message":"m"}}`: {response: true},
}
for s, want := range tests {
var m anyMessage
if err := json.Unmarshal([]byte(s), &m); err != nil {
if !want.invalid {
t.Errorf("%s: error: %v", s, err)
}
continue
}
if (m.request != nil) != want.request {
t.Errorf("%s: got request %v, want %v", s, m.request != nil, want.request)
}
if (m.response != nil) != want.response {
t.Errorf("%s: got response %v, want %v", s, m.response != nil, want.response)
}
}
}
func TestRequest_MarshalUnmarshalJSON(t *testing.T) {
null := json.RawMessage("null")
obj := json.RawMessage(`{"foo":"bar"}`)
tests := []struct {
data []byte
want Request
}{
{
data: []byte(`{"method":"m","params":{"foo":"bar"},"id":123,"jsonrpc":"2.0"}`),
want: Request{ID: ID{Num: 123}, Method: "m", Params: &obj},
},
{
data: []byte(`{"method":"m","params":null,"id":123,"jsonrpc":"2.0"}`),
want: Request{ID: ID{Num: 123}, Method: "m", Params: &null},
},
{
data: []byte(`{"method":"m","id":123,"jsonrpc":"2.0"}`),
want: Request{ID: ID{Num: 123}, Method: "m", Params: nil},
},
}
for _, test := range tests {
var got Request
if err := json.Unmarshal(test.data, &got); err != nil {
t.Error(err)
continue
}
if !reflect.DeepEqual(got, test.want) {
t.Errorf("%q: got %+v, want %+v", test.data, got, test.want)
continue
}
data, err := json.Marshal(got)
if err != nil {
t.Error(err)
continue
}
if !bytes.Equal(data, test.data) {
t.Errorf("got JSON %q, want %q", data, test.data)
}
}
}
func TestResponse_MarshalUnmarshalJSON(t *testing.T) {
null := json.RawMessage("null")
obj := json.RawMessage(`{"foo":"bar"}`)
tests := []struct {
data []byte
want Response
error bool
}{
{
data: []byte(`{"id":123,"result":{"foo":"bar"},"jsonrpc":"2.0"}`),
want: Response{ID: ID{Num: 123}, Result: &obj},
},
{
data: []byte(`{"id":123,"result":null,"jsonrpc":"2.0"}`),
want: Response{ID: ID{Num: 123}, Result: &null},
},
{
data: []byte(`{"id":123,"jsonrpc":"2.0"}`),
want: Response{ID: ID{Num: 123}, Result: nil},
error: true, // either result or error field must be set
},
}
for _, test := range tests {
var got Response
if err := json.Unmarshal(test.data, &got); err != nil {
t.Error(err)
continue
}
if !reflect.DeepEqual(got, test.want) {
t.Errorf("%q: got %+v, want %+v", test.data, got, test.want)
continue
}
data, err := json.Marshal(got)
if err != nil {
if test.error {
continue
}
t.Error(err)
continue
}
if test.error {
t.Errorf("%q: expected error", test.data)
continue
}
if !bytes.Equal(data, test.data) {
t.Errorf("got JSON %q, want %q", data, test.data)
}
}
}

178
request.go Normal file
View file

@ -0,0 +1,178 @@
package jsonrpc2
import (
"bytes"
"encoding/json"
"errors"
"fmt"
)
// Request represents a JSON-RPC request or
// notification. See
// http://www.jsonrpc.org/specification#request_object and
// http://www.jsonrpc.org/specification#notification.
type Request struct {
Method string `json:"method"`
Params *json.RawMessage `json:"params,omitempty"`
ID ID `json:"id"`
Notif bool `json:"-"`
// Meta optionally provides metadata to include in the request.
//
// NOTE: It is not part of spec. However, it is useful for propagating
// tracing context, etc.
Meta *json.RawMessage `json:"meta,omitempty"`
// ExtraFields optionally adds fields to the root of the JSON-RPC request.
//
// NOTE: It is not part of the spec, but there are other protocols based on
// JSON-RPC 2 that require it.
ExtraFields []RequestField `json:"-"`
}
// MarshalJSON implements json.Marshaler and adds the "jsonrpc":"2.0"
// property.
func (r Request) MarshalJSON() ([]byte, error) {
r2 := map[string]interface{}{
"jsonrpc": "2.0",
"method": r.Method,
}
for _, field := range r.ExtraFields {
r2[field.Name] = field.Value
}
if !r.Notif {
r2["id"] = &r.ID
}
if r.Params != nil {
r2["params"] = r.Params
}
if r.Meta != nil {
r2["meta"] = r.Meta
}
return json.Marshal(r2)
}
// UnmarshalJSON implements json.Unmarshaler.
func (r *Request) UnmarshalJSON(data []byte) error {
r2 := make(map[string]interface{})
pop := func(key string) interface{} {
defer delete(r2, key)
return r2[key]
}
// Detect if the "params" or "meta" fields are JSON "null" or just not
// present by seeing if the field gets overwritten to nil.
emptyParams := &json.RawMessage{}
r2["params"] = emptyParams
emptyMeta := &json.RawMessage{}
r2["meta"] = emptyMeta
decoder := json.NewDecoder(bytes.NewReader(data))
decoder.UseNumber()
if err := decoder.Decode(&r2); err != nil {
return err
}
var ok bool
r.Method, ok = pop("method").(string)
if !ok {
return errors.New("missing method field")
}
switch params := pop("params"); params {
case nil:
r.Params = &jsonNull
case emptyParams:
r.Params = nil
default:
b, err := json.Marshal(params)
if err != nil {
return fmt.Errorf("failed to marshal params: %w", err)
}
r.Params = (*json.RawMessage)(&b)
}
switch meta := pop("meta"); meta {
case nil:
r.Meta = &jsonNull
case emptyMeta:
r.Meta = nil
default:
b, err := json.Marshal(meta)
if err != nil {
return fmt.Errorf("failed to marshal Meta: %w", err)
}
r.Meta = (*json.RawMessage)(&b)
}
switch rawID := pop("id").(type) {
case nil:
r.ID = ID{}
r.Notif = true
case string:
r.ID = ID{Str: rawID, IsString: true}
r.Notif = false
case json.Number:
id, err := rawID.Int64()
if err != nil {
return fmt.Errorf("failed to unmarshal ID: %w", err)
}
r.ID = ID{Num: uint64(id)}
r.Notif = false
default:
return fmt.Errorf("unexpected ID type: %T", rawID)
}
// The jsonrpc field should not be added to ExtraFields.
delete(r2, "jsonrpc")
// Clear the extra fields before populating them again.
r.ExtraFields = nil
for name, value := range r2 {
r.ExtraFields = append(r.ExtraFields, RequestField{
Name: name,
Value: value,
})
}
return nil
}
// SetParams sets r.Params to the JSON encoding of v. If JSON
// marshaling fails, it returns an error.
func (r *Request) SetParams(v interface{}) error {
b, err := json.Marshal(v)
if err != nil {
return err
}
r.Params = (*json.RawMessage)(&b)
return nil
}
// SetMeta sets r.Meta to the JSON encoding of v. If JSON
// marshaling fails, it returns an error.
func (r *Request) SetMeta(v interface{}) error {
b, err := json.Marshal(v)
if err != nil {
return err
}
r.Meta = (*json.RawMessage)(&b)
return nil
}
// SetExtraField adds an entry to r.ExtraFields, so that it is added to the
// JSON encoding of the request, as a way to add arbitrary extensions to
// JSON RPC 2.0. If JSON marshaling fails, it returns an error.
func (r *Request) SetExtraField(name string, v interface{}) error {
switch name {
case "id", "jsonrpc", "meta", "method", "params":
return fmt.Errorf("invalid extra field %q", name)
}
r.ExtraFields = append(r.ExtraFields, RequestField{
Name: name,
Value: v,
})
return nil
}
// RequestField is a top-level field that can be added to the JSON-RPC request.
type RequestField struct {
Name string
Value interface{}
}

64
request_test.go Normal file
View file

@ -0,0 +1,64 @@
package jsonrpc2_test
import (
"bytes"
"encoding/json"
"reflect"
"testing"
"github.com/sourcegraph/jsonrpc2"
)
func TestRequest_MarshalJSON_jsonrpc(t *testing.T) {
b, err := json.Marshal(&jsonrpc2.Request{})
if err != nil {
t.Fatal(err)
}
if want := `{"id":0,"jsonrpc":"2.0","method":""}`; string(b) != want {
t.Errorf("got %q, want %q", b, want)
}
}
func TestRequest_MarshalUnmarshalJSON(t *testing.T) {
obj := json.RawMessage(`{"foo":"bar"}`)
tests := []struct {
data []byte
want jsonrpc2.Request
}{
{
data: []byte(`{"id":123,"jsonrpc":"2.0","method":"m","params":{"foo":"bar"}}`),
want: jsonrpc2.Request{ID: jsonrpc2.ID{Num: 123}, Method: "m", Params: &obj},
},
{
data: []byte(`{"id":123,"jsonrpc":"2.0","method":"m","params":null}`),
want: jsonrpc2.Request{ID: jsonrpc2.ID{Num: 123}, Method: "m", Params: &jsonNull},
},
{
data: []byte(`{"id":123,"jsonrpc":"2.0","method":"m"}`),
want: jsonrpc2.Request{ID: jsonrpc2.ID{Num: 123}, Method: "m", Params: nil},
},
{
data: []byte(`{"id":123,"jsonrpc":"2.0","method":"m","sessionId":"session"}`),
want: jsonrpc2.Request{ID: jsonrpc2.ID{Num: 123}, Method: "m", Params: nil, ExtraFields: []jsonrpc2.RequestField{{Name: "sessionId", Value: "session"}}},
},
}
for _, test := range tests {
var got jsonrpc2.Request
if err := json.Unmarshal(test.data, &got); err != nil {
t.Error(err)
continue
}
if !reflect.DeepEqual(got, test.want) {
t.Errorf("%q: got %+v, want %+v", test.data, got, test.want)
continue
}
data, err := json.Marshal(got)
if err != nil {
t.Error(err)
continue
}
if !bytes.Equal(data, test.data) {
t.Errorf("got JSON %q, want %q", data, test.data)
}
}
}

72
response.go Normal file
View file

@ -0,0 +1,72 @@
package jsonrpc2
import (
"encoding/json"
"errors"
)
// Response represents a JSON-RPC response. See
// http://www.jsonrpc.org/specification#response_object.
type Response struct {
ID ID `json:"id"`
Result *json.RawMessage `json:"result,omitempty"`
Error *Error `json:"error,omitempty"`
// Meta optionally provides metadata to include in the response.
//
// NOTE: It is not part of spec. However, it is useful for propagating
// tracing context, etc.
Meta *json.RawMessage `json:"meta,omitempty"`
// SPEC NOTE: The spec says "If there was an error in detecting
// the id in the Request object (e.g. Parse error/Invalid
// Request), it MUST be Null." If we made the ID field nullable,
// then we'd have to make it a pointer type. For simplicity, we're
// ignoring the case where there was an error in detecting the ID
// in the Request object.
}
// MarshalJSON implements json.Marshaler and adds the "jsonrpc":"2.0"
// property.
func (r Response) MarshalJSON() ([]byte, error) {
if (r.Result == nil || len(*r.Result) == 0) && r.Error == nil {
return nil, errors.New("can't marshal *jsonrpc2.Response (must have result or error)")
}
type tmpType Response // avoid infinite MarshalJSON recursion
b, err := json.Marshal(tmpType(r))
if err != nil {
return nil, err
}
b = append(b[:len(b)-1], []byte(`,"jsonrpc":"2.0"}`)...)
return b, nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (r *Response) UnmarshalJSON(data []byte) error {
type tmpType Response
// Detect if the "result" field is JSON "null" or just not present
// by seeing if the field gets overwritten to nil.
*r = Response{Result: &json.RawMessage{}}
if err := json.Unmarshal(data, (*tmpType)(r)); err != nil {
return err
}
if r.Result == nil { // JSON "null"
r.Result = &jsonNull
} else if len(*r.Result) == 0 {
r.Result = nil
}
return nil
}
// SetResult sets r.Result to the JSON representation of v. If JSON
// marshaling fails, it returns an error.
func (r *Response) SetResult(v interface{}) error {
b, err := json.Marshal(v)
if err != nil {
return err
}
r.Result = (*json.RawMessage)(&b)
return nil
}

108
response_test.go Normal file
View file

@ -0,0 +1,108 @@
package jsonrpc2_test
import (
"bytes"
"encoding/json"
"reflect"
"testing"
"github.com/sourcegraph/jsonrpc2"
)
func TestResponse_MarshalJSON_jsonrpc(t *testing.T) {
b, err := json.Marshal(&jsonrpc2.Response{Result: &jsonNull})
if err != nil {
t.Fatal(err)
}
if want := `{"id":0,"result":null,"jsonrpc":"2.0"}`; string(b) != want {
t.Errorf("got %q, want %q", b, want)
}
}
func TestResponseMarshalJSON_Notif(t *testing.T) {
tests := map[*jsonrpc2.Request]bool{
{ID: jsonrpc2.ID{Num: 0}}: true,
{ID: jsonrpc2.ID{Num: 1}}: true,
{ID: jsonrpc2.ID{Str: "", IsString: true}}: true,
{ID: jsonrpc2.ID{Str: "a", IsString: true}}: true,
{Notif: true}: false,
}
for r, wantIDKey := range tests {
b, err := json.Marshal(r)
if err != nil {
t.Fatal(err)
}
hasIDKey := bytes.Contains(b, []byte(`"id"`))
if hasIDKey != wantIDKey {
t.Errorf("got %s, want contain id key: %v", b, wantIDKey)
}
}
}
func TestResponseUnmarshalJSON_Notif(t *testing.T) {
tests := map[string]bool{
`{"method":"f","id":0}`: false,
`{"method":"f","id":1}`: false,
`{"method":"f","id":"a"}`: false,
`{"method":"f","id":""}`: false,
`{"method":"f"}`: true,
}
for s, want := range tests {
var r jsonrpc2.Request
if err := json.Unmarshal([]byte(s), &r); err != nil {
t.Fatal(err)
}
if r.Notif != want {
t.Errorf("%s: got %v, want %v", s, r.Notif, want)
}
}
}
func TestResponse_MarshalUnmarshalJSON(t *testing.T) {
obj := json.RawMessage(`{"foo":"bar"}`)
tests := []struct {
data []byte
want jsonrpc2.Response
error bool
}{
{
data: []byte(`{"id":123,"result":{"foo":"bar"},"jsonrpc":"2.0"}`),
want: jsonrpc2.Response{ID: jsonrpc2.ID{Num: 123}, Result: &obj},
},
{
data: []byte(`{"id":123,"result":null,"jsonrpc":"2.0"}`),
want: jsonrpc2.Response{ID: jsonrpc2.ID{Num: 123}, Result: &jsonNull},
},
{
data: []byte(`{"id":123,"jsonrpc":"2.0"}`),
want: jsonrpc2.Response{ID: jsonrpc2.ID{Num: 123}, Result: nil},
error: true, // either result or error field must be set
},
}
for _, test := range tests {
var got jsonrpc2.Response
if err := json.Unmarshal(test.data, &got); err != nil {
t.Error(err)
continue
}
if !reflect.DeepEqual(got, test.want) {
t.Errorf("%q: got %+v, want %+v", test.data, got, test.want)
continue
}
data, err := json.Marshal(got)
if err != nil {
if test.error {
continue
}
t.Error(err)
continue
}
if test.error {
t.Errorf("%q: expected error", test.data)
continue
}
if !bytes.Equal(data, test.data) {
t.Errorf("got JSON %q, want %q", data, test.data)
}
}
}

View file

@ -40,6 +40,12 @@ type bufferedObjectStream struct {
// objectStream is used to produce the bytes to write to the stream
// for the JSON-RPC 2.0 objects.
func NewBufferedStream(conn io.ReadWriteCloser, codec ObjectCodec) ObjectStream {
switch v := codec.(type) {
case PlainObjectCodec:
v.decoder = json.NewDecoder(conn)
v.encoder = json.NewEncoder(conn)
codec = v
}
return &bufferedObjectStream{
conn: conn,
w: bufio.NewWriter(conn),
@ -68,7 +74,7 @@ func (t *bufferedObjectStream) Close() error {
return t.conn.Close()
}
// An ObjectCodec specifies how to encoed and decode a JSON-RPC 2.0
// An ObjectCodec specifies how to encode and decode a JSON-RPC 2.0
// object in a stream.
type ObjectCodec interface {
// WriteObject writes a JSON-RPC 2.0 object to the stream.
@ -164,14 +170,57 @@ func (VSCodeObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error {
}
// PlainObjectCodec reads/writes plain JSON-RPC 2.0 objects without a header.
type PlainObjectCodec struct{}
//
// Deprecated: use NewPlainObjectStream
type PlainObjectCodec struct {
decoder *json.Decoder
encoder *json.Encoder
}
// WriteObject implements ObjectCodec.
func (PlainObjectCodec) WriteObject(stream io.Writer, v interface{}) error {
func (c PlainObjectCodec) WriteObject(stream io.Writer, v interface{}) error {
if c.encoder != nil {
return c.encoder.Encode(v)
}
return json.NewEncoder(stream).Encode(v)
}
// ReadObject implements ObjectCodec.
func (PlainObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error {
func (c PlainObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error {
if c.decoder != nil {
return c.decoder.Decode(v)
}
return json.NewDecoder(stream).Decode(v)
}
// plainObjectStream reads/writes plain JSON-RPC 2.0 objects without a header.
type plainObjectStream struct {
conn io.Closer
decoder *json.Decoder
encoder *json.Encoder
}
// NewPlainObjectStream creates a buffered stream from a network
// connection (or other similar interface). The underlying
// objectStream produces plain JSON-RPC 2.0 objects without a header.
func NewPlainObjectStream(conn io.ReadWriteCloser) ObjectStream {
return &plainObjectStream{
conn: conn,
encoder: json.NewEncoder(conn),
decoder: json.NewDecoder(conn),
}
}
func (os *plainObjectStream) ReadObject(v interface{}) error {
return os.decoder.Decode(v)
}
// WriteObject serializes a value to JSON and writes it to a stream.
// Not thread-safe, a user must synchronize writes in a multithreaded environment.
func (os *plainObjectStream) WriteObject(v interface{}) error {
return os.encoder.Encode(v)
}
func (os *plainObjectStream) Close() error {
return os.conn.Close()
}