Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace ErrorReporter with a structured logging interface. #537

Merged
merged 2 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rpc/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func BenchmarkPingPong(b *testing.B) {
p1, p2 := net.Pipe()
srv := testcp.PingPong_ServerToClient(pingPongServer{})
conn1 := rpc.NewConn(rpc.NewStreamTransport(p2), &rpc.Options{
ErrorReporter: testErrorReporter{tb: b},
Logger: testErrorReporter{tb: b},
BootstrapClient: capnp.Client(srv),
})
defer func() {
Expand All @@ -86,7 +86,7 @@ func BenchmarkPingPong(b *testing.B) {
}
}()
conn2 := rpc.NewConn(rpc.NewStreamTransport(p1), &rpc.Options{
ErrorReporter: testErrorReporter{tb: b},
Logger: testErrorReporter{tb: b},
})
defer func() {
if err := conn2.Close(); err != nil {
Expand Down
30 changes: 27 additions & 3 deletions rpc/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,35 @@ var (
)

type errReporter struct {
ErrorReporter
Logger Logger
}

func (er errReporter) Debug(msg string, args ...any) {
if er.Logger != nil {
er.Logger.Debug(msg, args...)
}
}

func (er errReporter) Info(msg string, args ...any) {
if er.Logger != nil {
er.Logger.Info(msg, args...)
}
}

func (er errReporter) Warn(msg string, args ...any) {
if er.Logger != nil {
er.Logger.Warn(msg, args...)
}
}

func (er errReporter) Error(msg string, args ...any) {
if er.Logger != nil {
er.Logger.Error(msg, args...)
}
}

func (er errReporter) ReportError(err error) {
if er.ErrorReporter != nil && err != nil {
er.ErrorReporter.ReportError(err)
if err != nil {
er.Error(err.Error())
}
}
79 changes: 56 additions & 23 deletions rpc/level0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestSendAbort(t *testing.T) {
defer p2.Close()

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t, fail: true},
Logger: testErrorReporter{tb: t, fail: true},
// Give it plenty of time to actually send the message;
// otherwise we might time out and close the connection first.
// "plenty of time" here really means defer to the test suite's
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestSendAbort(t *testing.T) {
p1, p2 := net.Pipe()
defer p2.Close()
conn := rpc.NewConn(transport.NewStream(p1), &rpc.Options{
ErrorReporter: testErrorReporter{tb: t, fail: true},
Logger: testErrorReporter{tb: t, fail: true},
})

// Should have a timeout.
Expand All @@ -140,7 +140,7 @@ func TestRecvAbort(t *testing.T) {
defer p2.Close()

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})

select {
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestSendBootstrapError(t *testing.T) {
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer finishTest(t, conn, p2)

Expand Down Expand Up @@ -288,7 +288,7 @@ func TestSendBootstrapCall(t *testing.T) {
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer finishTest(t, conn, p2)

Expand Down Expand Up @@ -499,7 +499,7 @@ func TestSendBootstrapCallException(t *testing.T) {
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer finishTest(t, conn, p2)

Expand Down Expand Up @@ -676,7 +676,7 @@ func TestSendBootstrapPipelineCall(t *testing.T) {
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer finishTest(t, conn, p2)

Expand Down Expand Up @@ -877,7 +877,7 @@ func TestRecvBootstrapError(t *testing.T) {
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer finishTest(t, conn, p2)
ctx := context.Background()
Expand Down Expand Up @@ -954,7 +954,7 @@ func TestRecvBootstrapCall(t *testing.T) {

conn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer func() {
finishTest(t, conn, p2)
Expand Down Expand Up @@ -1108,7 +1108,7 @@ func TestRecvBootstrapCallException(t *testing.T) {

conn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer finishTest(t, conn, p2)

Expand Down Expand Up @@ -1265,7 +1265,7 @@ func TestRecvBootstrapPipelineCall(t *testing.T) {

conn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer func() {
finishTest(t, conn, p2)
Expand Down Expand Up @@ -1372,12 +1372,12 @@ func TestDuplicateBootstrap(t *testing.T) {

srvConn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer srvConn.Close()

clientConn := rpc.NewConn(p2, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer clientConn.Close()

Expand Down Expand Up @@ -1411,12 +1411,12 @@ func TestUseConnAfterBootstrapError(t *testing.T) {

srvConn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer srvConn.Close()

clientConn := rpc.NewConn(p2, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer clientConn.Close()

Expand Down Expand Up @@ -1461,7 +1461,7 @@ func TestCallOnClosedConn(t *testing.T) {

defer p2.Close()
conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
closed := false
defer func() {
Expand Down Expand Up @@ -1606,7 +1606,7 @@ func TestRecvCancel(t *testing.T) {
defer p2.Close()
conn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
closed := false
defer func() {
Expand Down Expand Up @@ -1750,7 +1750,7 @@ func TestSendCancel(t *testing.T) {
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer finishTest(t, conn, p2)
ctx := context.Background()
Expand Down Expand Up @@ -2247,15 +2247,48 @@ func canceledContext(parent context.Context) context.Context {

type testErrorReporter struct {
tb interface {
Log(...any)
Logf(string, ...any)
Fail()
}
fail bool
}

func (r testErrorReporter) ReportError(e error) {
r.tb.Log("conn error:", e)
if r.fail {
r.tb.Fail()
func mkArgsMap(args []any) map[string]any {
if len(args)%2 != 0 {
panic("odd number of arguments passed to logging method")
}
ret := make(map[string]any, len(args)/2)
for i := 0; i < len(args); i += 2 {
k := args[i].(string)
v := args[i+1]
ret[k] = v
}
return ret
}

func (t testErrorReporter) log(level string, msg string, args ...any) {
t.tb.Logf("log level %v: %v (args: %v)", level, msg, mkArgsMap(args))
}

func (t testErrorReporter) Debug(msg string, args ...any) {
t.log("debug", msg, args...)
}

func (t testErrorReporter) Info(msg string, args ...any) {
t.log("info", msg, args...)
}

func (t testErrorReporter) Warn(msg string, args ...any) {
t.log("warn", msg, args...)
}

func (t testErrorReporter) Error(msg string, args ...any) {
t.log("error", msg, args...)
if t.fail {
t.tb.Fail()
}
}

func (t testErrorReporter) ReportError(e error) {
t.Error(e.Error())
}
6 changes: 3 additions & 3 deletions rpc/level1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func testSendDisembargo(t *testing.T, sendPrimeTo rpccp.Call_sendResultsTo_Which
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer finishTest(t, conn, p2)
ctx := context.Background()
Expand Down Expand Up @@ -512,7 +512,7 @@ func TestRecvDisembargo(t *testing.T) {

conn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer finishTest(t, conn, p2)
ctx := context.Background()
Expand Down Expand Up @@ -816,7 +816,7 @@ func TestIssue3(t *testing.T) {

conn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
})
defer finishTest(t, conn, p2)
ctx := context.Background()
Expand Down
26 changes: 18 additions & 8 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ type Options struct {
// closed.
BootstrapClient capnp.Client

// ErrorReporter will be called upon when errors occur while the Conn
// is receiving messages from the remote vat.
ErrorReporter ErrorReporter
// Logger is used for logging by the RPC system, including errors that
// occur while the Conn is receiving messages from the remote vat.
Logger Logger

// AbortTimeout specifies how long to block on sending an abort message
// before closing the transport. If zero, then a reasonably short
Expand All @@ -203,10 +203,20 @@ type Options struct {
Network Network
}

// ErrorReporter can receive errors from a Conn. ReportError should be quick
// to return and should not use the Conn that it is attached to.
type ErrorReporter interface {
ReportError(error)
// Logger is used for logging by the RPC system. Each method logs
// messages at a different level, but otherwise has the same semantics:
//
// - Message is a human-readable description of the log event.
// - Args is a sequenece of key, value pairs, where the keys must be strings
// and the values may be any type.
// - The methods may not block for long periods of time.
//
// This interface is designed such that it is satisfied by *slog.Logger.
type Logger interface {
Debug(message string, args ...any)
Info(message string, args ...any)
Warn(message string, args ...any)
Error(message string, args ...any)
}

// NewConn creates a new connection that communicates on a given transport.
Expand All @@ -233,7 +243,7 @@ func NewConn(t Transport, opts *Options) *Conn {

if opts != nil {
c.bootstrap = opts.BootstrapClient
c.er = errReporter{opts.ErrorReporter}
c.er = errReporter{opts.Logger}
c.abortTimeout = opts.AbortTimeout
c.network = opts.Network
c.remotePeerID = opts.RemotePeerID
Expand Down
10 changes: 5 additions & 5 deletions rpc/senderpromise_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestSenderPromiseFulfill(t *testing.T) {
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
BootstrapClient: capnp.Client(p),
})
defer finishTest(t, conn, p2)
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestResolveUnimplementedDrop(t *testing.T) {
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
BootstrapClient: capnp.Client(provider),
})
defer finishTest(t, conn, p2)
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestDisembargoSenderPromise(t *testing.T) {
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
BootstrapClient: capnp.Client(p),
})
defer finishTest(t, conn, p2)
Expand Down Expand Up @@ -364,14 +364,14 @@ func TestPromiseOrdering(t *testing.T) {
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

c1 := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
BootstrapClient: capnp.Client(p),
})
ord := &echoNumOrderChecker{
t: t,
}
c2 := rpc.NewConn(p2, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Logger: testErrorReporter{tb: t},
BootstrapClient: capnp.Client(testcapnp.PingPong_ServerToClient(ord)),
})

Expand Down
Loading