Module lifecycle stage: Preview.
The module has installation requirements.
KafkaClass
KafkaClass is the primary resource administrators work with. The managed-kafka module manages Kafka instances in Deckhouse Kubernetes Platform, and KafkaClass is the cluster-scoped resource that governs every Kafka instance associated with it. A class defines allowed sizing combinations, sets broker configuration defaults, controls which parameters users may override, and enforces validation rules.
Every Kafka resource must reference an existing KafkaClass. All configuration is validated against the referenced class before the service is deployed.
A default KafkaClass is shipped with the module and contains ready-to-use baseline settings suitable for most workloads. You can use it as-is, or copy and modify it to create custom classes tailored to specific teams or environments.
Sizing Policies
A KafkaClass can define a set of sizing policies for the Kafka resources associated with it.
Sizing policies help prevent uneven distribution of CPU and memory resources across cluster nodes.
A policy is selected by matching the requested number of CPU cores against the policy's cores interval.
The request is then validated against the remaining fields of the matched policy.
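The selection logic above can be sketched in Python. This is an illustrative sketch only, not the module's code; the helper names and the assumption that memory must land on a step measured from the policy minimum are ours:

```python
def parse_mi(size: str) -> int:
    """Convert a size string like '8Gi' or '512Mi' to MiB."""
    if size.endswith("Gi"):
        return int(float(size[:-2]) * 1024)
    if size.endswith("Mi"):
        return int(size[:-2])
    raise ValueError(f"unsupported unit: {size}")

def matches(policy: dict, cores: int, memory: str, core_fraction: str) -> bool:
    # 1. The cores interval is the determining factor for policy selection.
    if not (policy["cores"]["min"] <= cores <= policy["cores"]["max"]):
        return False
    # 2. The remaining fields of the matched policy are then checked.
    #    Assumption: memory must fall on a step measured from the minimum.
    mem = parse_mi(memory)
    lo, hi = parse_mi(policy["memory"]["min"]), parse_mi(policy["memory"]["max"])
    step = parse_mi(policy["memory"]["step"])
    if not (lo <= mem <= hi and (mem - lo) % step == 0):
        return False
    return core_fraction in policy["coreFractions"]

policy = {
    "cores": {"min": 1, "max": 3},
    "memory": {"min": "0.5Gi", "max": "8Gi", "step": "512Mi"},
    "coreFractions": ["25%", "50%", "100%"],
}
print(matches(policy, 2, "4Gi", "50%"))    # True
print(matches(policy, 2, "4.3Gi", "50%"))  # False: memory not on a 512Mi step
```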
spec:
  sizingPolicies:
  - cores:
      min: 1
      max: 3
    memory:
      min: "0.5Gi"
      max: "8Gi"
      step: "512Mi"
    coreFractions: ["25%", "50%", "100%"]
  - cores:
      min: 4
      max: 16
    memory:
      min: "8Gi"
      max: "64Gi"
      step: "1Gi"
    coreFractions: ["50%", "100%"]
Validation Rules
CEL (Common Expression Language) syntax is used to create flexible validation rules.
The following predefined variables are available for use in rules:
instance.memory.size (int) — pod memory limit in bytes
instance.cpu.cores (int) — number of CPU cores
configuration.logRetentionHours (int) — log retention in hours
configuration.logRetentionMs (int) — log retention in milliseconds (-1 = disabled)
configuration.logRetentionBytes (int) — log retention in bytes (-1 = disabled)
configuration.logSegmentBytes (int) — maximum log segment size in bytes
configuration.messageMaxBytes (int) — maximum message size in bytes
configuration.compressionType (string) — broker compression type
configuration.numNetworkThreads (int) — number of network threads
configuration.numIoThreads (int) — number of I/O threads
configuration.logCleanerThreads (int) — number of log cleaner threads
configuration.socketRequestMaxBytes (int) — maximum socket request size in bytes
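Rules like the ones below can be sanity-checked outside the cluster. The sketch that follows simulates a few of the checks in plain Python; the module evaluates real CEL, so this is an illustration of the semantics, not its implementation:

```python
# Each entry mirrors one CEL rule: (message, predicate over instance/configuration).
rules = [
    ("messageMaxBytes must not exceed logSegmentBytes",
     lambda i, c: c["messageMaxBytes"] <= c["logSegmentBytes"]),
    ("socketRequestMaxBytes must be >= messageMaxBytes",
     lambda i, c: c["socketRequestMaxBytes"] >= c["messageMaxBytes"]),
    ("numNetworkThreads must not exceed 2 x CPU cores",
     lambda i, c: c["numNetworkThreads"] <= i["cpu"]["cores"] * 2),
]

instance = {"cpu": {"cores": 2}, "memory": {"size": 4 * 1024**3}}
configuration = {
    "messageMaxBytes": 1 * 1024**2,
    "logSegmentBytes": 512 * 1024**2,
    "socketRequestMaxBytes": 100 * 1024**2,
    "numNetworkThreads": 5,  # too high for a 2-core instance
}

for message, rule in rules:
    if not rule(instance, configuration):
        print(f"validation failed: {message}")
# prints: validation failed: numNetworkThreads must not exceed 2 x CPU cores
```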
spec:
  validations:
  - message: "messageMaxBytes must not exceed logSegmentBytes (a message must fit in one segment)"
    rule: "configuration.messageMaxBytes <= configuration.logSegmentBytes"
  - message: "logRetentionHours must be at least 1"
    rule: "configuration.logRetentionHours >= 1"
  - message: "logRetentionMs and logRetentionHours are mutually exclusive — set only one"
    rule: "configuration.logRetentionMs == 0 || configuration.logRetentionHours == 0"
  - message: "socketRequestMaxBytes must be >= messageMaxBytes (otherwise the broker rejects the request)"
    rule: "configuration.socketRequestMaxBytes >= configuration.messageMaxBytes"
  - message: "numNetworkThreads must not exceed 2 × CPU cores"
    rule: "configuration.numNetworkThreads <= instance.cpu.cores * 2"
  - message: "numIoThreads must not exceed 4 × CPU cores"
    rule: "configuration.numIoThreads <= instance.cpu.cores * 4"
  - message: "logCleanerThreads must not exceed the number of CPU cores"
    rule: "configuration.logCleanerThreads <= instance.cpu.cores"
Overridable Configuration
A whitelist of configuration parameters that may be overridden in a Kafka resource.
Any parameter from the Configuration Fields section below can be listed here.
spec:
  overridableConfiguration:
  - logRetentionHours
  - logRetentionMs
  - logRetentionBytes
  - messageMaxBytes
  - compressionType
  - autoCreateTopicsEnable
Configuration
Configuration parameters that can be defined at the KafkaClass level.
Values of these parameters will override defaults for all associated Kafka resources.
Note: for parameters listed in overridableConfiguration, values overridden in a Kafka resource take priority over the class-level values.
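The resulting precedence can be sketched as a simple merge: operator defaults are overlaid by the KafkaClass configuration, and user-supplied values are applied last, but only for whitelisted keys. This is an illustration of the described behavior, not the module's code:

```python
def effective_config(defaults, class_config, overridable, user_overrides):
    # Class-level configuration overrides operator defaults.
    merged = {**defaults, **class_config}
    # User overrides win, but only for keys in overridableConfiguration.
    for key, value in user_overrides.items():
        if key not in overridable:
            raise ValueError(f"{key} is not overridable")
        merged[key] = value
    return merged

defaults = {"logRetentionHours": 168, "compressionType": "producer"}
class_config = {"logRetentionHours": 72}
overridable = ["logRetentionHours", "compressionType"]

print(effective_config(defaults, class_config, overridable,
                       {"logRetentionHours": 24}))
# {'logRetentionHours': 24, 'compressionType': 'producer'}
```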
spec:
  configuration:
    logRetentionHours: 168
    logSegmentBytes: "512Mi"
    logCleanupPolicy: delete
    compressionType: producer
    numNetworkThreads: 3
    numIoThreads: 8
    numRecoveryThreadsPerDataDir: 1
    socketSendBufferBytes: "1Mi"
    socketReceiveBufferBytes: "1Mi"
    socketRequestMaxBytes: "100Mi"
    logCleanerThreads: 1
    logCleanerDeleteRetentionMs: 86400000
    logRetentionCheckIntervalMs: 300000
    deleteTopicEnable: true
    groupInitialRebalanceDelayMs: 3000
Configuration Fields
logRetentionHours
Type: integer | Kafka param: log.retention.hours | Min: 1 | Example: 168
Default number of hours to keep a log file. Mutually exclusive with logRetentionMs.
logRetentionMs
Type: integer | Kafka param: log.retention.ms | Min: -1 | Example: 604800000
Default millisecond-precision retention period. Takes priority over logRetentionHours.
Set to -1 to disable time-based retention. Mutually exclusive with logRetentionHours.
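The interaction between the two retention settings can be expressed as a small resolution function, assuming Kafka's documented precedence (millisecond setting wins; -1 disables time-based retention). This is an illustrative sketch, not module code:

```python
def effective_retention_ms(log_retention_ms=None, log_retention_hours=None):
    """Resolve the effective time-based retention in milliseconds.

    Returns None when time-based retention is disabled or not configured
    (in a real broker, Kafka's own default would then apply).
    """
    if log_retention_ms is not None:
        # The millisecond-precision setting takes priority; -1 disables it.
        return None if log_retention_ms == -1 else log_retention_ms
    if log_retention_hours is not None:
        return log_retention_hours * 3_600_000
    return None

print(effective_retention_ms(log_retention_hours=168))  # 604800000 (7 days)
print(effective_retention_ms(log_retention_ms=-1))      # None (disabled)
```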
logRetentionBytes
Type: int-or-string | Kafka param: log.retention.bytes | Example: 1Gi
Default maximum log size per partition. Old segments are deleted when this limit is exceeded.
Use -1 to disable size-based retention.
logSegmentBytes
Type: int-or-string | Kafka param: log.segment.bytes | Example: 512Mi
Default maximum size of a single log segment file. A message must fit within one segment.
messageMaxBytes
Type: int-or-string | Kafka param: message.max.bytes | Example: 1Mi
Default maximum size of a message the broker can receive.
Must not exceed logSegmentBytes or socketRequestMaxBytes.
compressionType
Type: string | Kafka param: compression.type
Allowed values: producer, uncompressed, gzip, snappy, lz4, zstd
Default broker-level compression for topic messages.
producer retains whatever compression the producer used (recommended to avoid re-compression).
autoCreateTopicsEnable
Type: boolean | Kafka param: auto.create.topics.enable
Default setting for automatic topic creation on first access.
Disable in production for explicit topic lifecycle control.
logCleanupPolicy
Type: string | Kafka param: log.cleanup.policy
Allowed values: delete, compact, delete,compact
Default cleanup policy for log segments when retention limits are exceeded.
numNetworkThreads
Type: integer | Kafka param: num.network.threads | Min: 1 | Example: 3
Number of threads handling network requests.
Increase when broker metrics show a low NetworkProcessorAvgIdlePercent (the network processors are saturated).
Validation: must not exceed 2 × CPU cores.
numIoThreads
Type: integer | Kafka param: num.io.threads | Min: 1 | Example: 8
Number of threads performing disk I/O for requests.
A good starting point is 8 × number of disks (typically 1 disk per broker).
Validation: must not exceed 4 × CPU cores.
numRecoveryThreadsPerDataDir
Type: integer | Kafka param: num.recovery.threads.per.data.dir | Min: 1 | Example: 1
Number of threads used for log recovery at startup and flushing at shutdown per data directory.
socketSendBufferBytes
Type: int-or-string | Kafka param: socket.send.buffer.bytes | Example: 1Mi
SO_SNDBUF buffer size for socket send operations. Increase for high-latency or high-throughput connections.
socketReceiveBufferBytes
Type: int-or-string | Kafka param: socket.receive.buffer.bytes | Example: 1Mi
SO_RCVBUF buffer size for socket receive operations. Increase for high-latency or high-throughput connections.
socketRequestMaxBytes
Type: int-or-string | Kafka param: socket.request.max.bytes | Example: 100Mi
Maximum number of bytes in a socket request.
Must be ≥ messageMaxBytes, otherwise the broker cannot accept a full-sized message.
logCleanerThreads
Type: integer | Kafka param: log.cleaner.threads | Min: 1 | Example: 1
Number of background threads used for log compaction.
Validation: must not exceed the number of CPU cores.
logCleanerDeleteRetentionMs
Type: integer | Kafka param: log.cleaner.delete.retention.ms | Min: 0 | Example: 86400000
How long a tombstone marker (delete marker for a compacted topic) is retained after the associated key is deleted.
Gives consumers time to observe the deletion before the marker is removed.
logRetentionCheckIntervalMs
Type: integer | Kafka param: log.retention.check.interval.ms | Min: 1000 | Example: 300000
How often the log cleaner checks for segments eligible for deletion (in milliseconds).
deleteTopicEnable
Type: boolean | Kafka param: delete.topic.enable
Controls whether topics can be deleted via the admin API.
Disable to prevent accidental data loss in production environments.
groupInitialRebalanceDelayMs
Type: integer | Kafka param: group.initial.rebalance.delay.ms | Min: 0 | Max: 300000 | Example: 3000
How long the group coordinator waits for more consumers to join before triggering the first rebalance.
A shorter delay reduces startup time; a longer delay produces more balanced consumer groups.
Default values set by Kafka Operator:
logRetentionHours: 168 (7 days)
compressionType: producer
autoCreateTopicsEnable: false
Affinity
Standard Kubernetes mechanism for managing scheduling of deployed pods.
spec:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
      - matchExpressions:
        - key: "node.deckhouse.io/group"
          operator: "In"
          values:
          - "kafka"
Tolerations
Standard Kubernetes mechanism for managing scheduling of deployed pods.
spec:
  tolerations:
  - key: primary-role
    operator: Equal
    value: kafka
    effect: NoSchedule
Node Selector
Standard Kubernetes mechanism for managing scheduling of deployed pods.
spec:
  nodeSelector:
    "node.deckhouse.io/group": "kafka"
Usage Examples
Basic Usage
apiVersion: managed-services.deckhouse.io/v1alpha1
kind: KafkaClass
metadata:
  name: default
spec:
  overridableConfiguration:
  - logRetentionHours
  - logRetentionMs
  - logRetentionBytes
  - messageMaxBytes
  - compressionType
  - autoCreateTopicsEnable
  configuration:
    logRetentionHours: 168
    logSegmentBytes: "512Mi"
    logCleanupPolicy: delete
    compressionType: producer
    numNetworkThreads: 3
    numIoThreads: 8
    numRecoveryThreadsPerDataDir: 1
    socketSendBufferBytes: "1Mi"
    socketReceiveBufferBytes: "1Mi"
    socketRequestMaxBytes: "100Mi"
    logCleanerThreads: 1
    logCleanerDeleteRetentionMs: 86400000
    logRetentionCheckIntervalMs: 300000
    deleteTopicEnable: true
    groupInitialRebalanceDelayMs: 3000
  sizingPolicies:
  - cores:
      min: 1
      max: 3
    memory:
      min: "0.5Gi"
      max: "8Gi"
      step: "512Mi"
    coreFractions: ["25%", "50%", "100%"]
  - cores:
      min: 4
      max: 16
    memory:
      min: "8Gi"
      max: "64Gi"
      step: "1Gi"
    coreFractions: ["50%", "100%"]
  validations:
  - message: "messageMaxBytes must not exceed logSegmentBytes (a message must fit in one segment)"
    rule: "configuration.messageMaxBytes <= configuration.logSegmentBytes"
  - message: "logRetentionHours must be at least 1"
    rule: "configuration.logRetentionHours >= 1"
  - message: "logRetentionMs and logRetentionHours are mutually exclusive — set only one"
    rule: "configuration.logRetentionMs == 0 || configuration.logRetentionHours == 0"
  - message: "socketRequestMaxBytes must be >= messageMaxBytes (otherwise the broker rejects the request)"
    rule: "configuration.socketRequestMaxBytes >= configuration.messageMaxBytes"
  - message: "numNetworkThreads must not exceed 2 × CPU cores"
    rule: "configuration.numNetworkThreads <= instance.cpu.cores * 2"
  - message: "numIoThreads must not exceed 4 × CPU cores"
    rule: "configuration.numIoThreads <= instance.cpu.cores * 4"
  - message: "logCleanerThreads must not exceed the number of CPU cores"
    rule: "configuration.logCleanerThreads <= instance.cpu.cores"