• application.id

      An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.

    • bootstrap.servers

      A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form . Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

      Type:list
      Default:
      Valid Values:
      Importance:high
    • state.dir

      Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem.

      Type:string
      Default:/var/folders/5w/m48dfpps5fj1byw1ldmq3v5w0000gp/T//kafka-streams
      Valid Values:
      Importance:high
    • acceptable.recovery.lag

      The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up for an active task.Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.

      Type:long
      Default:10000
      Valid Values:[0,…]
      Importance:medium
    • cache.max.bytes.buffering

      Maximum number of memory bytes to be used for buffering across all threads

      Type:long
      Default:10485760
      Valid Values:[0,…]
      Importance:medium
    • client.id

      An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern ‘-StreamThread--‘.

      Type:string
      Default:“”
      Valid Values:
      Importance:medium
    • default.deserialization.exception.handler

      Exception handling class that implements the org.apache.kafka.streams.errors.DeserializationExceptionHandler interface.

      Type:class
      Default:org.apache.kafka.streams.errors.LogAndFailExceptionHandler
      Valid Values:
      Importance:medium
    • default.key.serde

      Default serializer / deserializer class for key that implements the org.apache.kafka.common.serialization.Serde interface. Note when windowed serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via ‘default.windowed.key.serde.inner’ or ‘default.windowed.value.serde.inner’ as well

      Type:class
      Default:null
      Valid Values:
      Importance:medium
    • default.list.key.serde.inner

      Default inner class of list serde for key that implements the org.apache.kafka.common.serialization.Serde interface. This configuration will be read if and only if default.key.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde

      Type:class
      Default:null
      Valid Values:
      Importance:medium
    • default.list.key.serde.type

      Default class for key that implements the java.util.List interface. This configuration will be read if and only if default.key.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde Note when list serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via ‘default.list.key.serde.inner’

      Type:class
      Default:null
      Valid Values:
      Importance:medium
    • default.list.value.serde.inner

      Default inner class of list serde for value that implements the org.apache.kafka.common.serialization.Serde interface. This configuration will be read if and only if configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde

      Type:class
      Default:null
      Valid Values:
      Importance:medium
    • default.list.value.serde.type

      Default class for value that implements the java.util.List interface. This configuration will be read if and only if default.value.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde Note when list serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via ‘default.list.value.serde.inner’

      Type:class
      Default:null
      Valid Values:
      Importance:medium
    • default.production.exception.handler

      Exception handling class that implements the org.apache.kafka.streams.errors.ProductionExceptionHandler interface.

      Type:class
      Default:org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
      Valid Values:
      Importance:medium
    • default.timestamp.extractor

      Default timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface.

      Type:class
      Default:org.apache.kafka.streams.processor.FailOnInvalidTimestamp
      Valid Values:
      Importance:medium
    • default.value.serde

      Default serializer / deserializer class for value that implements the org.apache.kafka.common.serialization.Serde interface. Note when windowed serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via ‘default.windowed.key.serde.inner’ or ‘default.windowed.value.serde.inner’ as well

      Type:class
      Default:null
      Valid Values:
      Importance:medium
    • max.task.idle.ms

      Type:long
      Default:0
      Valid Values:
      Importance:medium
    • num.standby.replicas

      The number of standby replicas for each task.

    • num.stream.threads

      The number of threads to execute stream processing.

      Type:int
      Default:1
      Valid Values:
      Importance:medium
    • processing.guarantee

      The processing guarantee that should be used. Possible values are at_least_once (default) and exactly_once_v2 (requires brokers version 2.5 or higher). Deprecated options are (requires brokers version 0.11.0 or higher) and exactly_once_beta (requires brokers version 2.5 or higher). Note that exactly-once processing requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker setting transaction.state.log.replication.factor and transaction.state.log.min.isr.

      Type:string
      Default:at_least_once
      Valid Values:[at_least_once, exactly_once, exactly_once_beta, exactly_once_v2]
      Importance:medium
    • replication.factor

      The replication factor for change log topics and repartition topics created by the stream processing application. The default of -1 (meaning: use broker default replication factor) requires broker version 2.4 or newer

      Type:int
      Default:-1
      Valid Values:
      Importance:medium
    • security.protocol

      Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

      Type:string
      Default:PLAINTEXT
      Valid Values:
      Importance:medium
    • task.timeout.ms

      The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of 0ms, a task would raise an error for the first internal error. For any timeout larger than 0ms, a task will retry at least once before an error is raised.

      Type:long
      Default:300000 (5 minutes)
      Valid Values:[0,…]
      Importance:medium
    • topology.optimization

      A configuration telling Kafka Streams if it should optimize the topology, disabled by default

      Type:string
      Default:none
      Valid Values:[none, all]
      Importance:medium
    • application.server

      A host:port pair pointing to a user-defined endpoint that can be used for state store discovery and interactive queries on this KafkaStreams instance.

      Type:string
      Default:“”
      Valid Values:
      Importance:low
    • buffered.records.per.partition

      Maximum number of records to buffer per partition.

      Type:int
      Default:1000
      Valid Values:
      Importance:low
    • built.in.metrics.version

      Version of the built-in metrics to use.

      Type:string
      Default:latest
      Valid Values:[latest]
      Importance:low
    • commit.interval.ms

      The frequency in milliseconds with which to save the position of the processor. (Note, if processing.guarantee is set to exactly_once_v2, exactly_once,the default value is 100, otherwise the default value is 30000.

      Type:long
      Default:30000 (30 seconds)
      Valid Values:[0,…]
      Importance:low
    • connections.max.idle.ms

      Close idle connections after the number of milliseconds specified by this config.

      Type:long
      Default:540000 (9 minutes)
      Valid Values:
      Importance:low
    • metadata.max.age.ms

      The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.

      Type:long
      Default:300000 (5 minutes)
      Valid Values:[0,…]
      Importance:low
    • metric.reporters

      A list of classes to use as metrics reporters. Implementing the org.apache.kafka.common.metrics.MetricsReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.

      Type:list
      Default:“”
      Valid Values:
      Importance:low
    • metrics.num.samples

      Type:int
      Default:2
      Valid Values:[1,…]
      Importance:low
    • metrics.recording.level

      The highest recording level for metrics.

      Type:string
      Default:INFO
      Valid Values:[INFO, DEBUG, TRACE]
      Importance:low
    • metrics.sample.window.ms

      The window of time a metrics sample is computed over.

    • poll.ms

      The amount of time in milliseconds to block waiting for input.

      Type:long
      Default:100
      Valid Values:
      Importance:low
    • probing.rebalance.interval.ms

      The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have finished warming up and are ready to become active. Probing rebalances will continue to be triggered until the assignment is balanced. Must be at least 1 minute.

      Type:long
      Default:600000 (10 minutes)
      Valid Values:[60000,…]
      Importance:low
    • receive.buffer.bytes

      The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.

      Type:int
      Default:32768 (32 kibibytes)
      Valid Values:[-1,…]
      Importance:low
    • reconnect.backoff.max.ms

      The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.

      Type:long
      Default:1000 (1 second)
      Valid Values:[0,…]
      Importance:low
    • reconnect.backoff.ms

      The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.

      Type:long
      Default:50
      Valid Values:[0,…]
      Importance:low
    • request.timeout.ms

      The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.

      Type:int
      Default:40000 (40 seconds)
      Valid Values:[0,…]
      Importance:low
    • retries

      Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. It is recommended to set the value to either zero or `MAX_VALUE` and use corresponding timeout parameters to control how long a client should retry a request.

      Type:int
      Default:0
      Valid Values:[0,…,2147483647]
      Importance:low
    • retry.backoff.ms

      The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

      Type:long
      Default:100
      Valid Values:[0,…]
      Importance:low
    • rocksdb.config.setter

      A Rocks DB config setter class or class name that implements the org.apache.kafka.streams.state.RocksDBConfigSetter interface

      Type:class
      Default:null
      Valid Values:
      Importance:low
    • send.buffer.bytes

      The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.

      Type:int
      Default:131072 (128 kibibytes)
      Valid Values:[-1,…]
      Importance:low
    • state.cleanup.delay.ms

      The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least will be removed

      Type:long
      Default:600000 (10 minutes)
      Valid Values:
      Importance:low
    • upgrade.from

      Allows upgrading in a backward compatible way. This is needed when upgrading from [0.10.0, 1.1] to 2.0+, or when upgrading from [2.0, 2.3] to 2.4+. When upgrading from 2.4 to a newer version it is not required to specify this config. Default is `null`. Accepted values are “0.10.0”, “0.10.1”, “0.10.2”, “0.11.0”, “1.0”, “1.1”, “2.0”, “2.1”, “2.2”, “2.3” (for upgrading from the corresponding old version).

      Type:string
      Default:null
      Valid Values:[null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3]
      Importance:low
    • window.size.ms

      Sets window size for the deserializer in order to calculate window end times.

      Type:long
      Default:null
      Valid Values:
      Importance:low
    • windowed.inner.class.serde

      Type:string
      Default:null
      Valid Values:
      Importance:low
    • windowstore.changelog.additional.retention.ms

      Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day

      Type:long
      Default:86400000 (1 day)
      Valid Values:
      Importance:low