Basics
To develop a WfSpec in LittleHorse, you can use the Workflow struct or object in our SDKs. Generally, the Workflow entity constructor requires two arguments:
- The name of the WfSpec to create.
- A ThreadFunc, which is a function pointer, lambda function, or interface of some sort that contains the logic for the Entrypoint ThreadSpec.
The Workflow object translates your ThreadFunc into a WfSpec. As per the Metadata Management Documentation, you can easily deploy a WfSpec once you've gotten the Workflow object.
The ThreadFunc takes in one argument: a WorkflowThread. Everything you do goes through the ThreadFunc. The ThreadFunc defines a ThreadSpec, and the ThreadFunc passed into the Workflow object or struct is used to build the Entrypoint Thread.
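To make the relationship between the Workflow object and the ThreadFunc more concrete, here is a toy model in plain Python. It does not use the LittleHorse SDK: ToyWorkflow, ToyThread, and the dictionary layout are hypothetical names invented for this sketch, and the real SDK produces a much richer WfSpec. It only illustrates the idea that the ThreadFunc is called once with a thread object, and the calls made on that object are recorded as the Entrypoint Thread.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class ToyThread:
    """Hypothetical stand-in for WorkflowThread: it only records calls."""

    variables: List[str] = field(default_factory=list)
    nodes: List[Dict[str, object]] = field(default_factory=list)

    def declare_str(self, name: str) -> str:
        self.variables.append(name)
        return name  # in this toy, a variable handle is just its name

    def execute(self, task_def_name: str, *args: object) -> None:
        self.nodes.append(
            {"type": "TASK", "taskDef": task_def_name, "args": list(args)}
        )


class ToyWorkflow:
    """Hypothetical stand-in for Workflow: a name plus a ThreadFunc."""

    def __init__(self, name: str, thread_func: Callable[[ToyThread], None]) -> None:
        self.name = name
        self.thread_func = thread_func

    def compile(self) -> Dict[str, object]:
        thread = ToyThread()
        self.thread_func(thread)  # run the ThreadFunc once to record its steps
        return {
            "name": self.name,
            "entrypoint": {"variables": thread.variables, "nodes": thread.nodes},
        }


def wf_logic(wf: ToyThread) -> None:
    first_name = wf.declare_str("first-name")
    wf.execute("greet", first_name)


spec = ToyWorkflow("my-wfspec", wf_logic).compile()
print(spec)
```

The ThreadFunc never runs a task itself; it only describes steps, which is why the same function can be compiled into a specification and registered ahead of any WfRun.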
Quickstart
Below you can find executable files that define a WfSpec with a single step: execute the greet TaskDef with the supplied first-name variable which comes as input. As a prerequisite, you need to have the greet TaskDef already registered in your LittleHorse Cluster.
If you haven't yet created a TaskDef named greet, you can do it by following our Task Worker Development Quickstart.
- Java
- Go
- Python
- C#
package io.littlehorse.quickstart;

import java.io.IOException;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.wfsdk.WfRunVariable;
import io.littlehorse.sdk.wfsdk.Workflow;
import io.littlehorse.sdk.wfsdk.WorkflowThread;
import io.littlehorse.sdk.common.proto.VariableType;

public class Main {

    public static void main(String[] args) throws IOException, InterruptedException {
        LHConfig config = new LHConfig();

        // The `Workflow` object uses the DSL to compile the WfSpec
        Workflow workflowGenerator = Workflow.newWorkflow("my-wfspec", Main::wfLogic);

        // Convenience method to register the `WfSpec` automatically.
        workflowGenerator.registerWfSpec(config.getBlockingStub());
    }

    // NOTE: this can be static or non-static.
    static void wfLogic(WorkflowThread wf) {
        // Required input variable.
        WfRunVariable firstName = wf.declareStr("first-name").required();

        // Execute the `greet` Task and pass in `first-name` as an argument.
        wf.execute("greet", firstName);
    }
}
package main

import (
	"context"
	"log"

	"github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
)

func wfLogic(wf *littlehorse.WorkflowThread) {
	// Required input variable.
	firstName := wf.DeclareStr("first-name").Required()
	// Execute the `greet` Task and pass in `first-name` as an argument.
	wf.Execute("greet", firstName)
}

func main() {
	// Get a client
	config := littlehorse.NewConfigFromEnv()
	client, err := config.GetGrpcClient()
	if err != nil {
		log.Fatal(err)
	}

	// Compile the ThreadFunc into a PutWfSpecRequest.
	workflowGenerator := littlehorse.NewWorkflow(wfLogic, "my-wfspec")
	request, err := workflowGenerator.Compile()
	if err != nil {
		log.Fatal(err)
	}

	// Register the WfSpec with the cluster.
	if _, err := (*client).PutWfSpec(context.Background(), request); err != nil {
		log.Fatal(err)
	}
}
from littlehorse.workflow import WorkflowThread, WfRunVariable, Workflow
from littlehorse.config import LHConfig
from littlehorse.model import VariableType, PutWfSpecRequest


def workflow_logic(wf: WorkflowThread) -> None:
    first_name: WfRunVariable = wf.declare_str("first-name").required()
    wf.execute("greet", first_name)


if __name__ == '__main__':
    config = LHConfig()
    client = config.stub()
    workflow_generator = Workflow("my-wfspec", workflow_logic)
    request: PutWfSpecRequest = workflow_generator.compile()
    client.PutWfSpec(request)
using LittleHorse.Sdk;
using LittleHorse.Sdk.Common.Proto;
using LittleHorse.Sdk.Workflow.Spec;

public class Program
{
    static void Main(string[] args)
    {
        var config = new LHConfig();
        var lhClient = config.GetGrpcClientInstance();
        var workflow = new Workflow("my-wfspec", WfLogic);
        workflow.RegisterWfSpec(lhClient);
    }

    // NOTE: this can be static or non-static.
    static void WfLogic(WorkflowThread wf)
    {
        // Required input variable.
        WfRunVariable firstName = wf.DeclareStr("first-name").Required();

        // Execute the `greet` Task and pass in `first-name` as an argument.
        wf.Execute("greet", firstName);
    }
}
At this point, whether you used Python, Go, Java, or C# for the WfSpec, you should be able to run the WfSpec via the following command:
lhctl run my-wfspec first-name Obi-Wan
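If you want to launch that command from a script, the following Python sketch builds the argument list for it. The helper name lhctl_run_args is hypothetical, and the sketch assumes that additional input variables are passed as further name/value pairs after the WfSpec name, following the pattern of the single pair shown above. It only builds and prints the command; it does not execute lhctl.

```python
import shlex
from typing import Dict, List


def lhctl_run_args(wf_spec_name: str, variables: Dict[str, str]) -> List[str]:
    """Build the argv for `lhctl run <wfSpecName> <name> <value> ...`."""
    args = ["lhctl", "run", wf_spec_name]
    for name, value in variables.items():
        # Assumption: each input variable is a name followed by its value.
        args.extend([name, value])
    return args


args = lhctl_run_args("my-wfspec", {"first-name": "Obi-Wan"})
print(shlex.join(args))  # lhctl run my-wfspec first-name Obi-Wan
```

Passing the resulting list to subprocess.run (rather than a single shell string) avoids quoting problems when a variable value contains spaces.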
Defining a WfRunVariable
A ThreadSpec can have VariableDefs, which are similar to variable declarations in programming. When declaring a Variable in LittleHorse, you need to:
- Provide the name of the Variable.
- Specify the VariableType or provide a default value from which the type is inferred.
A Variable's name must be a valid hostname, meaning lowercase alphanumeric characters separated by hyphens (-).
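As a rough illustration of the naming rule, the following Python sketch checks a candidate name before you use it in a WfSpec. The function name and the exact regular expression are assumptions made for this example (lowercase alphanumeric groups joined by single hyphens); the server's actual validation may differ in edge cases.

```python
import re

# Assumed pattern: lowercase alphanumeric groups joined by single hyphens.
_NAME_PATTERN = re.compile(r"[a-z0-9]+(?:-[a-z0-9]+)*")


def is_valid_variable_name(name: str) -> bool:
    """Return True if `name` matches the assumed hostname-style pattern."""
    return _NAME_PATTERN.fullmatch(name) is not None


for candidate in ["first-name", "my-variable", "First-Name", "first_name"]:
    print(candidate, is_valid_variable_name(candidate))
```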
Recall the valid types of Variables:
- STR
- INT (64-bit integer, represented as a long in Java and int64 in Go)
- DOUBLE (64-bit floating point, double in Java and float64 in Go)
- BOOL
- JSON_OBJ (a dumped JSON String)
- JSON_ARR (a dumped JSON String)
- BYTES
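The idea of inferring a Variable's type from a default value can be sketched in plain Python as follows. This is not the SDK's implementation: the function name infer_variable_type and the mapping from Python types to the type names listed above are assumptions chosen for illustration.

```python
from typing import Any


def infer_variable_type(default_value: Any) -> str:
    """Map a Python default value to one of the type names listed above."""
    # bool must be checked before int, because bool is a subclass of int.
    if isinstance(default_value, bool):
        return "BOOL"
    if isinstance(default_value, str):
        return "STR"
    if isinstance(default_value, int):
        return "INT"
    if isinstance(default_value, float):
        return "DOUBLE"
    if isinstance(default_value, dict):
        return "JSON_OBJ"
    if isinstance(default_value, list):
        return "JSON_ARR"
    if isinstance(default_value, (bytes, bytearray)):
        return "BYTES"
    raise TypeError(f"no matching type for {type(default_value).__name__}")


print(infer_variable_type("Hello, there!"))  # STR
print(infer_variable_type(54321))            # INT
```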
Searchable and Required Variables
It is often desirable to be able to search for a WfRun based on the value of the Variables inside it. For example, how can I find the WfRun that has email=foo@bar.com? You can do that via the rpc SearchVariable.
In order to do that, however, you must first put an index on your Variable by using the .searchable() method.
Additionally, you can use the .required() method to make a Variable required as input to the ThreadRun. If you do this on your Entrypoint ThreadRun, then the RunWfRequest must specify a value for that Variable.
Putting an Index on a Variable or making the Variable "Required" means that the Variable becomes part of the public API of the WfSpec. That means you will increment a "major version" upon adding or removing an Index on a Variable. For more info, check out our docs on WfSpec Versioning.
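One simplified way to think about this rule is to treat the set of searchable or required Variables as the public API and compare it between two versions of a WfSpec. The Python sketch below does exactly that. VarDecl, public_api, and needs_major_version_bump are hypothetical names, and the comparison is a deliberately reduced reading of the paragraph above; the WfSpec Versioning docs remain the authority on the real rules.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, Tuple


@dataclass(frozen=True)
class VarDecl:
    """Hypothetical description of one declared Variable."""

    name: str
    searchable: bool = False
    required: bool = False


def public_api(variables: Iterable[VarDecl]) -> FrozenSet[Tuple[str, bool, bool]]:
    """Keep only Variables that are searchable or required."""
    return frozenset(
        (v.name, v.searchable, v.required)
        for v in variables
        if v.searchable or v.required
    )


def needs_major_version_bump(old: Iterable[VarDecl], new: Iterable[VarDecl]) -> bool:
    """Assumption: any change to the public Variables is a major change."""
    return public_api(old) != public_api(new)


v1 = [VarDecl("email")]
v2 = [VarDecl("email", searchable=True)]
print(needs_major_version_bump(v1, v2))  # True
```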
Defining Variables
- Java
- Go
- Python
- C#
To define a LittleHorse Variable in Java, just use any of the following methods:
- declareStr
- declareInt
- declareDouble
- declareBool
- declareJsonObj
- declareJsonArr
- declareBytes
The only argument of a declare method is the name of the variable.
import io.littlehorse.sdk.wfsdk.WorkflowThread;
import io.littlehorse.sdk.wfsdk.WfRunVariable;

public void threadFunction(WorkflowThread thread) {
    WfRunVariable myVar = thread.declareStr("my-variable");
}
You can assign a default value to the Variable using the withDefault method.
public void threadFunction(WorkflowThread thread) {
    WfRunVariable myVar = thread.declareStr("my-variable").withDefault("Hello, there!");
}
You can set an index on the variable as follows:
public void threadFunction(WorkflowThread thread) {
    WfRunVariable myVar = thread.declareStr("my-variable").withDefault("Hello, there!").searchable();
}
And you can mark the Variable as Required as follows:
public void threadFunction(WorkflowThread thread) {
    WfRunVariable myVar = thread.declareStr("my-variable").withDefault("Hello, there!").required();
}
You can also use the addVariable method to pass the VariableType as an argument:
thread.addVariable("my-variable", VariableType.STR);
This will be useful to advanced users who want to dynamically generate WfSpec variables.
To define a LittleHorse Variable in Go, just use any of the following methods:
- DeclareStr
- DeclareInt
- DeclareDouble
- DeclareBool
- DeclareJsonObj
- DeclareJsonArr
- DeclareBytes
The only argument of a declare method is the name of the variable.
func myThreadFunc(thread *littlehorse.WorkflowThread) {
	myVar := thread.DeclareStr("my-variable")
}
You can assign a default value to the Variable using the WithDefault method.
func myThreadFunc(thread *littlehorse.WorkflowThread) {
	nameVar := thread.DeclareStr("my-variable").WithDefault("Ahsoka Tano")
}
You can add an index on a WfRunVariable to make the variable searchable.
func myThreadFunc(thread *littlehorse.WorkflowThread) {
	nameVar := thread.DeclareStr("my-variable").WithDefault("Ahsoka Tano").Searchable()
}
You can also use the AddVariable method to pass the VariableType as an argument:
thread.AddVariable("my-variable", lhproto.VariableType_STR)
This will be useful to advanced users who want to dynamically generate WfSpec variables.
To define a LittleHorse Variable in Python, just use any of the following methods:
- declare_str
- declare_int
- declare_double
- declare_bool
- declare_json_obj
- declare_json_arr
- declare_bytes
The first argument of a declare method is the name of the variable.
def thread_function(thread: WorkflowThread) -> None:
    the_name = thread.declare_str("input-name")
You can optionally pass a second argument, which assigns the variable a default value.
def thread_function(thread: WorkflowThread) -> None:
    the_name = thread.declare_str("input-name", "The Mandalorian")
You can set an index on the variable as follows:
def thread_function(thread: WorkflowThread) -> None:
    the_name = thread.declare_str("input-name").searchable()
    # optionally make the variable a Required variable
    the_name.required()
You can also use the add_variable method to pass the VariableType as an argument:
thread.add_variable("my-variable", VariableType.STR)
This will be useful to advanced users who want to dynamically generate WfSpec variables.
To define a LittleHorse Variable in C#, just use any of the following methods:
- DeclareStr
- DeclareInt
- DeclareDouble
- DeclareBool
- DeclareJsonObj
- DeclareJsonArr
- DeclareBytes
The only argument of a declare method is the name of the variable.
public void ThreadFunction(WorkflowThread thread)
{
    WfRunVariable myVar = thread.DeclareStr("my-variable");
}
You can assign a default value to the Variable using the WithDefault method.
public void ThreadFunction(WorkflowThread thread)
{
    WfRunVariable myVar = thread.DeclareStr("my-variable").WithDefault("Hello, there!");
}
You can set an index on the variable as follows:
public void ThreadFunction(WorkflowThread thread)
{
    WfRunVariable myVar = thread.DeclareStr("my-variable").Searchable();
}
And you can mark the Variable as Required as follows:
public void ThreadFunction(WorkflowThread thread)
{
    WfRunVariable myVar = thread.DeclareStr("my-variable").Required();
}
You can also use the AddVariable method to pass the VariableType as an argument:
thread.AddVariable("my-variable", VariableType.STR);
This will be useful to advanced users who want to dynamically generate WfSpec variables.
Masked Variables
In certain situations, you may need to mask the contents of a particular variable, ensuring that it remains hidden from users.
You can achieve this by using masked variables, which will only allow access to the variable's content within your WfRun.
- Java
- Go
- Python
- C#
public void threadFunction(WorkflowThread thread) {
    WfRunVariable myVar = thread.declareStr("my-masked-variable").masked();
}

func myThreadFunc(thread *littlehorse.WorkflowThread) {
	myVar := thread.DeclareStr("my-masked-variable")
	myVar.Masked()
}

def thread_function(thread: WorkflowThread) -> None:
    the_name = thread.declare_str("input-name").masked()

public void ThreadFunction(WorkflowThread thread)
{
    WfRunVariable myVar = thread.DeclareStr("my-masked-variable").Masked();
}
Executing a TASK Node
The WorkflowThread::execute() method can be used to execute a Task. It is required that the TaskDef is already registered with the LittleHorse Kernel, and that you have a Task Worker that is polling for those tasks.
It is perfectly acceptable for a WfSpec written in one language to execute tasks that are defined and run in other languages.
To execute the foo task, you simply do the following:
- Java
- Go
- Python
- C#
public void myWf(WorkflowThread thread) {
    NodeOutput output = thread.execute("foo");
}

func myThreadFunc(thread *littlehorse.WorkflowThread) {
	taskOutput := thread.Execute("foo")
}

def thread_function(thread: WorkflowThread) -> None:
    thread.execute("foo")

static void MyWf(WorkflowThread thread)
{
    NodeOutput output = thread.Execute("foo");
}
Task Input Variables
You can pass input variables to a Task. Let's say, for example, I have a Python Task Function as follows:
async def my_task(some_str: str, some_int: int) -> str:
    return f"Inputs were {some_str} and {some_int}"
The resulting TaskDef has two input variables, one of type STR and another of type INT.
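To see where those two input variables come from, the following standalone Python sketch reads the type hints of the task function and maps them to LittleHorse type names. It does not use the SDK: describe_inputs and the mapping table are hypothetical and only illustrate the relationship between a function signature and the TaskDef inputs.

```python
import inspect
from typing import Callable, List, Tuple, get_type_hints

# Assumed mapping from Python annotations to LittleHorse type names.
_PY_TO_LH = {
    str: "STR",
    int: "INT",
    float: "DOUBLE",
    bool: "BOOL",
    bytes: "BYTES",
    dict: "JSON_OBJ",
    list: "JSON_ARR",
}


async def my_task(some_str: str, some_int: int) -> str:
    return f"Inputs were {some_str} and {some_int}"


def describe_inputs(func: Callable[..., object]) -> List[Tuple[str, str]]:
    """Return (parameter name, type name) pairs in declaration order."""
    hints = get_type_hints(func)
    return [
        (name, _PY_TO_LH[hints[name]])
        for name in inspect.signature(func).parameters
    ]


print(describe_inputs(my_task))  # [('some_str', 'STR'), ('some_int', 'INT')]
```

The order of the pairs matters, because arguments passed when executing the task are matched to these inputs by position.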
You can hard-code the input variables in a call to that TaskDef as follows:
- Java
- Go
- Python
- C#
String inputStrVal = "input string value!";
int inputIntVal = 54321;
thread.execute("foo", inputStrVal, inputIntVal);
inputStrVal := "input string value!"
inputIntVal := 54321
thread.Execute("foo", inputStrVal, inputIntVal)
str_val = "input string value!"
int_val = 54321
thread.execute("foo", str_val, int_val)
string inputStrVal = "input string value!";
int inputIntVal = 54321;
thread.Execute("foo", inputStrVal, inputIntVal);
Alternatively, if you have a WfRunVariable, you can use it as input:
- Java
- Go
- Python
- C#
public void threadFunction(WorkflowThread thread) {
    WfRunVariable myStr = thread.declareStr("my-str");
    WfRunVariable myInt = thread.declareInt("my-int");
    thread.execute("foo", myStr, myInt);
}

func threadFunction(thread *littlehorse.WorkflowThread) {
	myStr := thread.DeclareStr("my-str")
	myInt := thread.DeclareInt("my-int")
	thread.Execute("foo", myStr, myInt)
}

def thread_function(thread: WorkflowThread) -> None:
    my_str = thread.declare_str("my-str")
    my_int = thread.declare_int("my-int")
    thread.execute("foo", my_str, my_int)

public void ThreadFunction(WorkflowThread thread)
{
    WfRunVariable myStr = thread.DeclareStr("my-str");
    WfRunVariable myInt = thread.DeclareInt("my-int");
    thread.Execute("foo", myStr, myInt);
}
You can also define masked input variables by adding metadata to the task's arguments or return value. Using the same Python example:
async def my_task(some_str: Annotated[str, LHType(name="some_str", masked=True)], some_int: int) -> str:
    return f"Inputs were {some_str} and {some_int}"
Any type of variable can be masked, not limited to string types.
Setting Retention Hours
You can use the Workflow::withRetentionHours() method to set how long a WfRun should stay on the system. By default, the system retains WfRuns for 168 hours (7 days). For example, if the WfSpec has a retention period of 2 hours, a WfRun will be deleted 2 hours after it reaches the COMPLETED or ERROR status:
- Java
- Go
- Python
Workflow wf = new WorkflowImpl(...);
wf.withRetentionHours(23);
wf.registerWfSpec(...);

client := ...
wf := littlehorse.NewWorkflow(basic.MyWorkflow, "my-workflow")
putWf, _ := wf.Compile()
hours := int32(23)
putWf.WithRetentionHours(&hours)
resp, err := client.PutWfSpec(context.Background(), putWf)

wf = Workflow("my-wf", thread_function)
wf.retention_hours = 23
littlehorse.create_workflow_spec(wf, config)
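To make the retention arithmetic concrete, here is a small Python sketch that is independent of the SDK. The helper name deletion_time is hypothetical, and the sketch assumes the retention clock starts at the moment the WfRun reaches COMPLETED or ERROR, as described above.

```python
from datetime import datetime, timedelta, timezone

DEFAULT_RETENTION_HOURS = 168  # 7 days, the default mentioned above


def deletion_time(
    ended_at: datetime, retention_hours: int = DEFAULT_RETENTION_HOURS
) -> datetime:
    """Earliest time at which a finished WfRun is eligible for deletion."""
    return ended_at + timedelta(hours=retention_hours)


ended = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)
print(deletion_time(ended, 23))  # 2024-01-02 09:00:00+00:00
print(deletion_time(ended))      # 2024-01-08 10:00:00+00:00
```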