Module lifecycle stage: Preview.

The module has installation requirements.

KafkaClass

KafkaClass is the primary resource administrators work with. The managed-kafka module manages Kafka instances in Deckhouse Kubernetes Platform, and KafkaClass is the cluster-scoped resource that governs every Kafka instance associated with it: it defines allowed sizing combinations, sets broker configuration defaults, controls which parameters users are allowed to override, and enforces validation rules.

Every Kafka resource must reference an existing KafkaClass. All configuration is validated against the referenced class before the service is deployed.
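The reference requirement can be illustrated with a minimal admission-style check. This is a hypothetical sketch: the `className` field name and the function are assumptions for illustration, not the real schema or controller code.

```python
# Hypothetical admission-style check: a Kafka resource is accepted only if
# the KafkaClass it references exists. `className` is an assumed field name,
# not the module's actual schema.

def validate_class_reference(kafka_spec, existing_class_names):
    name = kafka_spec.get("className")
    if name not in existing_class_names:
        raise ValueError(f"Kafka resource references unknown KafkaClass {name!r}")
    return name
```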

A default KafkaClass is shipped with the module and contains ready-to-use baseline settings suitable for most workloads. You can use it as-is, or copy and modify it to create custom classes tailored to specific teams or environments.

Sizing Policies

The sizingPolicies structure lets you define a set of sizing policies for the associated Kafka resources, which helps avoid uneven distribution of CPU and memory resources across cluster nodes. A policy is selected based on the cores interval that the requested number of cores falls into; the remaining fields of the selected policy are then checked for compliance.

spec:
  sizingPolicies:
    - cores:
        min: 1
        max: 3
      memory:
        min: "0.5Gi"
        max: "8Gi"
        step: "512Mi"
      coreFractions: ["25%", "50%", "100%"]
    - cores:
        min: 4
        max: 16
      memory:
        min: "8Gi"
        max: "64Gi"
        step: "1Gi"
      coreFractions: ["50%", "100%"]
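The selection logic above can be sketched as follows. This is an illustration, not the module's implementation; quantity parsing is simplified to the binary suffixes used in this document.

```python
# Sketch of sizing-policy selection: a policy is chosen by the cores
# interval; memory range, memory step, and core fraction are then checked
# against that policy.

UNITS = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3}

def to_bytes(quantity):
    """Parse a simplified Kubernetes-style quantity such as '512Mi' or '0.5Gi'."""
    s = str(quantity)
    for suffix, mult in UNITS.items():
        if s.endswith(suffix):
            return int(float(s[:-len(suffix)]) * mult)
    return int(s)

def match_policy(policies, cores, memory, core_fraction):
    for policy in policies:
        # The determining factor: does the requested core count fall
        # within this policy's cores interval?
        if not policy["cores"]["min"] <= cores <= policy["cores"]["max"]:
            continue
        mem = to_bytes(memory)
        lo = to_bytes(policy["memory"]["min"])
        hi = to_bytes(policy["memory"]["max"])
        step = to_bytes(policy["memory"]["step"])
        # The remaining fields of the selected policy are then checked.
        if not (lo <= mem <= hi and (mem - lo) % step == 0):
            raise ValueError("memory does not comply with the matched policy")
        if core_fraction not in policy["coreFractions"]:
            raise ValueError("coreFraction is not allowed by the matched policy")
        return policy
    raise ValueError("no sizing policy covers the requested number of cores")
```

For the two policies above, `match_policy(policies, cores=2, memory="4Gi", core_fraction="50%")` selects the first policy, while a request for 20 cores matches neither interval and is rejected.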

Validation Rules

CEL (Common Expression Language) syntax is used to create flexible validation mechanisms.
The following predefined variables are available for use in rules:

  • instance.memory.size int — pod memory limit in bytes
  • instance.cpu.cores int — number of CPU cores
  • configuration.logRetentionHours int — log retention in hours
  • configuration.logRetentionMs int — log retention in milliseconds (-1 = disabled)
  • configuration.logRetentionBytes int — log retention in bytes (-1 = disabled)
  • configuration.logSegmentBytes int — maximum log segment size in bytes
  • configuration.messageMaxBytes int — maximum message size in bytes
  • configuration.compressionType string — broker compression type
  • configuration.numNetworkThreads int — number of network threads
  • configuration.numIoThreads int — number of I/O threads
  • configuration.logCleanerThreads int — number of log cleaner threads
  • configuration.socketRequestMaxBytes int — maximum socket request size in bytes

spec:
  validations:
    - message: "messageMaxBytes must not exceed logSegmentBytes (a message must fit in one segment)"
      rule: "configuration.messageMaxBytes <= configuration.logSegmentBytes"
    - message: "logRetentionHours must be at least 1"
      rule: "configuration.logRetentionHours >= 1"
    - message: "logRetentionMs and logRetentionHours are mutually exclusive — set only one"
      rule: "configuration.logRetentionMs == 0 || configuration.logRetentionHours == 0"
    - message: "socketRequestMaxBytes must be >= messageMaxBytes (otherwise the broker rejects the request)"
      rule: "configuration.socketRequestMaxBytes >= configuration.messageMaxBytes"
    - message: "numNetworkThreads must not exceed 2 × CPU cores"
      rule: "configuration.numNetworkThreads <= instance.cpu.cores * 2"
    - message: "numIoThreads must not exceed 4 × CPU cores"
      rule: "configuration.numIoThreads <= instance.cpu.cores * 4"
    - message: "logCleanerThreads must not exceed the number of CPU cores"
      rule: "configuration.logCleanerThreads <= instance.cpu.cores"
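A sketch of how such rules could be evaluated. It uses Python's expression evaluator as a stand-in for a real CEL engine; for the comparison rules above the syntax coincides with CEL once `||` and `&&` are mapped to `or` and `and`. This is an illustration, not the module's validator.

```python
# Minimal CEL-like rule checker (illustration only). Variable values are
# plain ints, matching the byte/count semantics of the predefined variables.

class Ctx(dict):
    """dict with attribute access, so rules can read configuration.messageMaxBytes."""
    __getattr__ = dict.__getitem__

def ctx(value):
    """Recursively wrap dicts so dotted access works, e.g. instance.cpu.cores."""
    if isinstance(value, dict):
        return Ctx({k: ctx(v) for k, v in value.items()})
    return value

def check(rules, configuration, instance):
    """Return the messages of all rules that evaluate to false."""
    env = {"configuration": ctx(configuration), "instance": ctx(instance)}
    failures = []
    for r in rules:
        # Map CEL's logical operators onto Python's; comparisons are identical.
        expr = r["rule"].replace("||", " or ").replace("&&", " and ")
        if not eval(expr, {"__builtins__": {}}, env):
            failures.append(r["message"])
    return failures
```

For example, with `instance = {"cpu": {"cores": 4}}` and configuration values expressed in bytes (1Mi messages, 512Mi segments, 100Mi socket requests, 168 retention hours, 3/8/1 network/I/O/cleaner threads), all seven rules above pass and `check` returns an empty list.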

Overridable Configuration

overridableConfiguration is a whitelist of configuration parameters that users may override in a Kafka resource. All supported parameters are described in the Configuration Fields section below.

spec:
  overridableConfiguration:
    - logRetentionHours
    - logRetentionMs
    - logRetentionBytes
    - messageMaxBytes
    - compressionType
    - autoCreateTopicsEnable

Configuration

Configuration parameters that can be defined at the KafkaClass level. Values of these parameters override the defaults for all associated Kafka resources.
Note: for parameters listed in overridableConfiguration, values overridden in a Kafka resource take priority over the class-level values.

spec:
  configuration:
    logRetentionHours: 168
    logSegmentBytes: "512Mi"
    logCleanupPolicy: delete
    compressionType: producer
    numNetworkThreads: 3
    numIoThreads: 8
    numRecoveryThreadsPerDataDir: 1
    socketSendBufferBytes: "1Mi"
    socketReceiveBufferBytes: "1Mi"
    socketRequestMaxBytes: "100Mi"
    logCleanerThreads: 1
    logCleanerDeleteRetentionMs: 86400000
    logRetentionCheckIntervalMs: 300000
    deleteTopicEnable: true
    groupInitialRebalanceDelayMs: 3000
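The precedence notes above can be sketched as follows. This is an assumed model for illustration (the function is not the module's code): operator defaults are overlaid by the class-level configuration, which is in turn overlaid by whitelisted user overrides from the Kafka resource.

```python
# Assumed precedence model: operator defaults < KafkaClass configuration
# < user overrides (accepted only for whitelisted parameters).

OPERATOR_DEFAULTS = {
    "logRetentionHours": 168,
    "compressionType": "producer",
    "autoCreateTopicsEnable": False,
}

def effective_configuration(class_configuration, overridable, user_overrides):
    rejected = sorted(set(user_overrides) - set(overridable))
    if rejected:
        raise ValueError(f"parameters are not overridable: {rejected}")
    # Later dicts win: defaults, then class-level values, then user overrides.
    return {**OPERATOR_DEFAULTS, **class_configuration, **user_overrides}
```

For example, a class that sets `logRetentionHours: 72` replaces the operator default of 168, and a user override of a whitelisted parameter such as `messageMaxBytes` is layered on top of both.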

Configuration Fields

logRetentionHours

Type: integer | Kafka param: log.retention.hours | Min: 1 | Example: 168

Default number of hours to keep a log file. Mutually exclusive with logRetentionMs.

logRetentionMs

Type: integer | Kafka param: log.retention.ms | Min: -1 | Example: 604800000

Default millisecond-precision retention period. Takes priority over logRetentionHours.
Set to -1 to disable time-based retention. Mutually exclusive with logRetentionHours.
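As a quick sanity check, the example values for the two retention fields describe the same period:

```python
# 604800000 ms (the logRetentionMs example) equals the 168-hour
# logRetentionHours example, i.e. exactly one week.
ms = 604_800_000
hours = ms // (60 * 60 * 1000)
assert hours == 168
assert hours // 24 == 7
```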

logRetentionBytes

Type: int-or-string | Kafka param: log.retention.bytes | Example: 1Gi

Default maximum log size per partition. Old segments are deleted when this limit is exceeded.
Use -1 to disable size-based retention.

logSegmentBytes

Type: int-or-string | Kafka param: log.segment.bytes | Example: 512Mi

Default maximum size of a single log segment file. A message must fit within one segment.

messageMaxBytes

Type: int-or-string | Kafka param: message.max.bytes | Example: 1Mi

Default maximum size of a message the broker can receive.
Must not exceed logSegmentBytes or socketRequestMaxBytes.

compressionType

Type: string | Kafka param: compression.type
Allowed values: producer, uncompressed, gzip, snappy, lz4, zstd

Default broker-level compression for topic messages.
producer retains whatever compression the producer used (recommended to avoid re-compression).

autoCreateTopicsEnable

Type: boolean | Kafka param: auto.create.topics.enable

Default setting for automatic topic creation on first access.
Disable in production for explicit topic lifecycle control.

logCleanupPolicy

Type: string | Kafka param: log.cleanup.policy
Allowed values: delete, compact, or the combined policy delete,compact

Default cleanup policy for log segments when retention limits are exceeded.

numNetworkThreads

Type: integer | Kafka param: num.network.threads | Min: 1 | Example: 3

Number of threads handling network requests.
Increase when broker metrics show high NetworkProcessorAvgIdlePercent.
Validation: must not exceed 2 × CPU cores.

numIoThreads

Type: integer | Kafka param: num.io.threads | Min: 1 | Example: 8

Number of threads performing disk I/O for requests.
A good starting point is 8 × number of disks (typically 1 disk per broker).
Validation: must not exceed 4 × CPU cores.

numRecoveryThreadsPerDataDir

Type: integer | Kafka param: num.recovery.threads.per.data.dir | Min: 1 | Example: 1

Number of threads used for log recovery at startup and flushing at shutdown per data directory.

socketSendBufferBytes

Type: int-or-string | Kafka param: socket.send.buffer.bytes | Example: 1Mi

SO_SNDBUF buffer size for socket send operations. Increase for high-latency or high-throughput connections.

socketReceiveBufferBytes

Type: int-or-string | Kafka param: socket.receive.buffer.bytes | Example: 1Mi

SO_RCVBUF buffer size for socket receive operations. Increase for high-latency or high-throughput connections.

socketRequestMaxBytes

Type: int-or-string | Kafka param: socket.request.max.bytes | Example: 100Mi

Maximum number of bytes in a socket request.
Must be ≥ messageMaxBytes, otherwise the broker cannot accept a full-sized message.

logCleanerThreads

Type: integer | Kafka param: log.cleaner.threads | Min: 1 | Example: 1

Number of background threads used for log compaction.
Validation: must not exceed the number of CPU cores.

logCleanerDeleteRetentionMs

Type: integer | Kafka param: log.cleaner.delete.retention.ms | Min: 0 | Example: 86400000

How long a tombstone marker (delete marker for a compacted topic) is retained after the associated key is deleted.
Gives consumers time to observe the deletion before the marker is removed.

logRetentionCheckIntervalMs

Type: integer | Kafka param: log.retention.check.interval.ms | Min: 1000 | Example: 300000

How often the log cleaner checks for segments eligible for deletion (in milliseconds).

deleteTopicEnable

Type: boolean | Kafka param: delete.topic.enable

Controls whether topics can be deleted via the admin API.
Disable to prevent accidental data loss in production environments.

groupInitialRebalanceDelayMs

Type: integer | Kafka param: group.initial.rebalance.delay.ms | Min: 0 | Max: 300000 | Example: 3000

How long the group coordinator waits for more consumers to join before triggering the first rebalance.
A shorter delay reduces startup time; a longer delay produces more balanced consumer groups.

Default values set by Kafka Operator:

  • logRetentionHours: 168 (7 days)
  • compressionType: producer
  • autoCreateTopicsEnable: false

Affinity

Standard Kubernetes node affinity mechanism for controlling which nodes the deployed pods are scheduled on.

spec:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
      - matchExpressions:
        - key: "node.deckhouse.io/group"
          operator: "In"
          values:
          - "kafka"

Tolerations

Standard Kubernetes tolerations mechanism that allows the deployed pods to be scheduled on tainted nodes.

spec:
  tolerations:
  - key: primary-role
    operator: Equal
    value: kafka
    effect: NoSchedule

Node Selector

Standard Kubernetes nodeSelector mechanism that restricts the deployed pods to nodes with matching labels.

spec:
  nodeSelector:
    "node.deckhouse.io/group": "kafka"

Usage Examples

Basic Usage

apiVersion: managed-services.deckhouse.io/v1alpha1
kind: KafkaClass
metadata:
  name: default
spec:
  overridableConfiguration:
    - logRetentionHours
    - logRetentionMs
    - logRetentionBytes
    - messageMaxBytes
    - compressionType
    - autoCreateTopicsEnable

  configuration:
    logRetentionHours: 168
    logSegmentBytes: "512Mi"
    logCleanupPolicy: delete
    compressionType: producer
    numNetworkThreads: 3
    numIoThreads: 8
    numRecoveryThreadsPerDataDir: 1
    socketSendBufferBytes: "1Mi"
    socketReceiveBufferBytes: "1Mi"
    socketRequestMaxBytes: "100Mi"
    logCleanerThreads: 1
    logCleanerDeleteRetentionMs: 86400000
    logRetentionCheckIntervalMs: 300000
    deleteTopicEnable: true
    groupInitialRebalanceDelayMs: 3000

  sizingPolicies:
    - cores:
        min: 1
        max: 3
      memory:
        min: "0.5Gi"
        max: "8Gi"
        step: "512Mi"
      coreFractions: ["25%", "50%", "100%"]
    - cores:
        min: 4
        max: 16
      memory:
        min: "8Gi"
        max: "64Gi"
        step: "1Gi"
      coreFractions: ["50%", "100%"]

  validations:
    - message: "messageMaxBytes must not exceed logSegmentBytes (a message must fit in one segment)"
      rule: "configuration.messageMaxBytes <= configuration.logSegmentBytes"
    - message: "logRetentionHours must be at least 1"
      rule: "configuration.logRetentionHours >= 1"
    - message: "logRetentionMs and logRetentionHours are mutually exclusive — set only one"
      rule: "configuration.logRetentionMs == 0 || configuration.logRetentionHours == 0"
    - message: "socketRequestMaxBytes must be >= messageMaxBytes (otherwise the broker rejects the request)"
      rule: "configuration.socketRequestMaxBytes >= configuration.messageMaxBytes"
    - message: "numNetworkThreads must not exceed 2 × CPU cores"
      rule: "configuration.numNetworkThreads <= instance.cpu.cores * 2"
    - message: "numIoThreads must not exceed 4 × CPU cores"
      rule: "configuration.numIoThreads <= instance.cpu.cores * 4"
    - message: "logCleanerThreads must not exceed the number of CPU cores"
      rule: "configuration.logCleanerThreads <= instance.cpu.cores"