Skip to content

Commit

Permalink
Merge pull request fluent#825 from verrazzano/karak/support_file_syst…
Browse files Browse the repository at this point in the history
…em_as_storage

Support file system as storage layer in service section of fluenbit
  • Loading branch information
benjaminhuo authored and karan56625 committed Jul 11, 2023
1 parent d8debc6 commit 2e7b8ca
Show file tree
Hide file tree
Showing 18 changed files with 473 additions and 0 deletions.
46 changes: 46 additions & 0 deletions apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,27 @@ type FluentBitConfigSpec struct {
Namespace *string `json:"namespace,omitempty"`
}

type Storage struct {
// Select an optional location in the file system to store streams and chunks of data/
Path string `json:"path,omitempty"`
// Configure the synchronization mode used to store the data into the file system
// +kubebuilder:validation:Enum:=normal;full
Sync string `json:"sync,omitempty"`
// Enable the data integrity check when writing and reading data from the filesystem
// +kubebuilder:validation:Enum:=on;off
Checksum string `json:"checksum,omitempty"`
// This option configure a hint of maximum value of memory to use when processing these records
BacklogMemLimit string `json:"backlogMemLimit,omitempty"`
// If the input plugin has enabled filesystem storage type, this property sets the maximum number of Chunks that can be up in memory
MaxChunksUp *int64 `json:"maxChunksUp,omitempty"`
// If http_server option has been enabled in the Service section, this option registers a new endpoint where internal metrics of the storage layer can be consumed
// +kubebuilder:validation:Enum:=on;off
Metrics string `json:"metrics,omitempty"`
// When enabled, irrecoverable chunks will be deleted during runtime, and any other irrecoverable chunk located in the configured storage path directory will be deleted when Fluent-Bit starts.
// +kubebuilder:validation:Enum:=on;off
DeleteIrrecoverableChunks string `json:"deleteIrrecoverableChunks,omitempty"`
}

type Service struct {
// If true go to background on start
Daemon *bool `json:"daemon,omitempty"`
Expand Down Expand Up @@ -80,6 +101,8 @@ type Service struct {
LogLevel string `json:"logLevel,omitempty"`
// Optional 'parsers' config file (can be multiple)
ParsersFile string `json:"parsersFile,omitempty"`
// Configure a global environment for the storage layer in Service. It is recommended to configure the volume and volumeMount separately for this storage. The hostPath type should be used for that Volume in Fluentbit daemon set.
Storage *Storage `json:"storage,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down Expand Up @@ -149,6 +172,29 @@ func (s *Service) Params() *params.KVs {
if s.ParsersFile != "" {
m.Insert("Parsers_File", s.ParsersFile)
}
if s.Storage != nil {
if s.Storage.Path != "" {
m.Insert("storage.path", s.Storage.Path)
}
if s.Storage.Sync != "" {
m.Insert("storage.sync", s.Storage.Sync)
}
if s.Storage.Checksum != "" {
m.Insert("storage.checksum", s.Storage.Checksum)
}
if s.Storage.BacklogMemLimit != "" {
m.Insert("storage.backlog.mem_limit", s.Storage.BacklogMemLimit)
}
if s.Storage.Metrics != "" {
m.Insert("storage.metrics", s.Storage.Metrics)
}
if s.Storage.MaxChunksUp != nil {
m.Insert("storage.max_chunks_up", fmt.Sprint(*s.Storage.MaxChunksUp))
}
if s.Storage.DeleteIrrecoverableChunks != "" {
m.Insert("storage.delete_irrecoverable_chunks", s.Storage.DeleteIrrecoverableChunks)
}
}
return m
}

Expand Down
12 changes: 12 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/input/systemd_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ type Systemd struct {
// Remove the leading underscore of the Journald field (key). For example the Journald field _PID becomes the key PID.
// +kubebuilder:validation:Enum:=on;off
StripUnderscores string `json:"stripUnderscores,omitempty"`
// Specify the buffering mechanism to use. It can be memory or filesystem
// +kubebuilder:validation:Enum:=filesystem;memory
StorageType string `json:"storageType,omitempty"`
// Specifies if the input plugin should be paused (stop ingesting new data) when the storage.max_chunks_up value is reached.
// +kubebuilder:validation:Enum:=on;off
PauseOnChunksOverlimit string `json:"pauseOnChunksOverlimit,omitempty"`
}

func (_ *Systemd) Name() string {
Expand Down Expand Up @@ -85,6 +91,12 @@ func (s *Systemd) Params(_ plugins.SecretLoader) (*params.KVs, error) {
if s.StripUnderscores != "" {
kvs.Insert("Strip_Underscores", s.StripUnderscores)
}
if s.StorageType != "" {
kvs.Insert("storage.type", s.StorageType)
}
if s.PauseOnChunksOverlimit != "" {
kvs.Insert("storage.pause_on_chunks_overlimit", s.PauseOnChunksOverlimit)
}

return kvs, nil
}
12 changes: 12 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/input/tail_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ type Tail struct {
// This will help to reassembly multiline messages originally split by Docker or CRI
//Specify one or Multiline Parser definition to apply to the content.
MultilineParser string `json:"multilineParser,omitempty"`
// Specify the buffering mechanism to use. It can be memory or filesystem
// +kubebuilder:validation:Enum:=filesystem;memory
StorageType string `json:"storageType,omitempty"`
// Specifies if the input plugin should be paused (stop ingesting new data) when the storage.max_chunks_up value is reached.
// +kubebuilder:validation:Enum:=on;off
PauseOnChunksOverlimit string `json:"pauseOnChunksOverlimit,omitempty"`
}

func (_ *Tail) Name() string {
Expand Down Expand Up @@ -179,5 +185,11 @@ func (t *Tail) Params(_ plugins.SecretLoader) (*params.KVs, error) {
if t.MultilineParser != "" {
kvs.Insert("multiline.parser", t.MultilineParser)
}
if t.StorageType != "" {
kvs.Insert("storage.type", t.StorageType)
}
if t.PauseOnChunksOverlimit != "" {
kvs.Insert("storage.pause_on_chunks_overlimit", t.PauseOnChunksOverlimit)
}
return kvs, nil
}
5 changes: 5 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/output/open_search_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type OpenSearch struct {
// Enables dedicated thread(s) for this output. Default value is set since version 1.8.13. For previous versions is 0.
Workers *int32 `json:"Workers,omitempty"`
*plugins.TLS `json:"tls,omitempty"`
// Limit the maximum number of Chunks in the filesystem for the current output logical destination.
TotalLimitSize string `json:"totalLimitSize,omitempty"`
}

// Name implement Section() method
Expand Down Expand Up @@ -240,5 +242,8 @@ func (o *OpenSearch) Params(sl plugins.SecretLoader) (*params.KVs, error) {
}
kvs.Merge(tls)
}
if o.TotalLimitSize != "" {
kvs.Insert("storage.total_limit_size", o.TotalLimitSize)
}
return kvs, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,58 @@ spec:
parsersFile:
description: Optional 'parsers' config file (can be multiple)
type: string
storage:
description: Configure a global environment for the storage layer
in Service. It is recommended to configure the volume and volumeMount
separately for this storage. The hostPath type should be used
for that Volume in Fluentbit daemon set.
properties:
backlogMemLimit:
description: This option configure a hint of maximum value
of memory to use when processing these records
type: string
checksum:
description: Enable the data integrity check when writing
and reading data from the filesystem
enum:
- "on"
- "off"
type: string
deleteIrrecoverableChunks:
description: When enabled, irrecoverable chunks will be deleted
during runtime, and any other irrecoverable chunk located
in the configured storage path directory will be deleted
when Fluent-Bit starts.
enum:
- "on"
- "off"
type: string
maxChunksUp:
description: If the input plugin has enabled filesystem storage
type, this property sets the maximum number of Chunks that
can be up in memory
format: int64
type: integer
metrics:
description: If http_server option has been enabled in the
Service section, this option registers a new endpoint where
internal metrics of the storage layer can be consumed
enum:
- "on"
- "off"
type: string
path:
description: Select an optional location in the file system
to store streams and chunks of data/
type: string
sync:
description: Configure the synchronization mode used to store
the data into the file system
enum:
- normal
- full
type: string
type: object
type: object
type: object
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,28 @@ spec:
not set, the plugin will use default paths to read local-only
logs.
type: string
pauseOnChunksOverlimit:
description: Specifies if the input plugin should be paused (stop
ingesting new data) when the storage.max_chunks_up value is
reached.
enum:
- "on"
- "off"
type: string
readFromTail:
description: Start reading new entries. Skip entries already stored
in Journald.
enum:
- "on"
- "off"
type: string
storageType:
description: Specify the buffering mechanism to use. It can be
memory or filesystem
enum:
- filesystem
- memory
type: string
stripUnderscores:
description: Remove the leading underscore of the Journald field
(key). For example the Journald field _PID becomes the key PID.
Expand Down Expand Up @@ -319,6 +334,14 @@ spec:
file as part of the record. The value assigned becomes the key
in the map.
type: string
pauseOnChunksOverlimit:
description: Specifies if the input plugin should be paused (stop
ingesting new data) when the storage.max_chunks_up value is
reached.
enum:
- "on"
- "off"
type: string
readFromHead:
description: For new discovered files on start (without a database
offset/position), read the content from the head of the file,
Expand All @@ -341,6 +364,13 @@ spec:
behavior and instruct Fluent Bit to skip long lines and continue
processing other lines that fits into the buffer size.
type: boolean
storageType:
description: Specify the buffering mechanism to use. It can be
memory or filesystem
enum:
- filesystem
- memory
type: string
tag:
description: Set a tag (with regex-extract fields) that will be
placed on lines read. E.g. kube.<namespace_name>.<pod_name>.<container_name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,10 @@ spec:
description: Hostname to be used for TLS SNI extension
type: string
type: object
totalLimitSize:
description: Limit the maximum number of Chunks in the filesystem
for the current output logical destination.
type: string
traceError:
description: When enabled print the elasticsearch API calls to
stdout when elasticsearch returns an error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,10 @@ spec:
description: Hostname to be used for TLS SNI extension
type: string
type: object
totalLimitSize:
description: Limit the maximum number of Chunks in the filesystem
for the current output logical destination.
type: string
traceError:
description: When enabled print the elasticsearch API calls to
stdout when elasticsearch returns an error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ spec:
{{- toYaml .Values.fluentbit.input.systemd.systemdFilter.filters | nindent 6 }}
{{- end }}
{{- end }}
storageType: {{ .Values.fluentbit.input.systemd.storageType }}
{{- if eq .Values.fluentbit.input.systemd.storageType "filesystem" }}
pauseOnChunksOverlimit: {{ .Values.fluentbit.input.systemd.pauseOnChunksOverlimit | quote }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ spec:
skipLongLines: {{ .Values.fluentbit.input.tail.skipLongLines }}
db: /fluent-bit/tail/pos.db
dbSync: Normal
storageType: {{ .Values.fluentbit.input.tail.storageType }}
{{- if eq .Values.fluentbit.input.tail.storageType "filesystem" }}
pauseOnChunksOverlimit: {{ .Values.fluentbit.input.tail.pauseOnChunksOverlimit | quote }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ metadata:
spec:
service:
parsersFile: parsers.conf
{{- if .Values.fluentbit.service.storage }}
storage:
{{ toYaml .Values.fluentbit.service.storage | indent 6 }}
{{- end }}
inputSelector:
matchLabels:
fluentbit.fluent.io/enabled: "true"
Expand Down
26 changes: 26 additions & 0 deletions charts/fluent-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ fluentbit:
# - name: hostSys
# hostPath:
# path: /sys/
# Uncomment the code if you intend to create the volume for buffer storage in case the storage type "filesystem" is being used in the configuration of the fluentbit service.
# - name: hostBuffer
# hostPath:
# path: /tmp/fluent-bit-buffer
# additionalVolumesMounts:
# - mountPath: /host/sys
# mountPropagation: HostToContainer
Expand All @@ -113,6 +117,11 @@ fluentbit:
# mountPropagation: HostToContainer
# name: hostProc
# readOnly: true
# Uncomment the code if you intend to mount the volume for buffer storage in case the storage type "filesystem" is being used in the configuration of the fluentbit service.
# - mountPath: /host/fluent-bit-buffer
# mountPropagation: HostToContainer
# name: hostBuffer


namespaceFluentBitCfgSelector: {}

Expand All @@ -130,6 +139,9 @@ fluentbit:
path: "/var/log/containers/*.log"
skipLongLines: true
readFromHead: false
# Use storageType as "filesystem" if you want to use filesystem as the buffering mechanism for tail input.
storageType: memory
pauseOnChunksOverlimit: "off"
systemd:
enable: true
systemdFilter:
Expand All @@ -138,6 +150,9 @@ fluentbit:
path: "/var/log/journal"
includeKubelet: true
stripUnderscores: "off"
# Use storageType as "filesystem" if you want to use filesystem as the buffering mechanism for systemd input.
storageType: memory
pauseOnChunksOverlimit: "off"
nodeExporterMetrics: {}
# uncomment below nodeExporterMetrics section if you want to collect node exporter metrics
# nodeExporterMetrics:
Expand Down Expand Up @@ -183,6 +198,17 @@ fluentbit:
# You can configure the opensearch-related configuration here
stdout:
enable: false
service:
storage: {}
# Remove the above storage section and uncomment below section if you want to configure file-system as storage for buffer
# storage:
# path: "/host/fluent-bit-buffer/"
# backlogMemLimit: "50MB"
# checksum: "off"
# deleteIrrecoverableChunks: "on"
# maxChunksUp: 128
# metrics: "on"
# sync: normal

# Configure the default filters in FluentBit.
# The `filter` will filter and parse the collected log information and output the logs into a uniform format. You can choose whether to turn this on or not.
Expand Down
Loading

0 comments on commit 2e7b8ca

Please sign in to comment.