diff --git a/defaults/main.yml b/defaults/main.yml index 09b564b..0865433 100644 --- a/defaults/main.yml +++ b/defaults/main.yml @@ -76,12 +76,6 @@ kafka_transaction_state_log_min_isr: 1 ############################# Log Flush Policy -# The number of messages to accept before forcing a flush of data to disk -kafka_log_flush_interval_messages: 10000 - -# The maximum amount of time a message can sit in a log before we force a flush -kafka_log_flush_interval_ms: 1000 - ############################# Log Retention Policy # The minimum age of a log file to be eligible for deletion due to age @@ -94,6 +88,9 @@ kafka_log_retention_bytes: 1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. kafka_log_segment_bytes: 1073741824 +# The largest size of the message that can be received by the broker from a producer. +kafka_message_max_bytes: 10000000 + # The interval at which log segments are checked to see if they can be deleted according # to the retention policies kafka_log_retention_check_interval_ms: 300000 diff --git a/tasks/sasl-auth.yml b/tasks/sasl-auth.yml index 84ee029..9d84e9b 100644 --- a/tasks/sasl-auth.yml +++ b/tasks/sasl-auth.yml @@ -20,7 +20,9 @@ - name: Set Kafka inter-broker communication protocol set_fact: kafka_security_inter_broker_protocol: "SASL_PLAINTEXT" - when: kafka_security_inter_broker_protocol is not defined + when: + - kafka_security_inter_broker_protocol is not defined + - kafka_listener_security_protocol_map is not defined - name: Set SASL mechanism for Kafka inter-broker communication protocol set_fact: diff --git a/templates/server.properties.j2 b/templates/server.properties.j2 index a8d328e..0cc4bca 100644 --- a/templates/server.properties.j2 +++ b/templates/server.properties.j2 @@ -33,6 +33,11 @@ broker.id={{ kafka_broker_id }} listeners={{ kafka_listeners }} {% endif %} +# Kafka brokers communicate between themselves, usually on the internal network. To define which listener to use, The host/IP used must be accessible from the broker machine to others. +{% if kafka_inter_broker_listener_name is defined %} +inter.broker.listener.name={{ kafka_inter_broker_listener_name }} +{% endif %} + # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). @@ -75,6 +80,8 @@ socket.receive.buffer.bytes={{ kafka_socket_receive_buffer_bytes }} # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes={{ kafka_socket_request_max_bytes }} +# The largest size of the message that can be received by the broker from a producer. +message.max.bytes={{ kafka_message_max_bytes }} ############################# Log Basics ############################# @@ -108,12 +115,6 @@ transaction.state.log.min.isr={{ kafka_transaction_state_log_min_isr }} # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. -# The number of messages to accept before forcing a flush of data to disk -log.flush.interval.messages={{ kafka_log_flush_interval_messages }} - -# The maximum amount of time a message can sit in a log before we force a flush -log.flush.interval.ms={{ kafka_log_flush_interval_ms }} - ############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can