Trigger Trigger

type: "io.kestra.plugin.kafka.Trigger"

Consume messages periodically from Kafka topics and create one execution per batch.

Note that you don't need an extra task to consume the message from the event trigger. The trigger will automatically consume messages and you can retrieve their content in your flow using the {{ trigger.uri }} variable. If you would like to consume each message from a Kafka topic in real-time and create one execution per message, you can use the io.kestra.plugin.kafka.RealtimeTrigger instead.


id: kafka_trigger

  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "{{ trigger.value }}"

  - id: trigger
    type: io.kestra.plugin.kafka.Trigger
    topic: test_kestra
      bootstrap.servers: localhost:9092
      schema.registry.url: http://localhost:8085
      keyDeserializer: STRING
      valueDeserializer: AVRO
    interval: PT30S
    maxRecords: 5
    groupId: kafkaConsumerGroupId



  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Kafka consumer group ID.

Using a consumer group, we will fetch only records that haven't been consumed yet.


  • Type: object
  • Dynamic:
  • Required: ✔️


  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: 5.000000000
  • Format: duration

How often to poll for a record.

If no records are available, the maximum wait duration to wait for new records.


  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required: ✔️

Kafka connection properties.

The bootstrap.servers property is a minimal required configuration to connect to a Kafka topic. This property can reference any valid Consumer Configs or Producer Configs as key-value pairs.

If you want to pass a truststore or a keystore, you must provide a base64 encoded string for ssl.keystore.location and ssl.truststore.location.


  • Type: object
  • Dynamic:
  • Required: ✔️


  • Type: array
  • SubType: Condition
  • Dynamic:
  • Required:

List of conditions in order to limit the flow trigger.


  • Type: string
  • Dynamic:
  • Required:
  • Default: 60.000000000
  • Format: duration

Interval between polling.

The interval between 2 different polls of schedule, this can avoid to overload the remote system with too many calls. For most of the triggers that depend on external systems, a minimal interval must be at least PT30S. See ISO_8601 Durations for more information of available interval values.


  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The maximum duration to wait for new records before stopping the consumption process.

It's a soft limit evaluated every second.


  • Type: integer
  • Dynamic:
  • Required:

The maximum number of records to fetch before stopping the consumption process.

It's a soft limit evaluated every second.


  • Type: array
  • SubType: integer
  • Dynamic:
  • Required:

Topic partitions to consume messages from.

Manually assign a list of partitions to the consumer.


  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:
  • Default: {}

Serializer configuration

Configuration that will be passed to serializer or deserializer. The avro.use.logical.type.converters is always passed when you have any values set to true.


  • Type: string
  • Dynamic: ✔️
  • Required:

Timestamp of a message to start consuming messages from.

By default, we consume all messages from the topics with no consumer group or depending on the configuration of the auto.offset.reset property. However, you can provide an arbitrary start time. This property is ignored if a consumer group is used. It must be a valid ISO 8601 date.


  • Type: array
  • SubType: string
  • Dynamic:
  • Required:

List of execution states after which a trigger should be stopped (a.k.a. disabled).


  • Type: object
  • Dynamic: ✔️
  • Required:

Kafka topic(s) to consume messages from.

It can be a string or a list of strings to consume from one or multiple topics.


  • Type: string
  • Dynamic: ✔️
  • Required:

Kafka topic pattern to consume messages from.

Consumer will subscribe to all topics matching the specified pattern to get dynamically assigned partitions.



  • Type: integer
  • Required:

Number of messages consumed from a Kafka topic.


  • Type: string
  • Required:
  • Format: uri

URI of a file in Kestra's internal storage containing the messages.


Was this page helpful?