Skip to content

Commit

Permalink
chore: some code refactor and docs change (#812)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Jun 28, 2023
1 parent 10cec59 commit de54931
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 37 deletions.
File renamed without changes.
6 changes: 3 additions & 3 deletions docs/user-guide/reference/kustomize/kustomize.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ apiVersion: kustomize.config.k8s.io/v1beta1
configurations:
- numaflow-transformer-config.yaml
# Or reference the remote configuration directly.
# - https://raw.githubusercontent.com/numaproj/numaflow/main/docs/user-guide/kustomize/numaflow-transformer-config.yaml
# - https://raw.githubusercontent.com/numaproj/numaflow/main/docs/user-guide/reference/kustomize/numaflow-transformer-config.yaml
```

Here is an [example](https://github.com/numaproj/numaflow/blob/main/docs/user-guide/kustomize/examples/transformer) to use transformers with a Pipeline.
Here is an [example](https://github.com/numaproj/numaflow/tree/main/docs/user-guide/reference/kustomize/examples/transformer) to use transformers with a Pipeline.

## Patch

Expand Down Expand Up @@ -84,4 +84,4 @@ patchesStrategicMerge:
rpu: 500
```

See the full example [here](https://github.com/numaproj/numaflow/blob/main/docs/user-guide/kustomize/examples/patch).
See the full example [here](https://github.com/numaproj/numaflow/tree/main/docs/user-guide/reference/kustomize/examples/patch).
3 changes: 2 additions & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ nav:
- Overview: "user-guide/sources/transformer/builtin-transformers/README.md"
- Filter: "user-guide/sources/transformer/builtin-transformers/filter.md"
- Event Time Extractor: "user-guide/sources/transformer/builtin-transformers/event-time-extractor.md"
- Event Time Extraction Filter: "user-guide/sources/transformer/builtin-transformers/time-extraction-filter.md"
- Sinks:
- Overview: "user-guide/sinks/overview.md"
- user-guide/sinks/kafka.md
Expand Down Expand Up @@ -91,14 +92,14 @@ nav:
- user-guide/reference/configuration/init-containers.md
- user-guide/reference/configuration/sidecar-containers.md
- user-guide/reference/configuration/pipeline-customization.md
- user-guide/reference/configuration/access-path.md
- user-guide/reference/kustomize/kustomize.md
- APIs.md
- Operator Manual:
- Releases ⧉: "operations/releases.md"
- operations/installation.md
- Configuration:
- Controller Configuration: "operations/controller-configmap.md"
- UI Server Access Path: "operations/ui-access-path.md"
- operations/metrics/metrics.md
- operations/grafana.md
- Contributor Guide:
Expand Down
2 changes: 1 addition & 1 deletion pkg/udferr/error.go → pkg/sdkclient/error/error.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package udferr
package error

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package udferr
package error

import (
"fmt"
Expand Down
6 changes: 3 additions & 3 deletions pkg/sdkclient/udf/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package client
import (
"context"
"fmt"
"github.com/numaproj/numaflow/pkg/udferr"
"io"
"log"
"strconv"
Expand All @@ -20,6 +19,7 @@ import (
functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
"github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/info"
sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error"
)

// client contains the grpc connection and the grpc client.
Expand Down Expand Up @@ -217,7 +217,7 @@ func toUDFErr(name string, err error) error {
}
statusCode, ok := status.FromError(err)
// default udfError
udfError := udferr.New(udferr.NonRetryable, statusCode.Message())
udfError := sdkerr.New(sdkerr.NonRetryable, statusCode.Message())
// check if it's a standard status code
if !ok {
// if not, the status code will be unknown which we consider as non retryable
Expand All @@ -230,7 +230,7 @@ func toUDFErr(name string, err error) error {
return nil
case codes.DeadlineExceeded, codes.Unavailable, codes.Unknown:
// update to retryable err
udfError = udferr.New(udferr.Retryable, statusCode.Message())
udfError = sdkerr.New(sdkerr.Retryable, statusCode.Message())
log.Printf("failed %s: %s", name, udfError.Error())
return udfError
default:
Expand Down
7 changes: 4 additions & 3 deletions pkg/sdkclient/udf/clienttest/clienttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package clienttest
import (
"context"
"fmt"
"github.com/numaproj/numaflow/pkg/udferr"

"io"
"log"

Expand All @@ -14,6 +14,7 @@ import (

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
"github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1/funcmock"
sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error"
)

// client contains the grpc client for testing.
Expand Down Expand Up @@ -151,7 +152,7 @@ func toUDFErr(name string, err error) error {
}
statusCode, ok := status.FromError(err)
// default udfError
udfError := udferr.New(udferr.NonRetryable, statusCode.Message())
udfError := sdkerr.New(sdkerr.NonRetryable, statusCode.Message())
// check if it's a standard status code
if !ok {
// if not, the status code will be unknown which we consider as non retryable
Expand All @@ -164,7 +165,7 @@ func toUDFErr(name string, err error) error {
return nil
case codes.DeadlineExceeded, codes.Unavailable, codes.Unknown:
// update to retryable err
udfError = udferr.New(udferr.Retryable, statusCode.Message())
udfError = sdkerr.New(sdkerr.Retryable, statusCode.Message())
log.Printf("failed %s: %s", name, udfError.Error())
return udfError
default:
Expand Down
24 changes: 12 additions & 12 deletions pkg/sources/transformer/grpc_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ package transformer
import (
"context"
"fmt"
"github.com/numaproj/numaflow/pkg/sdkclient/udf/client"
"time"

"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/apimachinery/pkg/util/wait"

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
"github.com/numaproj/numaflow/pkg/forward/applier"
"github.com/numaproj/numaflow/pkg/isb"
sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error"
"github.com/numaproj/numaflow/pkg/sdkclient/udf/client"
"github.com/numaproj/numaflow/pkg/udf/function"
"github.com/numaproj/numaflow/pkg/udferr"
"k8s.io/apimachinery/pkg/util/wait"

"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// gRPCBasedTransformer applies user defined transformer over gRPC (over Unix Domain Socket) client/server where server is the transformer.
Expand Down Expand Up @@ -93,9 +93,9 @@ func (u *gRPCBasedTransformer) ApplyMap(ctx context.Context, readMessage *isb.Re

datumList, err := u.client.MapTFn(ctx, d)
if err != nil {
udfErr, _ := udferr.FromError(err)
udfErr, _ := sdkerr.FromError(err)
switch udfErr.ErrorKind() {
case udferr.Retryable:
case sdkerr.Retryable:
var success bool
_ = wait.ExponentialBackoffWithContext(ctx, wait.Backoff{
// retry every "duration * factor + [0, jitter]" interval for 5 times
Expand All @@ -106,11 +106,11 @@ func (u *gRPCBasedTransformer) ApplyMap(ctx context.Context, readMessage *isb.Re
}, func() (done bool, err error) {
datumList, err = u.client.MapTFn(ctx, d)
if err != nil {
udfErr, _ = udferr.FromError(err)
udfErr, _ = sdkerr.FromError(err)
switch udfErr.ErrorKind() {
case udferr.Retryable:
case sdkerr.Retryable:
return false, nil
case udferr.NonRetryable:
case sdkerr.NonRetryable:
return true, nil
default:
return true, nil
Expand All @@ -129,7 +129,7 @@ func (u *gRPCBasedTransformer) ApplyMap(ctx context.Context, readMessage *isb.Re
},
}
}
case udferr.NonRetryable:
case sdkerr.NonRetryable:
return nil, function.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err),
Expand Down
26 changes: 13 additions & 13 deletions pkg/udf/function/uds_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,24 @@ package function
import (
"context"
"fmt"
clientsdk "github.com/numaproj/numaflow/pkg/sdkclient/udf/client"
"strconv"
"sync"
"time"

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow/pkg/udferr"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/apimachinery/pkg/util/wait"

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
map_applier "github.com/numaproj/numaflow/pkg/forward/applier"
"github.com/numaproj/numaflow/pkg/isb"
reduce_applier "github.com/numaproj/numaflow/pkg/reduce/applier"
"github.com/numaproj/numaflow/pkg/reduce/pbq/partition"
sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error"
clientsdk "github.com/numaproj/numaflow/pkg/sdkclient/udf/client"
)

// UDSgRPCBasedUDF applies user defined function over gRPC (over Unix Domain Socket) client/server where server is the UDF.
Expand Down Expand Up @@ -101,9 +101,9 @@ func (u *UDSgRPCBasedUDF) ApplyMap(ctx context.Context, readMessage *isb.ReadMes

datumList, err := u.client.MapFn(ctx, d)
if err != nil {
udfErr, _ := udferr.FromError(err)
udfErr, _ := sdkerr.FromError(err)
switch udfErr.ErrorKind() {
case udferr.Retryable:
case sdkerr.Retryable:
var success bool
_ = wait.ExponentialBackoffWithContext(ctx, wait.Backoff{
// retry every "duration * factor + [0, jitter]" interval for 5 times
Expand All @@ -114,11 +114,11 @@ func (u *UDSgRPCBasedUDF) ApplyMap(ctx context.Context, readMessage *isb.ReadMes
}, func() (done bool, err error) {
datumList, err = u.client.MapFn(ctx, d)
if err != nil {
udfErr, _ = udferr.FromError(err)
udfErr, _ = sdkerr.FromError(err)
switch udfErr.ErrorKind() {
case udferr.Retryable:
case sdkerr.Retryable:
return false, nil
case udferr.NonRetryable:
case sdkerr.NonRetryable:
return true, nil
default:
return true, nil
Expand All @@ -137,7 +137,7 @@ func (u *UDSgRPCBasedUDF) ApplyMap(ctx context.Context, readMessage *isb.ReadMes
},
}
}
case udferr.NonRetryable:
case sdkerr.NonRetryable:
return nil, ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err),
Expand Down Expand Up @@ -294,9 +294,9 @@ readLoop:
if err != nil {
// if any error happens in reduce
// will exit and restart the numa container
udfErr, _ := udferr.FromError(err)
udfErr, _ := sdkerr.FromError(err)
switch udfErr.ErrorKind() {
case udferr.Retryable:
case sdkerr.Retryable:
// TODO: currently we don't handle retryable errors for reduce
return nil, ApplyUDFErr{
UserUDFErr: false,
Expand All @@ -306,7 +306,7 @@ readLoop:
MainCarDown: false,
},
}
case udferr.NonRetryable:
case sdkerr.NonRetryable:
return nil, ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
Expand Down

0 comments on commit de54931

Please sign in to comment.