Skip to main content

LittleHorse Connectors for Kafka Connect

These connectors allow data transfer between Apache Kafka and LittleHorse.

WfRunSinkConnector

This connector allows you to execute WfRuns into LittleHorse. It supports all the Variable Types provided by LittleHorse.

More about running workflows at LittleHorse Quickstart.

Expected Message Structure

Message PartDescriptionTypeValid Values
keyIgnoredanyany
valueDefine the variables field of the workflowmapkey-value not null

More about run workflow fields at RunWfRequest.

You can manipulate the message structure with Single Message Transformations (SMTs).

Quick Example

Next workflow executes a task that receives a String as parameter called name:

Workflow workflow = Workflow.newWorkflow("greetings", wf -> {
WfRunVariable name = wf.declareStr("name");
wf.execute("greet", name);
});

There is a topic names, with this data in the topic:

key: null, value: {"name":"Anakin Skywalker"}
key: null, value: {"name":"Luke Skywalker"}
key: null, value: {"name":"Leia Organa"}
key: null, value: {"name":"Padme Amidala"}

Next connector configuration will execute WfRuns with the variable name.

{
"tasks.max": 2,
"topics": "names",
"connector.class": "io.littlehorse.connect.WfRunSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"lhc.api.port": 2023,
"lhc.api.host": "localhost",
"lhc.tenant.id": "default",
"wf.spec.name": "greetings"
}

More configurations at WfRun Sink Connector Configurations.

ExternalEventSinkConnector

This connector allows you to execute External Events into LittleHorse.

More about running external events at LittleHorse External Events.

Expected Message Structure

Message PartDescriptionTypeValid Values
keyDefine the associated wf_run_idstringnon-empty string
valueDefine the content of the external eventanyany not null

More about external event fields at PutExternalEventRequest.

You can manipulate the message structure with Single Message Transformations (SMTs).

Quick Example

Next workflow waits for the event set-name to assign the variable name and then execute the task greet.

Workflow workflow = Workflow.newWorkflow("greetings", wf -> {
WfRunVariable name = wf.declareStr("name");
name.assign(wf.waitForEvent("set-name"));
wf.execute("greet", name);
});

There is a topic name with this data:

key: 64512de2a4b5470a9a8a2846b9a8a444, value: Anakin Skywalker
key: 79af0ae572bb4c19842c19dd7cad6598, value: Luke Skywalker
key: 30e1afe9a30748339594cadc3d537ecd, value: Leia Organa
key: e01547de3d294efdb6417abf35f3c960, value: Padme Amidala

Next configuration will execute external events where the message key will be the WfRunId and the message value will be the Content (more at PutExternalEventRequest):

{
"tasks.max": 2,
"topics": "names",
"connector.class": "io.littlehorse.connect.ExternalEventSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"lhc.api.port": 2023,
"lhc.api.host": "localhost",
"lhc.tenant.id": "default",
"external.event.name": "set-name"
}

More configurations at ExternalEvent Sink Connector Configurations.

Idempotent Writes

To ensure idempotency, we generate a unique id for each request to LH with the next format: {connector name}-{topic name}-{partition}-{offset}.

Multiple Tasks

These connectors support parallelism by running more than one task. Specify the number of tasks in the tasks.max configuration parameter.

More configurations at Configure Sink Connector.

Dead Letter Queue

These connectors support Dead Letter Queue (DLQ).

More about DLQs at Kafka Connect Dead Letter Queue.

Converters

These connectors support Protobuf, Json and Avro through converters.

More about converters at Kafka Connect Converters

External Secrets

Kafka connect ensures provisioning secrets through the ConfigProvider interface, so these connectors support external secrets by default.

More about secrets at Externalize Secrets.

Configurations

Download

For all available versions go to GitHub Releases.

Versioning

We use Semantic Versioning where major.minor numbers indicate littlehorse version compatibility, and the patch digit indicates the lh-kafka-connect bundle version.

  • major LittleHorse server major version compatibility.
  • minor LittleHorse server minor version compatibility.
  • patch LittleHorse Connectors bundle version.

Examples

For more examples go to examples.

Dependencies

  • Java version 11 or greater is required.
  • Developed and tested against Apache Kafka version 3.8 and 3.9, equivalents to Confluent Platform 7.8 and 7.9.
  • Developed and tested against LittleHorse version 0.12 and 0.13.