diff --git a/oracle/market_mapper.go b/oracle/market_mapper.go index 5ed171551..8e1083a7a 100644 --- a/oracle/market_mapper.go +++ b/oracle/market_mapper.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "os" + "strings" "time" "go.uber.org/zap" @@ -42,8 +43,31 @@ func (o *OracleImpl) listenForMarketMapUpdates(ctx context.Context) { continue } + if o.lastUpdated != 0 && o.lastUpdated == result.Value.LastUpdated { + o.logger.Debug("skipping market map update on no lastUpdated change", zap.Uint64("lastUpdated", o.lastUpdated)) + continue + } + + validSubset, err := result.Value.MarketMap.GetValidSubset() + if err != nil { + o.logger.Error("failed to validate market map", zap.Error(err)) + continue + } + + // Detect removed markets and surface info about the removals + var removedMarkets []string + for t := range result.Value.MarketMap.Markets { + if _, in := validSubset.Markets[t]; !in { + removedMarkets = append(removedMarkets, t) + } + } + if len(validSubset.Markets) == 0 || len(validSubset.Markets) != len(result.Value.MarketMap.Markets) { + o.logger.Warn("invalid market map update has caused some markets to be removed") + o.logger.Info("markets removed from invalid market map", zap.String("markets", strings.Join(removedMarkets, " "))) + } + // Update the oracle with the latest market map iff the market map has changed. - updated := result.Value.MarketMap + updated := validSubset if o.marketMap.Equal(updated) { o.logger.Debug("market map has not changed") continue @@ -55,12 +79,15 @@ func (o *OracleImpl) listenForMarketMapUpdates(ctx context.Context) { continue } + o.lastUpdated = result.Value.GetLastUpdated() + // Write the market map to the configured path. if err := o.WriteMarketMap(); err != nil { o.logger.Error("failed to write market map", zap.Error(err)) } - o.logger.Info("updated oracle with new market map", zap.Any("market_map", updated)) + o.logger.Info("updated oracle with new market map") + o.logger.Debug("updated oracle with new market map", zap.Any("market_map", updated)) } } } diff --git a/oracle/market_mapper_test.go b/oracle/market_mapper_test.go index 506300d87..507e138f7 100644 --- a/oracle/market_mapper_test.go +++ b/oracle/market_mapper_test.go @@ -296,4 +296,46 @@ func TestListenForMarketMapUpdates(t *testing.T) { // Clean up the file. require.NoError(t, os.Remove(path)) }) + t.Run("can update providers with a new market map and handle partially invalid state", func(t *testing.T) { + chains := []mmclienttypes.Chain{{ChainID: "dYdX"}} + handler, factory := marketMapperFactory(t, chains) + handler.On("CreateURL", mock.Anything).Return("", nil).Maybe() + + resolved := make(mmclienttypes.ResolvedMarketMap) + resp := mmtypes.MarketMapResponse{ + MarketMap: partialInvalidMarketMap, + } + resolved[chains[0]] = mmclienttypes.NewMarketMapResult(&resp, time.Now()) + handler.On("ParseResponse", mock.Anything, mock.Anything).Return(mmclienttypes.NewMarketMapResponse(resolved, nil)).Maybe() + + o, err := oracle.New( + oracleCfgWithMockMapper, + noOpPriceAggregator{}, + oracle.WithLogger(logger), + oracle.WithMarketMapperFactory(factory), + oracle.WithPriceAPIQueryHandlerFactory(oraclefactory.APIQueryHandlerFactory), + oracle.WithPriceWebSocketQueryHandlerFactory(oraclefactory.WebSocketQueryHandlerFactory), + ) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + err := o.Start(ctx) + if !errors.Is(err, context.Canceled) { + t.Errorf("Start() should have returned context.Canceled error") + } + }() + + // Wait for the oracle to start. + time.Sleep(5000 * time.Millisecond) + + // The oracle should have been updated. + require.Equal(t, validMarketMapSubset, o.GetMarketMap()) + + // Stop the oracle. + cancel() + o.Stop() + }) } diff --git a/oracle/oracle.go b/oracle/oracle.go index 4a7e1a10e..914c983f5 100644 --- a/oracle/oracle.go +++ b/oracle/oracle.go @@ -54,6 +54,8 @@ type OracleImpl struct { //nolint:revive cfg config.OracleConfig // marketMap is the market map that the oracle is using. marketMap mmtypes.MarketMap + // lastUpdated is the field in the marketmap module tracking the last block at which an update was posted + lastUpdated uint64 // writeTo is a path to write the market map to. writeTo string diff --git a/oracle/update.go b/oracle/update.go index 56b74d224..097a3da56 100644 --- a/oracle/update.go +++ b/oracle/update.go @@ -19,19 +19,14 @@ func (o *OracleImpl) UpdateMarketMap(marketMap mmtypes.MarketMap) error { o.mut.Lock() defer o.mut.Unlock() - validSubset, err := marketMap.GetValidSubset() - if err != nil { + if err := marketMap.ValidateBasic(); err != nil { o.logger.Error("failed to validate market map", zap.Error(err)) return err } - if len(validSubset.Markets) == 0 { - o.logger.Warn("market map update produced no valid markets to fetch") - } - // Iterate over all existing price providers and update their market maps. for name, state := range o.priceProviders { - providerTickers, err := types.ProviderTickersFromMarketMap(name, validSubset) + providerTickers, err := types.ProviderTickersFromMarketMap(name, marketMap) if err != nil { o.logger.Error("failed to create provider market map", zap.String("provider", name), zap.Error(err)) return err @@ -47,7 +42,7 @@ func (o *OracleImpl) UpdateMarketMap(marketMap mmtypes.MarketMap) error { o.priceProviders[name] = updatedState } - o.marketMap = validSubset + o.marketMap = marketMap if o.aggregator != nil { o.aggregator.UpdateMarketMap(o.marketMap) } diff --git a/oracle/update_test.go b/oracle/update_test.go index 3dd182b80..b19990038 100644 --- a/oracle/update_test.go +++ b/oracle/update_test.go @@ -18,7 +18,7 @@ import ( ) func TestUpdateWithMarketMap(t *testing.T) { - t.Run("bad market map is not rejected", func(t *testing.T) { + t.Run("bad market map is rejected", func(t *testing.T) { orc, err := oracle.New( oracleCfg, noOpPriceAggregator{}, @@ -35,7 +35,7 @@ func TestUpdateWithMarketMap(t *testing.T) { "bad": {}, }, }) - require.NoError(t, err) + require.Error(t, err) o.Stop() }) @@ -626,60 +626,4 @@ func TestUpdateProviderState(t *testing.T) { 500*time.Millisecond, ) }) - - t.Run("can update the market map with partial failure on NormalizeBy", func(t *testing.T) { - orc, err := oracle.New( - oracleCfg, - noOpPriceAggregator{}, - oracle.WithLogger(logger), - oracle.WithPriceAPIQueryHandlerFactory(oraclefactory.APIQueryHandlerFactory), - oracle.WithPriceWebSocketQueryHandlerFactory(oraclefactory.WebSocketQueryHandlerFactory), - ) - require.NoError(t, err) - o := orc.(*oracle.OracleImpl) - require.NoError(t, o.Init(context.TODO())) - - providers := o.GetProviderState() - require.Len(t, providers, 3) - - // Update the oracle's market map. - require.NoError(t, o.UpdateMarketMap(partialInvalidMarketMap)) - - providers = o.GetProviderState() - - cbTickers, err := types.ProviderTickersFromMarketMap(coinbase.Name, validMarketMapSubset) - require.NoError(t, err) - - // Check the state after the update. - coinbaseState, ok := providers[coinbase.Name] - require.True(t, ok) - checkProviderState( - t, - cbTickers, - coinbase.Name, - providertypes.API, - false, - coinbaseState, - ) - - okxTickers, err := types.ProviderTickersFromMarketMap(okx.Name, validMarketMapSubset) - require.NoError(t, err) - - okxState, ok := providers[okx.Name] - require.True(t, ok) - checkProviderState( - t, - okxTickers, - okx.Name, - providertypes.WebSockets, - false, - okxState, - ) - - binanceState, ok := providers[binance.Name] - require.True(t, ok) - checkProviderState(t, nil, binance.Name, providertypes.API, false, binanceState) - - o.Stop() - }) } diff --git a/providers/apis/marketmap/fetcher.go b/providers/apis/marketmap/fetcher.go index be54b9cdc..7182c3825 100644 --- a/providers/apis/marketmap/fetcher.go +++ b/providers/apis/marketmap/fetcher.go @@ -122,25 +122,6 @@ func (f *MarketMapFetcher) Fetch( ) } - // Validate the market map response. - // - // TODO: Add checks on the chain ID. - if err := resp.MarketMap.ValidateBasic(); err != nil { - f.logger.Info( - "invalid market map response from module", - zap.Any("market_map", resp.MarketMap), - zap.Error(err), - ) - - return types.NewMarketMapResponseWithErr( - chains, - providertypes.NewErrorWithCode( - fmt.Errorf("invalid market map response: %w", err), - providertypes.ErrorInvalidResponse, - ), - ) - } - resolved := make(types.ResolvedMarketMap) resolved[chains[0]] = types.NewMarketMapResult(resp, time.Now()) diff --git a/providers/apis/marketmap/fetcher_test.go b/providers/apis/marketmap/fetcher_test.go index f13644ca9..094203d49 100644 --- a/providers/apis/marketmap/fetcher_test.go +++ b/providers/apis/marketmap/fetcher_test.go @@ -120,22 +120,28 @@ func TestFetch(t *testing.T) { }, }, { - name: "errors when the market map response is invalid", + name: "does not error when the market map response is invalid", chains: chains[:1], client: func() mmtypes.QueryClient { c := mocks.NewQueryClient(t) c.On("MarketMap", mock.Anything, mock.Anything).Return( &mmtypes.MarketMapResponse{ - MarketMap: badMarketMap, + MarketMap: badMarketMap, + ChainId: chains[0].ChainID, + LastUpdated: 11, }, nil, ) return c }, expected: types.MarketMapResponse{ - UnResolved: types.UnResolvedMarketMap{ - chains[0]: providertypes.UnresolvedResult{ - ErrorWithCode: providertypes.NewErrorWithCode(fmt.Errorf("invalid market map response"), providertypes.ErrorAPIGeneral), + Resolved: types.ResolvedMarketMap{ + chains[0]: types.MarketMapResult{ + Value: &mmtypes.MarketMapResponse{ + MarketMap: badMarketMap, + ChainId: chains[0].ChainID, + LastUpdated: 11, + }, }, }, },