Connector Examples
The following are some examples for each connector.
WfRunSinkConnector
The WfRunSinkConnector
runs a WfRun
(via rpc RunWf
) for every record.
Message Part | Description | Type | Valid Values |
---|---|---|---|
key | Optionally specify the WfRunId | STR | any |
value | Define the variables field of the workflow | map | key-value not null |
More about run workflow fields at RunWfRequest.
You can manipulate the message structure with Single Message Transformations (SMTs).
Quick Example
Next workflow executes a task that receives a String
as parameter called name
:
Workflow workflow = Workflow.newWorkflow("greetings", wf -> {
WfRunVariable name = wf.declareStr("name");
wf.execute("greet", name);
});
There is a topic names
, with this data in the topic:
key: null, value: {"name":"Anakin Skywalker"}
key: null, value: {"name":"Luke Skywalker"}
key: null, value: {"name":"Leia Organa"}
key: null, value: {"name":"Padme Amidala"}
Next connector configuration will execute WfRuns
with the variable name
.
{
"tasks.max": 2,
"topics": "names",
"connector.class": "io.littlehorse.connect.WfRunSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"lhc.api.port": 2023,
"lhc.api.host": "localhost",
"lhc.tenant.id": "default",
"wf.spec.name": "greetings"
}
ExternalEventSinkConnector
Message Part | Description | Type | Valid Values |
---|---|---|---|
key | Define the associated wf_run_id | string | non-empty string |
value | Define the content of the external event | any | any not null |
More about external event fields at PutExternalEventRequest.
You can manipulate the message structure with Single Message Transformations (SMTs).
Quick Example
Next workflow waits for the event set-name
to assign the variable name
and then execute the task greet
.
Workflow workflow = Workflow.newWorkflow("greetings", wf -> {
WfRunVariable name = wf.declareStr("name");
name.assign(wf.waitForEvent("set-name"));
wf.execute("greet", name);
});
There is a topic name
with this data:
key: 64512de2a4b5470a9a8a2846b9a8a444, value: Anakin Skywalker
key: 79af0ae572bb4c19842c19dd7cad6598, value: Luke Skywalker
key: 30e1afe9a30748339594cadc3d537ecd, value: Leia Organa
key: e01547de3d294efdb6417abf35f3c960, value: Padme Amidala
Next configuration will execute external events where the message key will be the WfRunId
and
the message value will be the Content
(more at PutExternalEventRequest):
{
"tasks.max": 2,
"topics": "names",
"connector.class": "io.littlehorse.connect.ExternalEventSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"lhc.api.port": 2023,
"lhc.api.host": "localhost",
"lhc.tenant.id": "default",
"external.event.name": "set-name"
}