From 1f19a7421d6194aa25ed9fa2d3a2327ba6bbf449 Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Fri, 7 Jul 2023 14:46:38 -0700 Subject: [PATCH] feat: capability to increase max message size (#835) Signed-off-by: Derek Wang --- api/json-schema/schema.json | 2 +- api/openapi-spec/swagger.json | 2 +- .../namespaced-controller-wo-crds.yaml | 4 +++ .../numaflow-controller-config.yaml | 4 +++ config/install.yaml | 4 +++ config/namespace-install.yaml | 4 +++ docs/APIs.md | 11 +++++--- .../inter-step-buffer-service.md | 7 +++++- .../numaflow-controller-config.yaml | 4 +++ .../configuration/max-message-size.md | 25 +++++++++++++++++++ mkdocs.yml | 1 + pkg/apis/numaflow/v1alpha1/generated.proto | 7 +++--- .../v1alpha1/jetstream_buffer_service.go | 7 +++--- .../numaflow/v1alpha1/openapi_generated.go | 2 +- .../installer/assets/jetstream/nats.conf | 6 ++++- pkg/reconciler/isbsvc/installer/jetstream.go | 23 ++++++++++++----- pkg/sdkclient/sink/client/client.go | 7 ++++-- pkg/sdkclient/sink/client/options.go | 8 ++++++ pkg/sdkclient/udf/client/client.go | 2 +- pkg/udf/builtin/builtin.go | 2 +- 20 files changed, 108 insertions(+), 24 deletions(-) create mode 100644 docs/user-guide/reference/configuration/max-message-size.md diff --git a/api/json-schema/schema.json b/api/json-schema/schema.json index ddfdcfccef..2486113b9d 100644 --- a/api/json-schema/schema.json +++ b/api/json-schema/schema.json @@ -18298,7 +18298,7 @@ "type": "string" }, "settings": { - "description": "JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. See https://docs.nats.io/running-a-nats-service/configuration#jetstream. Only configure \"max_memory_store\" or \"max_file_store\", do not set \"store_dir\" as it has been hardcoded.", + "description": "Nats/JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. 
See https://docs.nats.io/running-a-nats-service/configuration#limits and https://docs.nats.io/running-a-nats-service/configuration#jetstream. For limits, only \"max_payload\" is supported for configuration, defaults to 1048576 (1MB), not recommended to use values over 8388608 (8MB) but max_payload can be set up to 67108864 (64MB). For jetstream, only \"max_memory_store\" and \"max_file_store\" are supported for configuration, do not set \"store_dir\" as it has been hardcoded.", "type": "string" }, "startArgs": { diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index bbcae2de65..d921cbe2f9 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -18294,7 +18294,7 @@ "type": "string" }, "settings": { - "description": "JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. See https://docs.nats.io/running-a-nats-service/configuration#jetstream. Only configure \"max_memory_store\" or \"max_file_store\", do not set \"store_dir\" as it has been hardcoded.", + "description": "Nats/JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. See https://docs.nats.io/running-a-nats-service/configuration#limits and https://docs.nats.io/running-a-nats-service/configuration#jetstream. For limits, only \"max_payload\" is supported for configuration, defaults to 1048576 (1MB), not recommended to use values over 8388608 (8MB) but max_payload can be set up to 67108864 (64MB). 
For jetstream, only \"max_memory_store\" and \"max_file_store\" are supported for configuration, do not set \"store_dir\" as it has been hardcoded.", "type": "string" }, "startArgs": { diff --git a/config/advanced-install/namespaced-controller-wo-crds.yaml b/config/advanced-install/namespaced-controller-wo-crds.yaml index a7a0ff0530..350a385ce1 100644 --- a/config/advanced-install/namespaced-controller-wo-crds.yaml +++ b/config/advanced-install/namespaced-controller-wo-crds.yaml @@ -150,6 +150,10 @@ data: jetstream: # Default JetStream settings, could be overridden by InterStepBufferService specs settings: | + # https://docs.nats.io/running-a-nats-service/configuration#limits + # Only support to configure "max_payload". + # Max payload size in bytes, defaults to 1 MB. It is not recommended to use values over 8MB but max_payload can be set up to 64MB. + max_payload: 1048576 # https://docs.nats.io/running-a-nats-service/configuration#jetstream # Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded. # e.g. 1G. -1 means no limit, up to 75% of available memory. This only take effect for streams created using memory storage. diff --git a/config/base/controller-manager/numaflow-controller-config.yaml b/config/base/controller-manager/numaflow-controller-config.yaml index c1661131b3..4182a1e228 100644 --- a/config/base/controller-manager/numaflow-controller-config.yaml +++ b/config/base/controller-manager/numaflow-controller-config.yaml @@ -37,6 +37,10 @@ data: jetstream: # Default JetStream settings, could be overridden by InterStepBufferService specs settings: | + # https://docs.nats.io/running-a-nats-service/configuration#limits + # Only support to configure "max_payload". + # Max payload size in bytes, defaults to 1 MB. It is not recommended to use values over 8MB but max_payload can be set up to 64MB. 
+ max_payload: 1048576 # https://docs.nats.io/running-a-nats-service/configuration#jetstream # Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded. # e.g. 1G. -1 means no limit, up to 75% of available memory. This only take effect for streams created using memory storage. diff --git a/config/install.yaml b/config/install.yaml index 7bcfc182bb..367d631ca2 100644 --- a/config/install.yaml +++ b/config/install.yaml @@ -12621,6 +12621,10 @@ data: jetstream: # Default JetStream settings, could be overridden by InterStepBufferService specs settings: | + # https://docs.nats.io/running-a-nats-service/configuration#limits + # Only support to configure "max_payload". + # Max payload size in bytes, defaults to 1 MB. It is not recommended to use values over 8MB but max_payload can be set up to 64MB. + max_payload: 1048576 # https://docs.nats.io/running-a-nats-service/configuration#jetstream # Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded. # e.g. 1G. -1 means no limit, up to 75% of available memory. This only take effect for streams created using memory storage. diff --git a/config/namespace-install.yaml b/config/namespace-install.yaml index 42f014375b..705b2054b7 100644 --- a/config/namespace-install.yaml +++ b/config/namespace-install.yaml @@ -12532,6 +12532,10 @@ data: jetstream: # Default JetStream settings, could be overridden by InterStepBufferService specs settings: | + # https://docs.nats.io/running-a-nats-service/configuration#limits + # Only support to configure "max_payload". + # Max payload size in bytes, defaults to 1 MB. It is not recommended to use values over 8MB but max_payload can be set up to 64MB. + max_payload: 1048576 # https://docs.nats.io/running-a-nats-service/configuration#jetstream # Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded. # e.g. 1G. -1 means no limit, up to 75% of available memory. 
This only take effect for streams created using memory storage. diff --git a/docs/APIs.md b/docs/APIs.md index 8d82f8daf3..34a5354671 100644 --- a/docs/APIs.md +++ b/docs/APIs.md @@ -2166,11 +2166,16 @@ type.) (Optional)

-JetStream configuration, if not specified, global settings in +Nats/JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. See +https://docs.nats.io/running-a-nats-service/configuration#limits +and https://docs.nats.io/running-a-nats-service/configuration#jetstream. -Only configure “max_memory_store” or “max_file_store”, do not set -“store_dir” as it has been hardcoded. +For limits, only “max_payload” is supported for configuration, defaults +to 1048576 (1MB), not recommended to use values over 8388608 (8MB) but +max_payload can be set up to 67108864 (64MB). For jetstream, only +“max_memory_store” and “max_file_store” are supported for configuration, +do not set “store_dir” as it has been hardcoded.

diff --git a/docs/core-concepts/inter-step-buffer-service.md b/docs/core-concepts/inter-step-buffer-service.md index a223e1dfb6..b698066fdb 100644 --- a/docs/core-concepts/inter-step-buffer-service.md +++ b/docs/core-concepts/inter-step-buffer-service.md @@ -81,8 +81,13 @@ There are 2 places to configure JetStream settings: A sample JetStream configuration: ``` +# https://docs.nats.io/running-a-nats-service/configuration#limits +# Only "max_payload" is supported for configuration in this section. +# Max payload size in bytes, defaults to 1 MB. It is not recommended to use values over 8MB but max_payload can be set up to 64MB. +max_payload: 1048576 +# # https://docs.nats.io/running-a-nats-service/configuration#jetstream -# Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded. +# Only configure "max_memory_store" or "max_file_store" in this section, do not set "store_dir" as it has been hardcoded. # # e.g. 1G. -1 means no limit, up to 75% of available memory. This only take effect for streams created using memory storage. max_memory_store: -1 diff --git a/docs/operations/numaflow-controller-config.yaml b/docs/operations/numaflow-controller-config.yaml index a970a95536..2b26280232 100644 --- a/docs/operations/numaflow-controller-config.yaml +++ b/docs/operations/numaflow-controller-config.yaml @@ -38,6 +38,10 @@ data: jetstream: # Default JetStream settings, could be overridden by InterStepBufferService specs settings: | + # https://docs.nats.io/running-a-nats-service/configuration#limits + # Only support to configure "max_payload". + # Max payload size in bytes, defaults to 1 MB. It is not recommended to use values over 8MB but max_payload can be set up to 64MB. + max_payload: 1048576 # https://docs.nats.io/running-a-nats-service/configuration#jetstream # Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded. # e.g. 1G. -1 means no limit, up to 75% of available memory. 
This only take effect for streams created using memory storage. diff --git a/docs/user-guide/reference/configuration/max-message-size.md b/docs/user-guide/reference/configuration/max-message-size.md new file mode 100644 index 0000000000..3ff86624a8 --- /dev/null +++ b/docs/user-guide/reference/configuration/max-message-size.md @@ -0,0 +1,25 @@ +# Maximum Message Size + +The default maximum message size is `1MB`. There's a way to increase this limit in case you want to, but please think it through before doing so. + +The max message size is determined by: + +- Max message size supported by gRPC (default value is `64MB` in Numaflow). +- Max message size supported by the Inter-Step Buffer implementation. + +If `JetStream` is used as the Inter-Step Buffer implementation, the default max message size for it is configured as `1MB`. You can change it by setting the `spec.jetstream.settings` in the `InterStepBufferService` specification. + +```yaml +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: InterStepBufferService +metadata: + name: default +spec: + jetstream: + settings: | + max_payload: 8388608 # 8MB +``` + +It's not recommended to use values over `8388608` (8MB) but `max_payload` can be set up to `67108864` (64MB). + +Please be aware that if you increase the max message size of the `InterStepBufferService`, you probably will also need to change some other limits. For example, if the size of each message is as large as 8MB, then 100 messages flowing in the pipeline will make each Inter-Step Buffer need at least 800MB of disk space to store the messages, and the memory consumption will also be high, which will probably cause the Inter-Step Buffer Service to crash. In that case, you might need to update the retention policy in the Inter-Step Buffer Service to make sure the messages are not stored for too long. Check out the [Inter-Step Buffer Service](../../../core-concepts/inter-step-buffer-service.md#buffer-configuration) for more details. 
diff --git a/mkdocs.yml b/mkdocs.yml index e1f39d949f..ffeba33daf 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -92,6 +92,7 @@ nav: - user-guide/reference/configuration/init-containers.md - user-guide/reference/configuration/sidecar-containers.md - user-guide/reference/configuration/pipeline-customization.md + - user-guide/reference/configuration/max-message-size.md - user-guide/reference/kustomize/kustomize.md - APIs.md - Operator Manual: diff --git a/pkg/apis/numaflow/v1alpha1/generated.proto b/pkg/apis/numaflow/v1alpha1/generated.proto index 674cf6a636..9722887033 100644 --- a/pkg/apis/numaflow/v1alpha1/generated.proto +++ b/pkg/apis/numaflow/v1alpha1/generated.proto @@ -570,9 +570,10 @@ message JetStreamBufferService { // +optional optional AbstractPodTemplate abstractPodTemplate = 7; - // JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. - // See https://docs.nats.io/running-a-nats-service/configuration#jetstream. - // Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded. + // Nats/JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. + // See https://docs.nats.io/running-a-nats-service/configuration#limits and https://docs.nats.io/running-a-nats-service/configuration#jetstream. + // For limits, only "max_payload" is supported for configuration, defaults to 1048576 (1MB), not recommended to use values over 8388608 (8MB) but max_payload can be set up to 67108864 (64MB). + // For jetstream, only "max_memory_store" and "max_file_store" are supported for configuration, do not set "store_dir" as it has been hardcoded. 
// +optional optional string settings = 8; diff --git a/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go b/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go index eb278a9068..2bcd235f9c 100644 --- a/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go +++ b/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go @@ -46,9 +46,10 @@ type JetStreamBufferService struct { Persistence *PersistenceStrategy `json:"persistence,omitempty" protobuf:"bytes,6,opt,name=persistence"` // +optional AbstractPodTemplate `json:",inline" protobuf:"bytes,7,opt,name=abstractPodTemplate"` - // JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. - // See https://docs.nats.io/running-a-nats-service/configuration#jetstream. - // Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded. + // Nats/JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. + // See https://docs.nats.io/running-a-nats-service/configuration#limits and https://docs.nats.io/running-a-nats-service/configuration#jetstream. + // For limits, only "max_payload" is supported for configuration, defaults to 1048576 (1MB), not recommended to use values over 8388608 (8MB) but max_payload can be set up to 67108864 (64MB). + // For jetstream, only "max_memory_store" and "max_file_store" are supported for configuration, do not set "store_dir" as it has been hardcoded. // +optional Settings *string `json:"settings,omitempty" protobuf:"bytes,8,opt,name=settings"` // Optional arguments to start nats-server. For example, "-D" to enable debugging output, "-DV" to enable debugging and tracing. 
diff --git a/pkg/apis/numaflow/v1alpha1/openapi_generated.go b/pkg/apis/numaflow/v1alpha1/openapi_generated.go index 4ff3e76625..8183c12453 100644 --- a/pkg/apis/numaflow/v1alpha1/openapi_generated.go +++ b/pkg/apis/numaflow/v1alpha1/openapi_generated.go @@ -2032,7 +2032,7 @@ func schema_pkg_apis_numaflow_v1alpha1_JetStreamBufferService(ref common.Referen }, "settings": { SchemaProps: spec.SchemaProps{ - Description: "JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. See https://docs.nats.io/running-a-nats-service/configuration#jetstream. Only configure \"max_memory_store\" or \"max_file_store\", do not set \"store_dir\" as it has been hardcoded.", + Description: "Nats/JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. See https://docs.nats.io/running-a-nats-service/configuration#limits and https://docs.nats.io/running-a-nats-service/configuration#jetstream. For limits, only \"max_payload\" is supported for configuration, defaults to 1048576 (1MB), not recommended to use values over 8388608 (8MB) but max_payload can be set up to 67108864 (64MB). 
For jetstream, only \"max_memory_store\" and \"max_file_store\" are supported for configuration, do not set \"store_dir\" as it has been hardcoded.", Type: []string{"string"}, Format: "", }, diff --git a/pkg/reconciler/isbsvc/installer/assets/jetstream/nats.conf b/pkg/reconciler/isbsvc/installer/assets/jetstream/nats.conf index b7df5259d9..c9bb41860a 100644 --- a/pkg/reconciler/isbsvc/installer/assets/jetstream/nats.conf +++ b/pkg/reconciler/isbsvc/installer/assets/jetstream/nats.conf @@ -1,5 +1,8 @@ port: {{.ClientPort}} pid_file: "/var/run/nats/nats.pid" + +max_payload: {{.MaxPayload}} + ############### # # # Monitoring # @@ -15,7 +18,8 @@ server_name: $POD_NAME jetstream { {{.EncryptionSettings}} store_dir: "/data/jetstream/store" - {{.Settings}} + max_memory_store: {{.MaxMemoryStore}} + max_file_store: {{.MaxFileStore}} } ################################### diff --git a/pkg/reconciler/isbsvc/installer/jetstream.go b/pkg/reconciler/isbsvc/installer/jetstream.go index cb6deb7f51..d132682306 100644 --- a/pkg/reconciler/isbsvc/installer/jetstream.go +++ b/pkg/reconciler/isbsvc/installer/jetstream.go @@ -397,10 +397,6 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error { for j := 0; j < replicas; j++ { routes = append(routes, fmt.Sprintf("nats://%s-%s.%s.%s.svc:%s", ssName, strconv.Itoa(j), svcName, r.isbs.Namespace, strconv.Itoa(int(clusterPort)))) } - settings := r.config.ISBSvc.JetStream.Settings - if x := r.isbs.Spec.JetStream.Settings; x != nil { - settings = *x - } encryptionSettings := "" if r.isbs.Spec.JetStream.Encryption { encryptionSettings = "key: $JS_KEY" @@ -415,6 +411,17 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error { } ` } + // Merge Nats settings + v := viper.New() + v.SetConfigType("yaml") + if err := v.ReadConfig(bytes.NewBufferString(r.config.ISBSvc.JetStream.Settings)); err != nil { + return fmt.Errorf("invalid jetstream settings in global configuration, %w", err) + } + if x := 
r.isbs.Spec.JetStream.Settings; x != nil { + if err := v.MergeConfig(bytes.NewBufferString(*x)); err != nil { + return fmt.Errorf("failed to merge customized jetstream settings, %w", err) + } + } confTpl := template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats.conf")) var confTplOutput bytes.Buffer if err := confTpl.Execute(&confTplOutput, struct { @@ -423,7 +430,9 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error { ClusterPort string ClientPort string Routes string - Settings string + MaxPayload string + MaxMemoryStore string + MaxFileStore string EncryptionSettings string TLSConfig string }{ @@ -432,7 +441,9 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error { ClusterPort: strconv.Itoa(int(clusterPort)), ClientPort: strconv.Itoa(int(clientPort)), Routes: strings.Join(routes, ","), - Settings: settings, + MaxPayload: v.GetString("max_payload"), + MaxFileStore: v.GetString("max_file_store"), + MaxMemoryStore: v.GetString("max_memory_store"), EncryptionSettings: encryptionSettings, TLSConfig: clusterTLSConfig, }); err != nil { diff --git a/pkg/sdkclient/sink/client/client.go b/pkg/sdkclient/sink/client/client.go index bbe637cfc6..eba2760d93 100644 --- a/pkg/sdkclient/sink/client/client.go +++ b/pkg/sdkclient/sink/client/client.go @@ -3,10 +3,11 @@ package client import ( "context" "fmt" - "github.com/numaproj/numaflow-go/pkg/sink" "log" "time" + "github.com/numaproj/numaflow-go/pkg/sink" + sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" "github.com/numaproj/numaflow-go/pkg/info" "google.golang.org/grpc" @@ -28,6 +29,7 @@ func New(inputOptions ...Option) (*client, error) { sockAddr: sink.Addr, serverInfoFilePath: info.ServerInfoFilePath, serverInfoReadinessTimeout: 120 * time.Second, // Default timeout is 120 seconds + maxMessageSize: 1024 * 1024 * 64, // 64 MB } for _, inputOption := range inputOptions { @@ -52,7 +54,8 @@ func New(inputOptions ...Option) (*client, error) { c := 
new(client) sockAddr := fmt.Sprintf("%s:%s", sink.Protocol, opts.sockAddr) - conn, err := grpc.Dial(sockAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial(sockAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(opts.maxMessageSize), grpc.MaxCallSendMsgSize(opts.maxMessageSize))) if err != nil { return nil, fmt.Errorf("failed to execute grpc.Dial(%q): %w", sockAddr, err) } diff --git a/pkg/sdkclient/sink/client/options.go b/pkg/sdkclient/sink/client/options.go index 6cd3cf2956..4ac819f8d4 100644 --- a/pkg/sdkclient/sink/client/options.go +++ b/pkg/sdkclient/sink/client/options.go @@ -6,6 +6,7 @@ type options struct { sockAddr string serverInfoFilePath string serverInfoReadinessTimeout time.Duration + maxMessageSize int } // Option is the interface to apply options. @@ -31,3 +32,10 @@ func WithServerInfoReadinessTimeout(t time.Duration) Option { o.serverInfoReadinessTimeout = t } } + +// WithMaxMessageSize sets the max message size to the given size. +func WithMaxMessageSize(size int) Option { + return func(o *options) { + o.maxMessageSize = size + } +} diff --git a/pkg/sdkclient/udf/client/client.go b/pkg/sdkclient/udf/client/client.go index 1833c203e5..d1e70faebc 100644 --- a/pkg/sdkclient/udf/client/client.go +++ b/pkg/sdkclient/udf/client/client.go @@ -31,7 +31,7 @@ type client struct { // New creates a new client object. 
func New(inputOptions ...Option) (*client, error) { var opts = &options{ - maxMessageSize: function.DefaultMaxMessageSize, + maxMessageSize: 1024 * 1024 * 64, // 64 MB serverInfoFilePath: info.ServerInfoFilePath, tcpSockAddr: function.TCP_ADDR, udsSockAddr: function.UDS_ADDR, diff --git a/pkg/udf/builtin/builtin.go b/pkg/udf/builtin/builtin.go index 6f0dfb1f21..f918c48217 100644 --- a/pkg/udf/builtin/builtin.go +++ b/pkg/udf/builtin/builtin.go @@ -43,7 +43,7 @@ func (b *Builtin) Start(ctx context.Context) error { if err != nil { return err } - server.New().RegisterMapper(executor).Start(ctx) + server.New().RegisterMapper(executor).Start(ctx, server.WithMaxMessageSize(1024*1024*64)) return nil }