Connector Examples

The following examples show how to configure and use each connector.

WfRunSinkConnector

The WfRunSinkConnector runs a WfRun (via rpc RunWf) for every record.

Message Part | Description | Type | Valid Values
key | Optionally specify the WfRunId | STR | any
value | Define the variables field of the workflow | map | key-value not null

For more about the run workflow fields, see RunWfRequest.

You can manipulate the message structure with Single Message Transformations (SMTs).

Quick Example

The following workflow executes a task that receives a String parameter called name:

Workflow workflow = Workflow.newWorkflow("greetings", wf -> {
    WfRunVariable name = wf.declareStr("name");
    wf.execute("greet", name);
});

The topic names contains the following records:

key: null, value: {"name":"Anakin Skywalker"}
key: null, value: {"name":"Luke Skywalker"}
key: null, value: {"name":"Leia Organa"}
key: null, value: {"name":"Padme Amidala"}

The following connector configuration runs a WfRun for each record, passing the record value as the variable name.

{
  "tasks.max": 2,
  "topics": "names",
  "connector.class": "io.littlehorse.connect.WfRunSinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false,
  "lhc.api.port": 2023,
  "lhc.api.host": "localhost",
  "lhc.tenant.id": "default",
  "wf.spec.name": "greetings"
}
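
To make the record-to-WfRun mapping concrete, here is a minimal plain-Java sketch of how a record key and value could translate into the fields of a run request. The types TopicRecord and RunRequest and the method toRunRequest are hypothetical names invented for illustration; they are not the connector's classes and do not call the LittleHorse API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the mapping; NOT the connector's actual implementation.
final class WfRunMappingSketch {

    // Stand-in for a topic record after the key and value converters have run.
    record TopicRecord(String key, Map<String, Object> value) {}

    // Stand-in for the request fields described in the table above.
    record RunRequest(String wfSpecName, Optional<String> wfRunId, Map<String, Object> variables) {}

    static RunRequest toRunRequest(String wfSpecName, TopicRecord rec) {
        // The value must be a non-null key-value map.
        if (rec.value() == null) {
            throw new IllegalArgumentException("value must be a non-null key-value map");
        }
        // The key is optional: a null key means no WfRunId is specified.
        return new RunRequest(
                wfSpecName,
                Optional.ofNullable(rec.key()),
                new LinkedHashMap<>(rec.value()));
    }

    public static void main(String[] args) {
        List<TopicRecord> records = List.of(
                new TopicRecord(null, Map.of("name", "Anakin Skywalker")),
                new TopicRecord(null, Map.of("name", "Luke Skywalker")));
        // One run request per record, all targeting the "greetings" spec.
        for (TopicRecord rec : records) {
            System.out.println(toRunRequest("greetings", rec));
        }
    }
}
```

In this sketch the spec name comes from the wf.spec.name setting, while everything else is taken from the record itself.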

ExternalEventSinkConnector

Message Part | Description | Type | Valid Values
key | Define the associated wf_run_id | string | non-empty string
value | Define the content of the external event | any | any not null

For more about the external event fields, see PutExternalEventRequest.

You can manipulate the message structure with Single Message Transformations (SMTs).

Quick Example

The following workflow waits for the event set-name, assigns its content to the variable name, and then executes the task greet.

Workflow workflow = Workflow.newWorkflow("greetings", wf -> {
    WfRunVariable name = wf.declareStr("name");
    name.assign(wf.waitForEvent("set-name"));
    wf.execute("greet", name);
});

The topic names contains the following records:

key: 64512de2a4b5470a9a8a2846b9a8a444, value: Anakin Skywalker
key: 79af0ae572bb4c19842c19dd7cad6598, value: Luke Skywalker
key: 30e1afe9a30748339594cadc3d537ecd, value: Leia Organa
key: e01547de3d294efdb6417abf35f3c960, value: Padme Amidala

The following configuration posts an external event for each record, using the message key as the WfRunId and the message value as the Content (more at PutExternalEventRequest):

{
  "tasks.max": 2,
  "topics": "names",
  "connector.class": "io.littlehorse.connect.ExternalEventSinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "lhc.api.port": 2023,
  "lhc.api.host": "localhost",
  "lhc.tenant.id": "default",
  "external.event.name": "set-name"
}
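
As a rough illustration of the key and value rules for external events, the plain-Java sketch below models how a record could map to an event request: the key becomes the WfRunId, the value becomes the content, and the event name comes from the configuration. TopicRecord, EventRequest, and toEventRequest are hypothetical names used only for this example; the connector's real classes and its error handling may differ.

```java
import java.util.List;

// Hypothetical model of the mapping; NOT the connector's actual implementation.
final class ExternalEventMappingSketch {

    // Stand-in for a topic record after the key and value converters have run.
    record TopicRecord(String key, Object value) {}

    // Stand-in for the request fields described in the table above.
    record EventRequest(String externalEventName, String wfRunId, Object content) {}

    static EventRequest toEventRequest(String externalEventName, TopicRecord rec) {
        // The key identifies the target WfRun, so it must be a non-empty string.
        if (rec.key() == null || rec.key().isEmpty()) {
            throw new IllegalArgumentException("key must be a non-empty string");
        }
        // The value may be of any type, but it must not be null.
        if (rec.value() == null) {
            throw new IllegalArgumentException("value must not be null");
        }
        return new EventRequest(externalEventName, rec.key(), rec.value());
    }

    public static void main(String[] args) {
        List<TopicRecord> records = List.of(
                new TopicRecord("64512de2a4b5470a9a8a2846b9a8a444", "Anakin Skywalker"),
                new TopicRecord("79af0ae572bb4c19842c19dd7cad6598", "Luke Skywalker"));
        // One event request per record, all named "set-name".
        for (TopicRecord rec : records) {
            System.out.println(toEventRequest("set-name", rec));
        }
    }
}
```

Each key is assumed to be the id of a greetings WfRun that is waiting on set-name.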