From 4ec78c38dfb377cbe61e25ef8bd9eb20b7a09956 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Fri, 5 May 2023 07:32:39 +0200 Subject: [PATCH] app/ship_processor.go: Include Blocknum and Timestamp in ActionTrace messages. --- app/ship_processor.go | 133 +++++++++++++++++++++--------------------- 1 file changed, 68 insertions(+), 65 deletions(-) diff --git a/app/ship_processor.go b/app/ship_processor.go index c7cb9f0..2ad4cef 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -43,7 +43,9 @@ func SpawnProccessor(shipStream *shipclient.Stream, writer api.Writer, abi *abi. // Attach handlers shipStream.BlockHandler = processor.processBlock - shipStream.TraceHandler = processor.processTraces + + // Needed because if nil, traces will not be included in the response from ship. + shipStream.TraceHandler = func([]*ship.TransactionTraceV0) {} return processor } @@ -77,83 +79,81 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { } processor.encodeQueue(api.HeartbeatChannel, hb) - - err := processor.writer.Flush() - if err != nil { - log.WithError(err).Error("Failed to send messages") - } } -} -func (processor *ShipProcessor) processTraces(traces []*ship.TransactionTraceV0) { - for _, trace := range traces { + // Process traces + if block.Traces != nil && len(block.Traces.Elem) > 0 { + for _, trace := range block.Traces.AsTransactionTracesV0() { - processor.encodeQueue(api.TransactionChannel, trace) + processor.encodeQueue(api.TransactionChannel, trace) - // Actions - for _, actionTraceVar := range trace.ActionTraces { - var act_trace *ship.ActionTraceV1 + // Actions + for _, actionTraceVar := range trace.ActionTraces { + var act_trace *ship.ActionTraceV1 - if trace_v0, ok := actionTraceVar.Impl.(*ship.ActionTraceV0); ok { - // convert to v1 - act_trace = &ship.ActionTraceV1{ - ActionOrdinal: trace_v0.ActionOrdinal, - CreatorActionOrdinal: trace_v0.CreatorActionOrdinal, - Receipt: trace_v0.Receipt, - Receiver: trace_v0.Receiver, - Act: trace_v0.Act, - ContextFree: trace_v0.ContextFree, - Elapsed: trace_v0.Elapsed, - Console: trace_v0.Console, - AccountRamDeltas: trace_v0.AccountRamDeltas, - Except: trace_v0.Except, - ErrorCode: trace_v0.ErrorCode, - ReturnValue: []byte{}, + if trace_v0, ok := actionTraceVar.Impl.(*ship.ActionTraceV0); ok { + // convert to v1 + act_trace = &ship.ActionTraceV1{ + ActionOrdinal: trace_v0.ActionOrdinal, + CreatorActionOrdinal: trace_v0.CreatorActionOrdinal, + Receipt: trace_v0.Receipt, + Receiver: trace_v0.Receiver, + Act: trace_v0.Act, + ContextFree: trace_v0.ContextFree, + Elapsed: trace_v0.Elapsed, + Console: trace_v0.Console, + AccountRamDeltas: trace_v0.AccountRamDeltas, + Except: trace_v0.Except, + ErrorCode: trace_v0.ErrorCode, + ReturnValue: []byte{}, + } + } else { + act_trace = actionTraceVar.Impl.(*ship.ActionTraceV1) } - } else { - act_trace = actionTraceVar.Impl.(*ship.ActionTraceV1) - } - act := message.ActionTrace{ - TxID: trace.ID.String(), - Name: act_trace.Act.Name.String(), - Contract: act_trace.Act.Account.String(), - Receiver: act_trace.Receiver.String(), - HexData: hex.EncodeToString(act_trace.Act.Data), - } + act := message.ActionTrace{ + TxID: trace.ID.String(), + BlockNum: block.Block.BlockNumber(), + Timestamp: block.Block.Timestamp.Time.UTC(), + Name: act_trace.Act.Name.String(), + Contract: act_trace.Act.Account.String(), + Receiver: act_trace.Receiver.String(), + HexData: hex.EncodeToString(act_trace.Act.Data), + } - for _, auth := range act_trace.Act.Authorization { - act.Authorization = append(act.Authorization, message.PermissionLevel{ - Actor: auth.Actor.String(), - Permission: auth.Permission.String(), - }) - } + for _, auth := range act_trace.Act.Authorization { + act.Authorization = append(act.Authorization, message.PermissionLevel{ + Actor: auth.Actor.String(), + Permission: auth.Permission.String(), + }) + } - ABI, err := processor.abi.GetAbi(act_trace.Act.Account) - if err == nil { - data, err := ABI.DecodeAction(act_trace.Act.Data, act_trace.Act.Name) + ABI, err := processor.abi.GetAbi(act_trace.Act.Account) + if err == nil { + data, err := ABI.DecodeAction(act_trace.Act.Data, act_trace.Act.Name) + if err != nil { + log.WithError(err).Warn("Failed to decode action") + } + act.Data = data + } else { + log.WithError(err).Errorf("Failed to get abi for contract %s", act_trace.Act.Account) + } + + payload, err := processor.encode(act) if err != nil { - log.WithError(err).Warn("Failed to decode action") + continue } - act.Data = data - } else { - log.WithError(err).Errorf("Failed to get abi for contract %s", act_trace.Act.Account) - } - payload, err := processor.encode(act) - if err != nil { - continue - } + channels := []api.Channel{ + api.ActionChannel{}.Channel(), + api.ActionChannel{Name: act.Name}.Channel(), + api.ActionChannel{Contract: act.Contract}.Channel(), + api.ActionChannel{Name: act.Name, Contract: act.Contract}.Channel(), + } - channels := []api.Channel{ - api.ActionChannel{}.Channel(), - api.ActionChannel{Name: act.Name}.Channel(), - api.ActionChannel{Contract: act.Contract}.Channel(), - api.ActionChannel{Name: act.Name, Contract: act.Contract}.Channel(), - } - - for _, channel := range channels { - processor.queueMessage(channel, payload) + for _, channel := range channels { + processor.queueMessage(channel, payload) + } } } } @@ -164,6 +164,9 @@ func (processor *ShipProcessor) processTraces(traces []*ship.TransactionTraceV0) } } +func (processor *ShipProcessor) processTraces([]*ship.TransactionTraceV0) { +} + func (processor *ShipProcessor) Close() error { return processor.writer.Close() }