LittleHorse Connectors for Kafka Connect
These connectors allow data transfer between Apache Kafka and LittleHorse.
WfRunSinkConnector
This connector allows you to execute WfRuns into LittleHorse. It supports all the Variable Types provided by LittleHorse.
More about running workflows at LittleHorse Quickstart.
Expected Message Structure
Message Part | Description | Type | Valid Values |
---|---|---|---|
key | Ignored | any | any |
value | Define the variables field of the workflow | map | key-value not null |
More about run workflow fields at RunWfRequest.
You can manipulate the message structure with Single Message Transformations (SMTs).
Quick Example
Next workflow executes a task that receives a String
as parameter called name
:
Workflow workflow = Workflow.newWorkflow("greetings", wf -> {
WfRunVariable name = wf.declareStr("name");
wf.execute("greet", name);
});
There is a topic names
, with this data in the topic:
key: null, value: {"name":"Anakin Skywalker"}
key: null, value: {"name":"Luke Skywalker"}
key: null, value: {"name":"Leia Organa"}
key: null, value: {"name":"Padme Amidala"}
Next connector configuration will execute WfRuns
with the variable name
.
{
"tasks.max": 2,
"topics": "names",
"connector.class": "io.littlehorse.connect.WfRunSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"lhc.api.port": 2023,
"lhc.api.host": "localhost",
"lhc.tenant.id": "default",
"wf.spec.name": "greetings"
}
More configurations at WfRun Sink Connector Configurations.
ExternalEventSinkConnector
This connector allows you to execute External Events into LittleHorse.
More about running external events at LittleHorse External Events.
Expected Message Structure
Message Part | Description | Type | Valid Values |
---|---|---|---|
key | Define the associated wf_run_id | string | non-empty string |
value | Define the content of the external event | any | any not null |
More about external event fields at PutExternalEventRequest.
You can manipulate the message structure with Single Message Transformations (SMTs).
Quick Example
Next workflow waits for the event set-name
to assign the variable name
and then execute the task greet
.
Workflow workflow = Workflow.newWorkflow("greetings", wf -> {
WfRunVariable name = wf.declareStr("name");
name.assign(wf.waitForEvent("set-name"));
wf.execute("greet", name);
});
There is a topic name
with this data:
key: 64512de2a4b5470a9a8a2846b9a8a444, value: Anakin Skywalker
key: 79af0ae572bb4c19842c19dd7cad6598, value: Luke Skywalker
key: 30e1afe9a30748339594cadc3d537ecd, value: Leia Organa
key: e01547de3d294efdb6417abf35f3c960, value: Padme Amidala
Next configuration will execute external events where the message key will be the WfRunId
and
the message value will be the Content
(more at PutExternalEventRequest):
{
"tasks.max": 2,
"topics": "names",
"connector.class": "io.littlehorse.connect.ExternalEventSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"lhc.api.port": 2023,
"lhc.api.host": "localhost",
"lhc.tenant.id": "default",
"external.event.name": "set-name"
}
More configurations at ExternalEvent Sink Connector Configurations.
Idempotent Writes
To ensure idempotency, we generate a unique id
for each request to LH with the next format: {connector name}-{topic name}-{partition}-{offset}
.
Multiple Tasks
These connectors support parallelism by running more than one task.
Specify the number of tasks in the tasks.max
configuration parameter.
More configurations at Configure Sink Connector.
Dead Letter Queue
These connectors support Dead Letter Queue (DLQ).
More about DLQs at Kafka Connect Dead Letter Queue.
Converters
These connectors support Protobuf
, Json
and Avro
through converters.
More about converters at Kafka Connect Converters
External Secrets
Kafka connect ensures provisioning secrets through the ConfigProvider interface, so these connectors support external secrets by default.
More about secrets at Externalize Secrets.
Configurations
- WfRun Sink Connector Configurations.
- ExternalEvent Sink Connector Configurations.
- Kafka Sink Connector Configurations.
- LittleHorse Client Configurations.
Download
For all available versions go to GitHub Releases.
Versioning
We use Semantic Versioning
where major.minor
numbers indicate littlehorse version compatibility, and the patch
digit indicates the lh-kafka-connect
bundle version.
major
LittleHorse servermajor
version compatibility.minor
LittleHorse serverminor
version compatibility.patch
LittleHorse Connectors bundle version.
Examples
For more examples go to examples.
Dependencies
- Java version 11 or greater is required.
- Developed and tested against Apache Kafka version 3.8 and 3.9, equivalents to Confluent Platform 7.8 and 7.9.
- Developed and tested against LittleHorse version 0.12 and 0.13.