diff --git a/apis/fluentbit/v1alpha2/clusterinput_types.go b/apis/fluentbit/v1alpha2/clusterinput_types.go
index dbe3f0fe9..e75f58f57 100644
--- a/apis/fluentbit/v1alpha2/clusterinput_types.go
+++ b/apis/fluentbit/v1alpha2/clusterinput_types.go
@@ -59,6 +59,8 @@ type InputSpec struct {
OpenTelemetry *input.OpenTelemetry `json:"openTelemetry,omitempty"`
// HTTP defines forward input plugin configuration
HTTP *input.HTTP `json:"http,omitempty"`
+ // MQTT defines forward input plugin configuration
+ MQTT *input.MQTT `json:"mqtt,omitempty"`
}
// +kubebuilder:object:root=true
diff --git a/apis/fluentbit/v1alpha2/plugins/input/mqtt.go b/apis/fluentbit/v1alpha2/plugins/input/mqtt.go
new file mode 100644
index 000000000..9602e105e
--- /dev/null
+++ b/apis/fluentbit/v1alpha2/plugins/input/mqtt.go
@@ -0,0 +1,38 @@
+package input
+
+import (
+ "fmt"
+
+ "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
+ "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params"
+)
+
+// +kubebuilder:object:generate:=true
+
+// The MQTT input plugin, allows to retrieve messages/data from MQTT control packets over a TCP connection.
+// The incoming data to receive must be a JSON map.
+// **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/inputs/mqtt**
+type MQTT struct {
+ // Listener network interface, default: 0.0.0.0
+ Listen string `json:"listen,omitempty"`
+ // TCP port where listening for connections, default: 1883
+ // +kubebuilder:validation:Minimum:=1
+ // +kubebuilder:validation:Maximum:=65535
+ Port *int32 `json:"port,omitempty"`
+}
+
+func (_ *MQTT) Name() string {
+ return "mqtt"
+}
+
+// implement Section() method
+func (m *MQTT) Params(_ plugins.SecretLoader) (*params.KVs, error) {
+ kvs := params.NewKVs()
+ if m.Listen != "" {
+ kvs.Insert("Listen", m.Listen)
+ }
+ if m.Port != nil {
+ kvs.Insert("Port", fmt.Sprint(*m.Port))
+ }
+ return kvs, nil
+}
diff --git a/apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go b/apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go
index 4d676498c..2ccb6983d 100644
--- a/apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go
+++ b/apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go
@@ -120,6 +120,26 @@ func (in *HTTP) DeepCopy() *HTTP {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *MQTT) DeepCopyInto(out *MQTT) {
+ *out = *in
+ if in.Port != nil {
+ in, out := &in.Port, &out.Port
+ *out = new(int32)
+ **out = **in
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MQTT.
+func (in *MQTT) DeepCopy() *MQTT {
+ if in == nil {
+ return nil
+ }
+ out := new(MQTT)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NodeExporterMetrics) DeepCopyInto(out *NodeExporterMetrics) {
*out = *in
diff --git a/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go b/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go
index 0f4033b6a..ed49f9665 100644
--- a/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go
+++ b/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go
@@ -1088,6 +1088,11 @@ func (in *InputSpec) DeepCopyInto(out *InputSpec) {
*out = new(input.HTTP)
(*in).DeepCopyInto(*out)
}
+ if in.MQTT != nil {
+ in, out := &in.MQTT, &out.MQTT
+ *out = new(input.MQTT)
+ (*in).DeepCopyInto(*out)
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputSpec.
diff --git a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml
index ed29b3526..19d804354 100644
--- a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml
+++ b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml
@@ -236,6 +236,20 @@ spec:
- debug
- trace
type: string
+ mqtt:
+ description: MQTT defines forward input plugin configuration
+ properties:
+ listen:
+ description: 'Listener network interface, default: 0.0.0.0'
+ type: string
+ port:
+ description: 'TCP port where listening for connections, default:
+ 1883'
+ format: int32
+ maximum: 65535
+ minimum: 1
+ type: integer
+ type: object
nodeExporterMetrics:
description: NodeExporterMetrics defines Node Exporter Metrics Input
configuration.
diff --git a/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml b/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml
index ed29b3526..19d804354 100644
--- a/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml
+++ b/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml
@@ -236,6 +236,20 @@ spec:
- debug
- trace
type: string
+ mqtt:
+ description: MQTT defines forward input plugin configuration
+ properties:
+ listen:
+ description: 'Listener network interface, default: 0.0.0.0'
+ type: string
+ port:
+ description: 'TCP port where listening for connections, default:
+ 1883'
+ format: int32
+ maximum: 65535
+ minimum: 1
+ type: integer
+ type: object
nodeExporterMetrics:
description: NodeExporterMetrics defines Node Exporter Metrics Input
configuration.
diff --git a/docs/fluentbit.md b/docs/fluentbit.md
index 4c6ace504..1f46c4290 100644
--- a/docs/fluentbit.md
+++ b/docs/fluentbit.md
@@ -420,7 +420,8 @@ InputSpec defines the desired state of ClusterInput
| customPlugin | CustomPlugin defines Custom Input configuration. | *custom.CustomPlugin |
| forward | Forward defines forward input plugin configuration | *[input.Forward](plugins/input/forward.md) |
| openTelemetry | OpenTelemetry defines forward input plugin configuration | *[input.OpenTelemetry](plugins/input/opentelemetry.md) |
-| http | HTTP defines forward input plugin configuration | *[input.OpenTelemetry](plugins/input/opentelemetry.md) |
+| http | HTTP defines forward input plugin configuration | *[input.HTTP](plugins/input/http.md) |
+| mqtt | MQTT defines forward input plugin configuration | *[input.MQTT](plugins/input/mqtt.md) |
[Back to TOC](#table-of-contents)
# NamespacedFluentBitCfgSpec
diff --git a/docs/plugins/fluentbit/input/mqtt.md b/docs/plugins/fluentbit/input/mqtt.md
new file mode 100644
index 000000000..e8fffd36d
--- /dev/null
+++ b/docs/plugins/fluentbit/input/mqtt.md
@@ -0,0 +1,9 @@
+# MQTT
+
+The MQTT input plugin, allows to retrieve messages/data from MQTT control packets over a TCP connection.
The incoming data to receive must be a JSON map.
**For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/inputs/mqtt**
+
+
+| Field | Description | Scheme |
+| ----- | ----------- | ------ |
+| listen | Listener network interface, default: 0.0.0.0 | string |
+| port | TCP port where listening for connections, default: 1883 | *int32 |
diff --git a/manifests/setup/fluent-operator-crd.yaml b/manifests/setup/fluent-operator-crd.yaml
index bde8e6ab9..ee0548b9c 100644
--- a/manifests/setup/fluent-operator-crd.yaml
+++ b/manifests/setup/fluent-operator-crd.yaml
@@ -1985,6 +1985,20 @@ spec:
- debug
- trace
type: string
+ mqtt:
+ description: MQTT defines forward input plugin configuration
+ properties:
+ listen:
+ description: 'Listener network interface, default: 0.0.0.0'
+ type: string
+ port:
+ description: 'TCP port where listening for connections, default:
+ 1883'
+ format: int32
+ maximum: 65535
+ minimum: 1
+ type: integer
+ type: object
nodeExporterMetrics:
description: NodeExporterMetrics defines Node Exporter Metrics Input
configuration.
diff --git a/manifests/setup/setup.yaml b/manifests/setup/setup.yaml
index 849f9b498..544fdd978 100644
--- a/manifests/setup/setup.yaml
+++ b/manifests/setup/setup.yaml
@@ -1985,6 +1985,20 @@ spec:
- debug
- trace
type: string
+ mqtt:
+ description: MQTT defines forward input plugin configuration
+ properties:
+ listen:
+ description: 'Listener network interface, default: 0.0.0.0'
+ type: string
+ port:
+ description: 'TCP port where listening for connections, default:
+ 1883'
+ format: int32
+ maximum: 65535
+ minimum: 1
+ type: integer
+ type: object
nodeExporterMetrics:
description: NodeExporterMetrics defines Node Exporter Metrics Input
configuration.