Skip to content

Commit

Permalink
fix TestPusherAppendable
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwanthgoli committed Nov 7, 2023
1 parent 08ed387 commit 94fefd5
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 65 deletions.
21 changes: 5 additions & 16 deletions pkg/ruler/base/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,17 @@ func (a *PusherAppender) Rollback() error {

// PusherAppendable fulfills the storage.Appendable interface for prometheus manager
type PusherAppendable struct {
pusher Pusher
userID string
rulesLimits RulesLimits
pusher Pusher
userID string

totalWrites prometheus.Counter
failedWrites prometheus.Counter
}

func NewPusherAppendable(pusher Pusher, userID string, limits RulesLimits, totalWrites, failedWrites prometheus.Counter) *PusherAppendable {
func NewPusherAppendable(pusher Pusher, userID string, totalWrites, failedWrites prometheus.Counter) *PusherAppendable {
return &PusherAppendable{
pusher: pusher,
userID: userID,
rulesLimits: limits,
totalWrites: totalWrites,
failedWrites: failedWrites,
}
Expand All @@ -129,15 +127,6 @@ type RulesLimits interface {
RulerAlertManagerConfig(userID string) *config.AlertManagerConfig
}

// EngineQueryFunc returns a new query function using the rules.EngineQueryFunc function
// and passing an altered timestamp.
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable, overrides RulesLimits, userID string) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
orig := rules.EngineQueryFunc(engine, q)
return orig(ctx, qs, t)
}
}

func MetricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Counter) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
queries.Inc()
Expand Down Expand Up @@ -258,9 +247,9 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
}

return rules.NewManager(&rules.ManagerOptions{
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
Appendable: NewPusherAppendable(p, userID, totalWrites, failedWrites),
Queryable: q,
QueryFunc: RecordAndReportRuleQueryMetrics(MetricsQueryFunc(EngineQueryFunc(engine, q, overrides, userID), totalQueries, failedQueries), queryTime, logger),
QueryFunc: RecordAndReportRuleQueryMetrics(MetricsQueryFunc(rules.EngineQueryFunc(engine, q), totalQueries, failedQueries), queryTime, logger),
Context: user.InjectOrgID(ctx, userID),
ExternalURL: cfg.ExternalURL.URL,
NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String(), cfg.DatasourceUID),
Expand Down
43 changes: 6 additions & 37 deletions pkg/ruler/base/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,72 +34,41 @@ func (p *fakePusher) Push(_ context.Context, r *logproto.WriteRequest) (*logprot

func TestPusherAppendable(t *testing.T) {
pusher := &fakePusher{}
pa := NewPusherAppendable(pusher, "user-1", nil, prometheus.NewCounter(prometheus.CounterOpts{}), prometheus.NewCounter(prometheus.CounterOpts{}))
pa := NewPusherAppendable(pusher, "user-1", prometheus.NewCounter(prometheus.CounterOpts{}), prometheus.NewCounter(prometheus.CounterOpts{}))

for _, tc := range []struct {
name string
series string
evalDelay time.Duration
value float64
expectedTS int64
}{
{
name: "tenant without delay, normal value",
name: "tenant, normal value",
series: "foo_bar",
value: 1.234,
expectedTS: 120_000,
},
{
name: "tenant without delay, stale nan value",
name: "tenant, stale nan value",
series: "foo_bar",
value: math.Float64frombits(value.StaleNaN),
expectedTS: 120_000,
},
{
name: "tenant with delay, normal value",
series: "foo_bar",
value: 1.234,
expectedTS: 120_000,
evalDelay: time.Minute,
},
{
name: "tenant with delay, stale nan value",
value: math.Float64frombits(value.StaleNaN),
expectedTS: 60_000,
evalDelay: time.Minute,
},
{
name: "ALERTS without delay, normal value",
name: "ALERTS, normal value",
series: `ALERTS{alertname="boop"}`,
value: 1.234,
expectedTS: 120_000,
},
{
name: "ALERTS without delay, stale nan value",
name: "ALERTS, stale nan value",
series: `ALERTS{alertname="boop"}`,
value: math.Float64frombits(value.StaleNaN),
expectedTS: 120_000,
},
{
name: "ALERTS with delay, normal value",
series: `ALERTS{alertname="boop"}`,
value: 1.234,
expectedTS: 60_000,
evalDelay: time.Minute,
},
{
name: "ALERTS with delay, stale nan value",
series: `ALERTS_FOR_STATE{alertname="boop"}`,
value: math.Float64frombits(value.StaleNaN),
expectedTS: 60_000,
evalDelay: time.Minute,
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
pa.rulesLimits = &ruleLimits{
evalDelay: tc.evalDelay,
}

lbls, err := parser.ParseMetric(tc.series)
require.NoError(t, err)
Expand Down Expand Up @@ -154,7 +123,7 @@ func TestPusherErrors(t *testing.T) {
writes := prometheus.NewCounter(prometheus.CounterOpts{})
failures := prometheus.NewCounter(prometheus.CounterOpts{})

pa := NewPusherAppendable(pusher, "user-1", ruleLimits{evalDelay: 10 * time.Second}, writes, failures)
pa := NewPusherAppendable(pusher, "user-1", writes, failures)

lbls, err := parser.ParseMetric("foo_bar")
require.NoError(t, err)
Expand Down
13 changes: 4 additions & 9 deletions pkg/ruler/base/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,12 @@ func defaultRulerConfig(t testing.TB, store rulestore.RuleStore) Config {
}

type ruleLimits struct {
evalDelay time.Duration
tenantShard int
maxRulesPerRuleGroup int
maxRuleGroups int
alertManagerConfig map[string]*config.AlertManagerConfig
}

func (r ruleLimits) EvaluationDelay(_ string) time.Duration {
return r.evalDelay
}

func (r ruleLimits) RulerTenantShardSize(_ string) int {
return r.tenantShard
}
Expand Down Expand Up @@ -144,7 +139,7 @@ func testSetup(t *testing.T, q storage.Querier) (*promql.Engine, storage.Queryab
reg := prometheus.NewRegistry()
queryable := testQueryableFunc(q)

return engine, queryable, pusher, l, ruleLimits{evalDelay: 0, maxRuleGroups: 20, maxRulesPerRuleGroup: 15}, reg
return engine, queryable, pusher, l, ruleLimits{maxRuleGroups: 20, maxRulesPerRuleGroup: 15}, reg
}

func newManager(t *testing.T, cfg Config, q storage.Querier) *DefaultMultiTenantManager {
Expand All @@ -158,7 +153,7 @@ func newManager(t *testing.T, cfg Config, q storage.Querier) *DefaultMultiTenant
func newMultiTenantManager(t *testing.T, cfg Config, q storage.Querier, amConf map[string]*config.AlertManagerConfig) *DefaultMultiTenantManager {
engine, queryable, pusher, logger, _, reg := testSetup(t, q)

overrides := ruleLimits{evalDelay: 0, maxRuleGroups: 20, maxRulesPerRuleGroup: 15}
overrides := ruleLimits{maxRuleGroups: 20, maxRulesPerRuleGroup: 15}
overrides.alertManagerConfig = amConf

manager, err := NewDefaultMultiTenantManager(cfg, DefaultTenantManagerFactory(cfg, pusher, queryable, engine, overrides, nil, constants.Loki), reg, logger, overrides, constants.Loki)
Expand Down Expand Up @@ -476,7 +471,7 @@ func TestGetRules(t *testing.T) {
m := loki_storage.NewClientMetrics()
defer m.Unregister()
r := buildRuler(t, cfg, nil, m, rulerAddrMap)
r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
r.limits = ruleLimits{tenantShard: tc.shuffleShardSize}
rulerAddrMap[id] = r
if r.ring != nil {
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.ring))
Expand Down Expand Up @@ -1420,7 +1415,7 @@ func TestSharding(t *testing.T) {
m := loki_storage.NewClientMetrics()
defer m.Unregister()
r := buildRuler(t, cfg, nil, m, nil)
r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
r.limits = ruleLimits{tenantShard: tc.shuffleShardSize}

if forceRing != nil {
r.ring = forceRing
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type RulesLimits interface {

// queryFunc returns a new query function using the rules.EngineQueryFunc function
// and passing an altered timestamp.
func queryFunc(evaluator Evaluator, overrides RulesLimits, checker readyChecker, userID string, logger log.Logger) rules.QueryFunc {
func queryFunc(evaluator Evaluator, checker readyChecker, userID string, logger log.Logger) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
hash := logql.HashedQuery(qs)
detail := rules.FromOriginContext(ctx)
Expand Down Expand Up @@ -144,7 +144,7 @@ func MultiTenantRuleManager(cfg Config, evaluator Evaluator, overrides RulesLimi
registry.configureTenantStorage(userID)

logger = log.With(logger, "user", userID)
queryFn := queryFunc(evaluator, overrides, registry, userID, logger)
queryFn := queryFunc(evaluator, registry, userID, logger)
memStore := NewMemStore(userID, queryFn, newMemstoreMetrics(reg), 5*time.Minute, log.With(logger, "subcomponent", "MemStore"))

// GroupLoader builds a cache of the rules as they're loaded by the
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestNonMetricQuery(t *testing.T) {
eval, err := NewLocalEvaluator(engine, log)
require.NoError(t, err)

queryFunc := queryFunc(eval, overrides, fakeChecker{}, "fake", log)
queryFunc := queryFunc(eval, fakeChecker{}, "fake", log)

_, err = queryFunc(context.TODO(), `{job="nginx"}`, time.Now())
require.Error(t, err, "rule result is not a vector or scalar")
Expand Down

0 comments on commit 94fefd5

Please sign in to comment.