Skip to content

Commit

Permalink
Reorgnize sub log (#100)
Browse files Browse the repository at this point in the history
* merge reorg and log in SubscriptionLog

* update doc

Co-authored-by: dayong <[email protected]>
  • Loading branch information
wangdayong228 and dayong authored Aug 3, 2021
1 parent ba8cb3c commit c37ada4
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 73 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,14 @@ func main() {
fmt.Printf("decoded transfer event: {From: 0x%x, To: 0x%x, Value: %v} ", Transfer.From, Transfer.To, Transfer.Value)
}
```
### Use middleware to hook rpc request

## Subscribe Epochs/BlockHeads/Logs

Please find Publish-Subscribe API documentation from https://developer.confluxnetwork.org/conflux-doc/docs/pubsub

It should be noted that when subscribing logs, a `SubscribeLogs` object is received. It has two fields `Log` and `ChainRerog`, one of them must be nil and the other not. When Log is not nil, it means that a Log is received. When field `ChainReorg` is not nil, that means chainreorg is occurred. That represents the log related to epoch greater than or equal to `ChainReog.RevertTo` will become invalid, and the dapp needs to be dealt with at the business level.

## Use middleware to hook rpc request

Client applies method `UseCallRpcMiddleware` to set middleware for hooking `callRpc` method which is the core of all single rpc related methods. And `UseBatchCallRpcMiddleware` to set middleware for hooking `batchCallRPC`.

Expand Down
8 changes: 6 additions & 2 deletions changeLog.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# Go-conflux-sdk Change Log

## v1.0.11
- Add `blockNumber` to block related methods `cfx_getBlockByHash`, `cfx_getBlockByEpochNumber`, `cfx_getBlockByHashWithPivotAssumption` which need `Conflux-rust v1.1.5` or above.
- Add new RPC method `cfx_getBlockByBlockNubmer`
- Refactor SubscribeLogs for avoiding lossing timing sequence of Chain-Reorg and Log
- Add variadic arguments support for rpc service
## v1.0.10
- Set default rpc request timeout to 30s
- Remove addition error msg in wrappedCallRPC
- Add method getAccountPendingTransactions in client
- Add method GetAccountPendingTransactions in client
## v1.0.9
- Apply middleware for hooking call rpc and batch call rpc
- Support set request rpc timeout in Client
Expand Down
23 changes: 2 additions & 21 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,27 +1019,8 @@ func (client *Client) SubscribeEpochs(channel chan types.WebsocketEpochResponse,
}

// SubscribeLogs subscribes all logs matching a certain filter, in order.
func (client *Client) SubscribeLogs(logChannel chan types.Log, chainReorgChannel chan types.ChainReorg, filter types.LogFilter) (*rpc.ClientSubscription, error) {
channel := make(chan types.SubscriptionLog, 100)
clientSubscrip, err := client.rpcRequester.Subscribe(context.Background(), "cfx", channel, "logs", filter)
if err != nil {
return nil, err
}

go func() {
for {
subscriptionLog, isOpen := <-channel
if !isOpen {
return
}
if subscriptionLog.IsRevertLog() {
chainReorgChannel <- subscriptionLog.ChainReorg
} else {
logChannel <- subscriptionLog.Log
}
}
}()
return clientSubscrip, nil
func (client *Client) SubscribeLogs(channel chan types.SubscriptionLog, filter types.LogFilter) (*rpc.ClientSubscription, error) {
return client.rpcRequester.Subscribe(context.Background(), "cfx", channel, "logs", filter)
}

// === helper methods ===
Expand Down
14 changes: 5 additions & 9 deletions example/example_client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/Conflux-Chain/go-conflux-sdk/middleware"
"github.com/Conflux-Chain/go-conflux-sdk/rpc"
"github.com/Conflux-Chain/go-conflux-sdk/types"
"github.com/Conflux-Chain/go-conflux-sdk/utils"

"github.com/Conflux-Chain/go-conflux-sdk/types/cfxaddress"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -495,9 +496,8 @@ func subscribeEpochs() {

func subscribeLogs() {
fmt.Printf("\n- subscribe logs\n")
logChannel := make(chan types.Log, 100)
reorgChannel := make(chan types.ChainReorg, 100)
sub, err := client.SubscribeLogs(logChannel, reorgChannel, types.LogFilter{
channel := make(chan types.SubscriptionLog, 100)
sub, err := client.SubscribeLogs(channel, types.LogFilter{
Topics: [][]types.Hash{{"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"}}})
if err != nil {
fmt.Printf("subscribe log error:%+v\n", err.Error())
Expand Down Expand Up @@ -535,12 +535,8 @@ func subscribeLogs() {
select {
case err = <-errorchan:
fmt.Printf("subscription error:%v\n", err.Error())
// sub.Unsubscribe()
// return
case log := <-logChannel:
fmt.Printf("received %v log:%+v\n\n", i+1, log)
case reorg := <-reorgChannel:
fmt.Printf("received new reorg:%+v\n\n", reorg)
case log := <-channel:
fmt.Printf("received new log:%v\n\n", utils.PrettyJSON(log))
}
}
sub.Unsubscribe()
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca h1:Ld/zXl5t4+D69SiV4JoN7kkfvJdOWlPpfxrzxpLMoUk=
github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM=
github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef h1:wHSqTBrZW24CsNJDfeh9Ex6Pm0Rcpc7qrgKBiL44vF4=
Expand Down Expand Up @@ -285,6 +287,9 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
2 changes: 1 addition & 1 deletion interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type ClientOperator interface {

SubscribeNewHeads(channel chan types.BlockHeader) (*rpc.ClientSubscription, error)
SubscribeEpochs(channel chan types.WebsocketEpochResponse, subscriptionEpochType ...types.Epoch) (*rpc.ClientSubscription, error)
SubscribeLogs(logChannel chan types.Log, chainReorgChannel chan types.ChainReorg, filter types.LogFilter) (*rpc.ClientSubscription, error)
SubscribeLogs(channel chan types.SubscriptionLog, filter types.LogFilter) (*rpc.ClientSubscription, error)

WaitForTransationBePacked(txhash types.Hash, duration time.Duration) (*types.Transaction, error)
WaitForTransationReceipt(txhash types.Hash, duration time.Duration) (*types.TransactionReceipt, error)
Expand Down
2 changes: 1 addition & 1 deletion rpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestServerRegisterName(t *testing.T) {
t.Fatalf("Expected service calc to be registered")
}

wantCallbacks := 8
wantCallbacks := 9
if len(svc.callbacks) != wantCallbacks {
t.Errorf("Expected %d callbacks for service 'service', got %d", wantCallbacks, len(svc.callbacks))
}
Expand Down
140 changes: 103 additions & 37 deletions types/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,68 +58,68 @@ type rlpEncodableLog struct {
TransactionLogIndex *rlpNilableBigInt `rlp:"nil"`
}

// EncodeRLP implements the rlp.Encoder interface.
func (log Log) EncodeRLP(w io.Writer) error {
func (l Log) toRlpEncodable() rlpEncodableLog {
rlog := rlpEncodableLog{
Address: log.Address, Topics: log.Topics, Data: log.Data,
BlockHash: log.BlockHash, TransactionHash: log.TransactionHash,
Address: l.Address, Topics: l.Topics, Data: l.Data,
BlockHash: l.BlockHash, TransactionHash: l.TransactionHash,
}

if log.EpochNumber != nil {
rlog.EpochNumber = &rlpNilableBigInt{log.EpochNumber.ToInt()}
if l.EpochNumber != nil {
rlog.EpochNumber = &rlpNilableBigInt{l.EpochNumber.ToInt()}
}

if log.TransactionIndex != nil {
rlog.TransactionIndex = &rlpNilableBigInt{log.TransactionIndex.ToInt()}
if l.TransactionIndex != nil {
rlog.TransactionIndex = &rlpNilableBigInt{l.TransactionIndex.ToInt()}
}

if log.LogIndex != nil {
rlog.LogIndex = &rlpNilableBigInt{log.LogIndex.ToInt()}
if l.LogIndex != nil {
rlog.LogIndex = &rlpNilableBigInt{l.LogIndex.ToInt()}
}

if log.TransactionLogIndex != nil {
rlog.TransactionLogIndex = &rlpNilableBigInt{log.TransactionLogIndex.ToInt()}
if l.TransactionLogIndex != nil {
rlog.TransactionLogIndex = &rlpNilableBigInt{l.TransactionLogIndex.ToInt()}
}

return rlp.Encode(w, rlog)
return rlog
}

// DecodeRLP implements the rlp.Decoder interface.
func (log *Log) DecodeRLP(r *rlp.Stream) error {
var rlog rlpEncodableLog
if err := r.Decode(&rlog); err != nil {
return err
}
func (r rlpEncodableLog) toNormal() Log {
log := Log{}

log.Address, log.Topics, log.Data = rlog.Address, rlog.Topics, rlog.Data
log.BlockHash, log.TransactionHash = rlog.BlockHash, rlog.TransactionHash
log.Address, log.Topics, log.Data = r.Address, r.Topics, r.Data
log.BlockHash, log.TransactionHash = r.BlockHash, r.TransactionHash

if rlog.EpochNumber != nil {
log.EpochNumber = (*hexutil.Big)(rlog.EpochNumber.Val)
if r.EpochNumber != nil {
log.EpochNumber = (*hexutil.Big)(r.EpochNumber.Val)
}

if rlog.TransactionIndex != nil {
log.TransactionIndex = (*hexutil.Big)(rlog.TransactionIndex.Val)
if r.TransactionIndex != nil {
log.TransactionIndex = (*hexutil.Big)(r.TransactionIndex.Val)
}

if rlog.LogIndex != nil {
log.LogIndex = (*hexutil.Big)(rlog.LogIndex.Val)
if r.LogIndex != nil {
log.LogIndex = (*hexutil.Big)(r.LogIndex.Val)
}

if rlog.TransactionLogIndex != nil {
log.TransactionLogIndex = (*hexutil.Big)(rlog.TransactionLogIndex.Val)
if r.TransactionLogIndex != nil {
log.TransactionLogIndex = (*hexutil.Big)(r.TransactionLogIndex.Val)
}

return nil
return log
}

type SubscriptionLog struct {
Log
ChainReorg
// EncodeRLP implements the rlp.Encoder interface.
func (log Log) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, log.toRlpEncodable())
}

func (s SubscriptionLog) IsRevertLog() bool {
return !reflect.DeepEqual(s.ChainReorg, ChainReorg{})
// DecodeRLP implements the rlp.Decoder interface.
func (log *Log) DecodeRLP(r *rlp.Stream) error {
var rlog rlpEncodableLog
if err := r.Decode(&rlog); err != nil {
return err
}

*log = rlog.toNormal()
return nil
}

// UnmarshalJSON implements the json.Unmarshaler interface.
Expand Down Expand Up @@ -234,3 +234,69 @@ func resolveToHashes(val interface{}) ([]Hash, error) {

return nil, errors.Errorf("failed to convert %v to hash or hashes", val)
}

type SubscriptionLog struct {
*Log
*ChainReorg
}

type rlpEncodableSubscriptionLog struct {
Log *rlpEncodableLog `rlp:"nil"`
ChainReorg *rlpEncodableChainReorg `rlp:"nil"`
}

func (s SubscriptionLog) IsRevertLog() bool {
return s.ChainReorg != nil
// return !reflect.DeepEqual(s.ChainReorg, ChainReorg{})
}

func (s SubscriptionLog) MarshalJSON() ([]byte, error) {
if s.IsRevertLog() {
return json.Marshal(s.ChainReorg)
}
return json.Marshal(s.Log)
}

func (s SubscriptionLog) toRlpEncodable() rlpEncodableSubscriptionLog {
var rlpLog *rlpEncodableLog
if s.Log != nil {
_rlpLog := s.Log.toRlpEncodable()
rlpLog = &_rlpLog
}

var rlpReorg *rlpEncodableChainReorg
if s.ChainReorg != nil {
_rlpReorg := s.ChainReorg.toRlpEncodable()
rlpReorg = &_rlpReorg
}

r := rlpEncodableSubscriptionLog{rlpLog, rlpReorg}
return r
}

func (r rlpEncodableSubscriptionLog) toNormal() SubscriptionLog {
slog := SubscriptionLog{}
if r.Log != nil {
_log := r.Log.toNormal()
slog.Log = &_log
}

if r.ChainReorg != nil {
_reorg := r.ChainReorg.toNormal()
slog.ChainReorg = &_reorg
}
return slog
}

func (s SubscriptionLog) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, s.toRlpEncodable())
}

func (s *SubscriptionLog) DecodeRLP(r *rlp.Stream) error {
rlpSubLog := rlpEncodableSubscriptionLog{}
if err := r.Decode(&rlpSubLog); err != nil {
return err
}
*s = rlpSubLog.toNormal()
return nil
}
Loading

0 comments on commit c37ada4

Please sign in to comment.