diff --git a/openmeter/entitlement/adapter/entitlement.go b/openmeter/entitlement/adapter/entitlement.go index 613bf6781..2e1cfd018 100644 --- a/openmeter/entitlement/adapter/entitlement.go +++ b/openmeter/entitlement/adapter/entitlement.go @@ -191,6 +191,8 @@ func (a *entitlementDBAdapter) DeactivateEntitlement(ctx context.Context, entitl return err } +// TODO[OM-1009]: This returns all the entitlements even the expired ones, for billing we would need to have a range for +// the batch ingested events. Let's narrow down the list of entitlements active during that period. func (a *entitlementDBAdapter) ListAffectedEntitlements(ctx context.Context, eventFilters []balanceworker.IngestEventQueryFilter) ([]balanceworker.IngestEventDataResponse, error) { return entutils.TransactingRepo[[]balanceworker.IngestEventDataResponse, *entitlementDBAdapter]( ctx, diff --git a/openmeter/entitlement/balanceworker/entitlementhandler.go b/openmeter/entitlement/balanceworker/entitlementhandler.go index 00682c4a9..751ae27c0 100644 --- a/openmeter/entitlement/balanceworker/entitlementhandler.go +++ b/openmeter/entitlement/balanceworker/entitlementhandler.go @@ -53,7 +53,13 @@ func (w *Worker) processEntitlementEntity(ctx context.Context, entitlementEntity return nil, fmt.Errorf("entitlement entity is nil") } - if entitlementEntity.DeletedAt != nil { + if entitlementEntity.ActiveFrom != nil && entitlementEntity.ActiveFrom.After(calculatedAt) { + // Not yet active entitlement we don't need to process it yet + return nil, nil + } + + if entitlementEntity.DeletedAt != nil || + (entitlementEntity.ActiveTo != nil && entitlementEntity.ActiveTo.Before(calculatedAt)) { // entitlement got deleted while processing changes => let's create a delete event so that we are not working snapshot, err := w.createDeletedSnapshotEvent(ctx, diff --git a/test/entitlement/regression/scenario_test.go b/test/entitlement/regression/scenario_test.go index 68b23d2d7..dff5899f9 100644 --- a/test/entitlement/regression/scenario_test.go +++ b/test/entitlement/regression/scenario_test.go @@ -5,12 +5,15 @@ import ( "testing" "time" + "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/openmeterio/openmeter/openmeter/credit" "github.com/openmeterio/openmeter/openmeter/credit/balance" "github.com/openmeterio/openmeter/openmeter/credit/grant" "github.com/openmeterio/openmeter/openmeter/entitlement" + "github.com/openmeterio/openmeter/openmeter/entitlement/balanceworker" + entitlementdriver "github.com/openmeterio/openmeter/openmeter/entitlement/driver" meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" "github.com/openmeterio/openmeter/openmeter/testutils" @@ -574,3 +577,89 @@ func TestGrantingAfterOverage(t *testing.T) { assert.Equal(0.0, currentBalance.Overage) assert.Equal(2000.0, currentBalance.UsageInPeriod) } + +func TestBalanceWorkerActiveToFromEntitlementsMapping(t *testing.T) { + defer clock.ResetTime() + deps := setupDependencies(t) + defer deps.Close() + ctx := context.Background() + assert := assert.New(t) + + // Let's create a feature + clock.SetTime(testutils.GetRFC3339Time(t, "2024-07-07T14:44:19Z")) + feature, err := deps.FeatureConnector.CreateFeature(ctx, feature.CreateFeatureInputs{ + Name: "feature-1", + Key: "feature-1", + Namespace: "namespace-1", + MeterSlug: convert.ToPointer("meter-1"), + }) + assert.NoError(err) + assert.NotNil(feature) + + // Let's create a new entitlement for the feature + clock.SetTime(testutils.GetRFC3339Time(t, "2024-08-22T11:25:00Z")) + ent1, err := deps.EntitlementConnector.ScheduleEntitlement(ctx, entitlement.CreateEntitlementInputs{ + Namespace: "namespace-1", + FeatureID: &feature.ID, + FeatureKey: &feature.Key, + SubjectKey: "subject-1", + EntitlementType: entitlement.EntitlementTypeMetered, + UsagePeriod: &entitlement.UsagePeriod{ + Interval: recurrence.RecurrencePeriodMonth, + Anchor: testutils.GetRFC3339Time(t, "2024-08-22T11:25:00Z"), + }, + ActiveFrom: lo.ToPtr(testutils.GetRFC3339Time(t, "2024-08-22T11:25:00Z")), + ActiveTo: lo.ToPtr(testutils.GetRFC3339Time(t, "2024-08-22T11:30:00Z")), + }) + assert.NoError(err) + assert.NotNil(ent1) + + ent2, err := deps.EntitlementConnector.ScheduleEntitlement(ctx, entitlement.CreateEntitlementInputs{ + Namespace: "namespace-1", + FeatureID: &feature.ID, + FeatureKey: &feature.Key, + SubjectKey: "subject-1", + EntitlementType: entitlement.EntitlementTypeMetered, + UsagePeriod: &entitlement.UsagePeriod{ + Interval: recurrence.RecurrencePeriodMonth, + Anchor: testutils.GetRFC3339Time(t, "2024-08-22T11:25:00Z"), + }, + ActiveFrom: lo.ToPtr(testutils.GetRFC3339Time(t, "2024-08-22T11:30:00Z")), + ActiveTo: lo.ToPtr(testutils.GetRFC3339Time(t, "2024-08-22T11:35:00Z")), + }) + assert.NoError(err) + assert.NotNil(ent2) + + // Lets grant some credit for 500 + clock.SetTime(testutils.GetRFC3339Time(t, "2024-08-22T11:35:18Z")) + bwRepo, ok := deps.EntitlementRepo.(balanceworker.BalanceWorkerRepository) + assert.True(ok) + + affectedEntitlements, err := bwRepo.ListAffectedEntitlements(ctx, []balanceworker.IngestEventQueryFilter{ + { + Namespace: "namespace-1", + MeterSlugs: []string{"meter-1"}, + SubjectKey: "subject-1", + }, + }) + assert.NoError(err) + assert.Len(affectedEntitlements, 2) + + entitlements, err := deps.EntitlementConnector.ListEntitlements(ctx, entitlement.ListEntitlementsParams{ + Namespaces: []string{affectedEntitlements[0].Namespace}, + IDs: []string{affectedEntitlements[0].EntitlementID}, + IncludeDeleted: true, + }) + assert.NoError(err) + assert.Len(entitlements.Items, 1) + + ns := affectedEntitlements[0].Namespace + entID := affectedEntitlements[0].EntitlementID + + value, err := deps.EntitlementConnector.GetEntitlementValue(ctx, ns, "subject-1", entID, clock.Now()) + assert.NoError(err) + + mappedValues, err := entitlementdriver.MapEntitlementValueToAPI(value) + assert.NoError(err) + assert.False(mappedValues.HasAccess) +}