Skip to content

Commit

Permalink
feat: capability to increase max message size (#835)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Jul 7, 2023
1 parent 5639e5c commit 1f19a74
Show file tree
Hide file tree
Showing 20 changed files with 108 additions and 24 deletions.
2 changes: 1 addition & 1 deletion api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -18298,7 +18298,7 @@
"type": "string"
},
"settings": {
"description": "JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. See https://docs.nats.io/running-a-nats-service/configuration#jetstream. Only configure \"max_memory_store\" or \"max_file_store\", do not set \"store_dir\" as it has been hardcoded.",
"description": "Nats/JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. See https://docs.nats.io/running-a-nats-service/configuration#limits and https://docs.nats.io/running-a-nats-service/configuration#jetstream. For limits, only \"max_payload\" is supported for configuration, defaults to 1048576 (1MB), not recommended to use values over 8388608 (8MB) but max_payload can be set up to 67108864 (64MB). For jetstream, only \"max_memory_store\" and \"max_file_store\" are supported for configuration, do not set \"store_dir\" as it has been hardcoded.",
"type": "string"
},
"startArgs": {
Expand Down
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -18294,7 +18294,7 @@
"type": "string"
},
"settings": {
"description": "JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. See https://docs.nats.io/running-a-nats-service/configuration#jetstream. Only configure \"max_memory_store\" or \"max_file_store\", do not set \"store_dir\" as it has been hardcoded.",
"description": "Nats/JetStream configuration, if not specified, global settings in numaflow-controller-config will be used. See https://docs.nats.io/running-a-nats-service/configuration#limits and https://docs.nats.io/running-a-nats-service/configuration#jetstream. For limits, only \"max_payload\" is supported for configuration, defaults to 1048576 (1MB), not recommended to use values over 8388608 (8MB) but max_payload can be set up to 67108864 (64MB). For jetstream, only \"max_memory_store\" and \"max_file_store\" are supported for configuration, do not set \"store_dir\" as it has been hardcoded.",
"type": "string"
},
"startArgs": {
Expand Down
4 changes: 4 additions & 0 deletions config/advanced-install/namespaced-controller-wo-crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ data:
jetstream:
# Default JetStream settings, could be overridden by InterStepBufferService specs
settings: |
# https://docs.nats.io/running-a-nats-service/configuration#limits
# Only support to configure "max_payload".
# Max payload size in bytes, defaults to 1 MB. It is not recommended to use values over 8MB but max_payload can be set up to 64MB.
max_payload: 1048576
# https://docs.nats.io/running-a-nats-service/configuration#jetstream
# Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded.
# e.g. 1G. -1 means no limit, up to 75% of available memory. This only take effect for streams created using memory storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ data:
jetstream:
# Default JetStream settings, could be overridden by InterStepBufferService specs
settings: |
# https://docs.nats.io/running-a-nats-service/configuration#limits
# Only support to configure "max_payload".
# Max payload size in bytes, defaults to 1 MB. It is not recommended to use values over 8MB but max_payload can be set up to 64MB.
max_payload: 1048576
# https://docs.nats.io/running-a-nats-service/configuration#jetstream
# Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded.
# e.g. 1G. -1 means no limit, up to 75% of available memory. This only take effect for streams created using memory storage.
Expand Down
4 changes: 4 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12621,6 +12621,10 @@ data:
jetstream:
# Default JetStream settings, could be overridden by InterStepBufferService specs
settings: |
# https://docs.nats.io/running-a-nats-service/configuration#limits
# Only support to configure "max_payload".
# Max payload size in bytes, defaults to 1 MB. It is not recommended to use values over 8MB but max_payload can be set up to 64MB.
max_payload: 1048576
# https://docs.nats.io/running-a-nats-service/configuration#jetstream
# Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded.
# e.g. 1G. -1 means no limit, up to 75% of available memory. This only take effect for streams created using memory storage.
Expand Down
4 changes: 4 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12532,6 +12532,10 @@ data:
jetstream:
# Default JetStream settings, could be overridden by InterStepBufferService specs
settings: |
# https://docs.nats.io/running-a-nats-service/configuration#limits
# Only support to configure "max_payload".
# Max payload size in bytes, defaults to 1 MB. It is not recommended to use values over 8MB but max_payload can be set up to 64MB.
max_payload: 1048576
# https://docs.nats.io/running-a-nats-service/configuration#jetstream
# Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded.
# e.g. 1G. -1 means no limit, up to 75% of available memory. This only take effect for streams created using memory storage.
Expand Down
11 changes: 8 additions & 3 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2166,11 +2166,16 @@ type.)
<td>
<em>(Optional)</em>
<p>
JetStream configuration, if not specified, global settings in
Nats/JetStream configuration, if not specified, global settings in
numaflow-controller-config will be used. See
<a href="https://docs.nats.io/running-a-nats-service/configuration#limits">https://docs.nats.io/running-a-nats-service/configuration#limits</a>
and
<a href="https://docs.nats.io/running-a-nats-service/configuration#jetstream">https://docs.nats.io/running-a-nats-service/configuration#jetstream</a>.
Only configure “max_memory_store” or “max_file_store”, do not set
“store_dir” as it has been hardcoded.
For limits, only “max_payload” is supported for configuration, defaults
to 1048576 (1MB), not recommended to use values over 8388608 (8MB) but
max_payload can be set up to 67108864 (64MB). For jetstream, only
“max_memory_store” and “max_file_store” are supported for configuration,
do not set “store_dir” as it has been hardcoded.
</p>
</td>
</tr>
Expand Down
7 changes: 6 additions & 1 deletion docs/core-concepts/inter-step-buffer-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,13 @@ There are 2 places to configure JetStream settings:
A sample JetStream configuration:

```
# https://docs.nats.io/running-a-nats-service/configuration#limits
# Only "max_payload" is supported for configuration in this section.
# Max payload size in bytes, defaults to 1 MB. It is not recommended to use values over 8MB but max_payload can be set up to 64MB.
max_payload: 1048576
#
# https://docs.nats.io/running-a-nats-service/configuration#jetstream
# Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded.
# Only configure "max_memory_store" or "max_file_store" in this section, do not set "store_dir" as it has been hardcoded.
#
# e.g. 1G. -1 means no limit, up to 75% of available memory. This only take effect for streams created using memory storage.
max_memory_store: -1
Expand Down
4 changes: 4 additions & 0 deletions docs/operations/numaflow-controller-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ data:
jetstream:
# Default JetStream settings, could be overridden by InterStepBufferService specs
settings: |
# https://docs.nats.io/running-a-nats-service/configuration#limits
# Only support to configure "max_payload".
# Max payload size, defaults to 1 MB. It is not recommended to use values over 8MB but max_payload can be set up to 64MB.
max_payload: 1048576
# https://docs.nats.io/running-a-nats-service/configuration#jetstream
# Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded.
# e.g. 1G. -1 means no limit, up to 75% of available memory. This only take effect for streams created using memory storage.
Expand Down
25 changes: 25 additions & 0 deletions docs/user-guide/reference/configuration/max-message-size.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Maximum Message Size

The default maximum message size is `1MB`. There's a way to increase this limit in case you want to, but please think it through before doing so.

The max message size is determined by:

- Max messages size supported by gRPC (default value is `64MB` in Numaflow).
- Max messages size supported by the Inter-Step Buffer implementation.

If `JetStream` is used as the Inter-Step Buffer implementation, the default max message size for it is configured as `1MB`. You can change it by setting the `spec.jetstream.settings` in the `InterStepBufferService` specification.

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: InterStepBufferService
metadata:
name: default
spec:
jetstream:
settings: |
max_payload: 8388608 # 8MB
```
It's not recommended to use values over `8388608` (8MB) but `max_payload` can be set up to `67108864` (64MB).

Please be aware that if you increase the max message size of the `InterStepBufferService`, you probably will also need to change some other limits. For example, if the size of each messages is as large as 8MB, then 100 messages flowing in the pipeline will make each of the Inter-Step Buffer need at least 800MB of disk space to store the messages, and the memory consumption will also be high, that will probably cause the Inter-Step Buffer Service to crash. In that case, you might need to update the retention policy in the Inter-Step Buffer Service to make sure the messages are not stored for too long. Check out the [Inter-Step Buffer Service](../../../core-concepts/inter-step-buffer-service.md#buffer-configuration) for more details.
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ nav:
- user-guide/reference/configuration/init-containers.md
- user-guide/reference/configuration/sidecar-containers.md
- user-guide/reference/configuration/pipeline-customization.md
- user-guide/reference/configuration/max-message-size.md
- user-guide/reference/kustomize/kustomize.md
- APIs.md
- Operator Manual:
Expand Down
7 changes: 4 additions & 3 deletions pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ type JetStreamBufferService struct {
Persistence *PersistenceStrategy `json:"persistence,omitempty" protobuf:"bytes,6,opt,name=persistence"`
// +optional
AbstractPodTemplate `json:",inline" protobuf:"bytes,7,opt,name=abstractPodTemplate"`
// JetStream configuration, if not specified, global settings in numaflow-controller-config will be used.
// See https://docs.nats.io/running-a-nats-service/configuration#jetstream.
// Only configure "max_memory_store" or "max_file_store", do not set "store_dir" as it has been hardcoded.
// Nats/JetStream configuration, if not specified, global settings in numaflow-controller-config will be used.
// See https://docs.nats.io/running-a-nats-service/configuration#limits and https://docs.nats.io/running-a-nats-service/configuration#jetstream.
// For limits, only "max_payload" is supported for configuration, defaults to 1048576 (1MB), not recommended to use values over 8388608 (8MB) but max_payload can be set up to 67108864 (64MB).
// For jetstream, only "max_memory_store" and "max_file_store" are supported for configuration, do not set "store_dir" as it has been hardcoded.
// +optional
Settings *string `json:"settings,omitempty" protobuf:"bytes,8,opt,name=settings"`
// Optional arguments to start nats-server. For example, "-D" to enable debugging output, "-DV" to enable debugging and tracing.
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion pkg/reconciler/isbsvc/installer/assets/jetstream/nats.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
port: {{.ClientPort}}
pid_file: "/var/run/nats/nats.pid"

max_payload: {{.MaxPayload}}

###############
# #
# Monitoring #
Expand All @@ -15,7 +18,8 @@ server_name: $POD_NAME
jetstream {
{{.EncryptionSettings}}
store_dir: "/data/jetstream/store"
{{.Settings}}
max_memory_store: {{.MaxMemoryStore}}
max_file_store: {{.MaxFileStore}}
}

###################################
Expand Down
23 changes: 17 additions & 6 deletions pkg/reconciler/isbsvc/installer/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,6 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
for j := 0; j < replicas; j++ {
routes = append(routes, fmt.Sprintf("nats://%s-%s.%s.%s.svc:%s", ssName, strconv.Itoa(j), svcName, r.isbs.Namespace, strconv.Itoa(int(clusterPort))))
}
settings := r.config.ISBSvc.JetStream.Settings
if x := r.isbs.Spec.JetStream.Settings; x != nil {
settings = *x
}
encryptionSettings := ""
if r.isbs.Spec.JetStream.Encryption {
encryptionSettings = "key: $JS_KEY"
Expand All @@ -415,6 +411,17 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
}
`
}
// Merge Nats settings
v := viper.New()
v.SetConfigType("yaml")
if err := v.ReadConfig(bytes.NewBufferString(r.config.ISBSvc.JetStream.Settings)); err != nil {
return fmt.Errorf("invalid jetstream settings in global configuration, %w", err)
}
if x := r.isbs.Spec.JetStream.Settings; x != nil {
if err := v.MergeConfig(bytes.NewBufferString(*x)); err != nil {
return fmt.Errorf("failed to merge customized jetstream settings, %w", err)
}
}
confTpl := template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats.conf"))
var confTplOutput bytes.Buffer
if err := confTpl.Execute(&confTplOutput, struct {
Expand All @@ -423,7 +430,9 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
ClusterPort string
ClientPort string
Routes string
Settings string
MaxPayload string
MaxMemoryStore string
MaxFileStore string
EncryptionSettings string
TLSConfig string
}{
Expand All @@ -432,7 +441,9 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
ClusterPort: strconv.Itoa(int(clusterPort)),
ClientPort: strconv.Itoa(int(clientPort)),
Routes: strings.Join(routes, ","),
Settings: settings,
MaxPayload: v.GetString("max_payload"),
MaxFileStore: v.GetString("max_file_store"),
MaxMemoryStore: v.GetString("max_memory_store"),
EncryptionSettings: encryptionSettings,
TLSConfig: clusterTLSConfig,
}); err != nil {
Expand Down
7 changes: 5 additions & 2 deletions pkg/sdkclient/sink/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package client
import (
"context"
"fmt"
"github.com/numaproj/numaflow-go/pkg/sink"
"log"
"time"

"github.com/numaproj/numaflow-go/pkg/sink"

sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"google.golang.org/grpc"
Expand All @@ -28,6 +29,7 @@ func New(inputOptions ...Option) (*client, error) {
sockAddr: sink.Addr,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoReadinessTimeout: 120 * time.Second, // Default timeout is 120 seconds
maxMessageSize: 1024 * 1024 * 64, // 64 MB
}

for _, inputOption := range inputOptions {
Expand All @@ -52,7 +54,8 @@ func New(inputOptions ...Option) (*client, error) {

c := new(client)
sockAddr := fmt.Sprintf("%s:%s", sink.Protocol, opts.sockAddr)
conn, err := grpc.Dial(sockAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(sockAddr, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(opts.maxMessageSize), grpc.MaxCallSendMsgSize(opts.maxMessageSize)))
if err != nil {
return nil, fmt.Errorf("failed to execute grpc.Dial(%q): %w", sockAddr, err)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/sdkclient/sink/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type options struct {
sockAddr string
serverInfoFilePath string
serverInfoReadinessTimeout time.Duration
maxMessageSize int
}

// Option is the interface to apply options.
Expand All @@ -31,3 +32,10 @@ func WithServerInfoReadinessTimeout(t time.Duration) Option {
o.serverInfoReadinessTimeout = t
}
}

// WithMaxMessageSize sets the max message size to the given size.
func WithMaxMessageSize(size int) Option {
return func(o *options) {
o.maxMessageSize = size
}
}
2 changes: 1 addition & 1 deletion pkg/sdkclient/udf/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type client struct {
// New creates a new client object.
func New(inputOptions ...Option) (*client, error) {
var opts = &options{
maxMessageSize: function.DefaultMaxMessageSize,
maxMessageSize: 1024 * 1024 * 64, // 64 MB
serverInfoFilePath: info.ServerInfoFilePath,
tcpSockAddr: function.TCP_ADDR,
udsSockAddr: function.UDS_ADDR,
Expand Down
2 changes: 1 addition & 1 deletion pkg/udf/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (b *Builtin) Start(ctx context.Context) error {
if err != nil {
return err
}
server.New().RegisterMapper(executor).Start(ctx)
server.New().RegisterMapper(executor).Start(ctx, server.WithMaxMessageSize(1024*1024*64))
return nil
}

Expand Down

0 comments on commit 1f19a74

Please sign in to comment.