Concepts

All three Apache Kafka Sink Connectors provided by LittleHorse Connect share a similar structure and behavior, including:

  • The expected Apache Kafka message structure.
  • The required configurations for the LittleHorse Clients.
  • The installation procedure.

Background: Kafka Sink Connectors

All three Connectors are Sink Connectors, which means they read Apache Kafka Connect SinkRecords. See the Kafka source code for SinkRecord and its superclass, ConnectRecord.

A ConnectRecord has a key and a value, both of which have a Kafka Connect Schema. A Schema specifies either a primitive type or a Struct, which is a collection of fields and typed values.
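
For illustration, here is a minimal sketch of building a Struct and its Schema with the Kafka Connect data API; the field names are hypothetical:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StructExample {
    public static void main(String[] args) {
        // A Struct Schema is a collection of named, typed fields.
        Schema valueSchema = SchemaBuilder.struct()
                .field("email", Schema.STRING_SCHEMA)
                .field("verified", Schema.BOOLEAN_SCHEMA)
                .build();

        // A Struct holds values for the fields declared by its Schema.
        Struct value = new Struct(valueSchema)
                .put("email", "obiwan@example.com")
                .put("verified", true);

        System.out.println(value);
    }
}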

The diagram below shows the lifecycle of a record as it moves from a Kafka Topic, through a Kafka Sink Connector, and into the external system:

[Diagram: Kafka Sink Connector Overview, a depiction of a SinkRecord's journey through a Kafka Sink Connector.]

When the Kafka Connect worker polls records from the Kafka Topic, each record goes through the following steps:

  1. A consumer inside the Kafka Connect Worker gets a ConsumerRecord.
  2. The configured Converter (part of the Connector configuration) converts the ConsumerRecord into a ConnectRecord.
  3. Zero or more Kafka Connect Transformations are applied to the ConnectRecord, according to the Connector configuration. Transforms can rename, drop, or add fields, and even filter out messages.
  4. Lastly, the final ConnectRecord is passed into the Sink Connector, which processes it by sending it to an external system.

In the case of LittleHorse Connect, the system in step 4 is none other than the LittleHorse Kernel!
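
As a rough sketch (not the actual LittleHorse connector code), a Sink Connector's task receives the final ConnectRecords through the SinkTask.put() method; the helper method below is hypothetical:

import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Illustrative only: shows where step 4 happens in the Kafka Connect API.
public abstract class ExampleSinkTask extends SinkTask {
    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // By this point the record has already been converted and
            // transformed; a LittleHorse connector would build a GRPC
            // request from the key and value here.
            writeToExternalSystem(record.key(), record.value());
        }
    }

    // Hypothetical helper standing in for the external-system call.
    protected abstract void writeToExternalSystem(Object key, Object value);
}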

Processing Messages

For each record, the LittleHorse Connectors all make a GRPC request, using both the message key and the message value.

LittleHorse Variable Types

In the LittleHorse Kernel, all values are represented by the VariableValue protobuf message.

The following table explains how Kafka Connect ConnectRecord keys and values are mapped into LittleHorse VariableValues. As of LittleHorse 0.14, the supported types are:

| LittleHorse Type | Java Type       | Kafka Connect Schema Type    | Notes                                             |
| ---------------- | --------------- | ---------------------------- | ------------------------------------------------- |
| INT              | Long or Integer | INT64, INT32, INT16, or INT8 | The LittleHorse SDKs always cast it to 64 bits.   |
| DOUBLE           | Double or Float | FLOAT32 or FLOAT64           | The LittleHorse SDKs always cast it to 64 bits.   |
| STR              | String          | STRING                       |                                                   |
| BYTES            | byte[]          | BYTES                        |                                                   |
| JSON_ARR         | List            | ARRAY                        | Arrays in LittleHorse are stored in JSON format.  |
| JSON_OBJ         | Map             | MAP or Struct                | Objects in LittleHorse are stored in JSON format. |
| BOOL             | Boolean         | BOOLEAN                      |                                                   |
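
As a concrete illustration of the table above, the mapping logic might look roughly like the following. This is not the connectors' actual code, and it assumes the LittleHorse Java SDK's generated protobuf classes and setter names (setInt, setStr, and so on):

import io.littlehorse.sdk.common.proto.VariableValue;

public class VariableValueMapping {
    // Illustrative sketch of the mapping in the table above.
    static VariableValue toVariableValue(Object value) {
        VariableValue.Builder out = VariableValue.newBuilder();
        if (value instanceof Long || value instanceof Integer) {
            out.setInt(((Number) value).longValue());       // INT: always 64-bit
        } else if (value instanceof Double || value instanceof Float) {
            out.setDouble(((Number) value).doubleValue());  // DOUBLE: always 64-bit
        } else if (value instanceof Boolean) {
            out.setBool((Boolean) value);                   // BOOL
        } else if (value instanceof String) {
            out.setStr((String) value);                     // STR
        }
        // BYTES, JSON_ARR, and JSON_OBJ cases omitted for brevity.
        return out.build();
    }
}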

GRPC Payloads

The following table explains how each LittleHorse Sink Connector behaves, including which GRPC request it makes, how it utilizes the ConnectRecord key, and how it utilizes the ConnectRecord value.

| Connector                    | GRPC Request           | Field Set by Key | Field Set by Value | Notes |
| ---------------------------- | ---------------------- | ---------------- | ------------------ | ----- |
| WfRunSinkConnector           | rpc RunWf              | id               | variables          | The ConnectRecord's value must be a Kafka Connect MAP or STRUCT. Each entry becomes a variable in the RunWfRequest. |
| ExternalEventSinkConnector   | rpc PutExternalEvent   | wf_run_id        | content            |       |
| CorrelatedEventSinkConnector | rpc PutCorrelatedEvent | key              | content            | The ConnectRecord key cannot be null. |

If the ConnectRecord key is set, then all of the connectors are idempotent. Note that the CorrelatedEventSinkConnector requires that the key is set.
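
For example, with the WfRunSinkConnector, a message value like the following (converted into a Kafka Connect MAP or STRUCT) would produce a RunWf request with two variables, customer-id and amount; the names are illustrative:

{
  "customer-id": "abc-123",
  "amount": 42
}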

Developer Notes

The following section contains useful notes for developers using the connectors.

Data Types

Note that the LittleHorse Kernel is data-type aware. When any of the three Connectors reads data from a Kafka topic, the data types in the topic must match the types the LittleHorse Kernel expects.

A common issue involves the Boolean data type. If the LittleHorse Kernel expects a Boolean value (true or false), the topic's schema must declare the Boolean data type as well.

For testing, it is common to use the kafka-console-producer.sh tool provided by Apache Kafka. This tool can only produce String or Integer values, so to accurately send a primitive type other than String or Integer, you must apply a cast in the Kafka Connect connector configuration, as in the example below.

Example:

{
  "name": "external-identity-verified",
  "config": {
    "tasks.max": 2,
    "topics": "names",
    "connector.class": "io.littlehorse.connect.ExternalEventSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "lhc.api.port": 2023,
    "lhc.api.host": "localhost",
    "lhc.tenant.id": "default",
    "transforms": "Cast",
    "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.Cast.spec": "boolean",
    "external.event.name": "identity-verified"
  }
}

Note the lines that begin with "transforms": they cast the String value sent by kafka-console-producer.sh to the primitive Boolean type.

For more information:

Converters

These connectors support Protobuf, JSON, and Avro through converters.

More about converters at Kafka Connect Converters
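
For example, to consume JSON messages with embedded schemas, a connector configuration could use Kafka's built-in JsonConverter (a sketch; adjust to your topic's format):

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"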

External Secrets

Kafka Connect supports provisioning secrets through the ConfigProvider interface, so these connectors support external secrets out of the box.

More about secrets at Externalize Secrets.
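
For example, a Connect worker can load Kafka's built-in FileConfigProvider; the file path and the property names below are hypothetical:

# Worker configuration
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

A connector property can then reference the secret indirectly instead of embedding it:

"lhc.oauth.client.secret": "${file:/opt/secrets/littlehorse.properties:client.secret}"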