Basics
To develop a WfSpec
in LittleHorse, you can use the Workflow
struct or object in our SDK's. Generally, the Workflow
entity constructor requires two arguments:
- The name of the
WfSpec
to create. - A
ThreadFunc
, which is function pointer, lambda function, or interface of some sort which contains the logic for the EntrypointThreadSpec
.
The Workflow
object translates your ThreadFunc
into a WfSpec
. As per the Metadata Management Documentation, you can easily deploy a WfSpec
once you've gotten the Workflow
object.
The ThreadFunc
takes in one argument: a WorkflowThread
. Everything you do goes through the ThreadFunc
. The ThreadFunc
defines a ThreadSpec
, and the ThreadFunc
passed into the Workflow
object or struct is used to build the Entrypoint Thread.
Quickstart
Below you can find executable files that define a WfSpec
with a single step: execute the greet
TaskDef with the supplied first-name
variable which comes as input. As a prerequisite, you need to have the greet
TaskDef
already registered in your LittleHorse Cluster.
If you haven't yet created a TaskDef
named greet
, you can do it by following our Task Worker Development Quickstart.
- Java
- Go
- Python
- C#
package io.littlehorse.quickstart;
import java.io.IOException;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.wfsdk.WfRunVariable;
import io.littlehorse.sdk.wfsdk.Workflow;
import io.littlehorse.sdk.wfsdk.WorkflowThread;
import io.littlehorse.sdk.common.proto.VariableType;
public class Main {
public static void main(String[] args) throws IOException, InterruptedException {
LHConfig config = new LHConfig();
// The `Workflow` object uses the DSL to compile the WfSpec
Workflow workflowGenerator = Workflow.newWorkflow("my-wfspec", Main::wfLogic);
// Convenience method to register the `WfSpec` automatically.
workflowGenerator.registerWfSpec(config.getBlockingStub());
}
// NOTE: this can be static or non-static.
static void wfLogic(WorkflowThread wf) {
// Required input variable.
WfRunVariable firstName = wf.declareStr("first-name").required();
// Execute the `greet` Task and pass in `first-name` as an argument.
wf.execute("greet", firstName);
}
}
package main
import (
"context"
"log"
"github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
"github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"
)
func wfLogic(wf *littlehorse.WorkflowThread) {
firstName := wf.DeclareStr("first-name").Required()
wf.Execute("greet", firstName)
}
func main() {
// Get a client
config := littlehorse.NewConfigFromEnv()
client, _ := config.GetGrpcClient()
workflowGenerator := littlehorse.NewWorkflow(wfLogic, "my-wfspec")
request, err := workflowGenerator.Compile()
if err != nil {
log.Fatal(err)
}
(*client).PutWfSpec(context.Background(), request)
}
from littlehorse.workflow import WorkflowThread, WfRunVariable, Workflow
from littlehorse.config import LHConfig
from littlehorse.model import VariableType, PutWfSpecRequest
def workflow_logic(wf: WorkflowThread) -> None:
first_name: WfRunVariable = wf.declare_str("first-name").required()
wf.execute("greet", first_name)
if __name__ == '__main__':
config = LHConfig()
client = config.stub()
workflow_generator = Workflow("my-wfspec", workflow_logic)
request: PutWfSpecRequest = workflow_generator.compile()
client.PutWfSpec(request)
using LittleHorse.Sdk;
using LittleHorse.Sdk.Common.Proto;
using LittleHorse.Sdk.Workflow.Spec;
public class Program
{
static void Main(string[] args)
{
var config = new LHConfig();
var lhClient = config.GetGrpcClientInstance();
var workflow = new Workflow("my-wfspec", WfLogic);
workflow.RegisterWfSpec(lhClient);
}
// NOTE: this can be static or non-static.
static void WfLogic(WorkflowThread wf)
{
// Required input variable.
WfRunVariable firstName = wf.DeclareStr("first-name").Required();
// Execute the `greet` Task and pass in `first-name` as an argument.
wf.Execute("greet", firstName);
}
}
At this point, whether you used python, go, or Java for the WfSpec, you should be able to run the WfSpec
via the following command:
lhctl run my-wfspec first-name Obi-Wan
Defining a WfRunVariable
A ThreadSpec
can have VariableDef
s, which is similar to declaring a variable in programming. When declaring a Variable
in LittleHorse, you need to:
- Provide the
name
of theVariable
. - Specify the
VariableType
or provide a default value from which the type is inferred.
A Variable
's name must be a valid hostname, meaning lowercase alphanumeric characters separated by a -
.
Recall the valid types of Variables:
STR
INT
(64-bit integer, represented as along
in Java andint64
in Go)DOUBLE
(64-bit floating point,double
in Java andfloat64
in Go)BOOL
JSON_OBJ
(a dumped JSON String)JSON_ARR
(a dumped JSON String)BYTES
Searchable and Required Variables
It is often desirable to be able to search for a WfRun
based on the value of the Variable
s inside it. For example, how can I find the WfRun
that has email=foo@bar.com
? You can do that via the rpc SearchVariable
In order to do that, however, you must first put an index on your Variable
by using the .searchable()
method.
Additionally, you can use the .required()
method to make a Variable
required as input to the ThreadRun
. If you do this on your Entrypoint ThreadRun
, then the RunWfRequest
must specify a value for that Variable
.
Putting an Index on a Variable
or making the Variable
"Required" means that the Variable
becomes part of the public API of the WfSpec
. That means you will increment a "major version" upon adding or removing an Index on a Variable
. For more info, check out our docs on WfSpec Versioning.
Defining Variables
- Java
- Go
- Python
- C#
To define a LittleHorse Variable in Java, just use any of the following methods:
declareStr
declareInt
declareDouble
declareBool
declareJsonObj
declareJsonArr
declareBytes
The only argument of a declare
method is the name of the variable.
import io.littlehorse.sdk.wfsdk.WorkflowThread;
import io.littlehorse.sdk.wfsdk.WfRunVariable;
public void threadFunction(WorkflowThread thread) {
WfRunVariable myVar = thread.declareStr("my-variable");
}
You can assign a default value to the Variable
using the WithDefault
method.
public void threadFunction(WorkflowThread thread) {
WfRunVariable myVar = thread.declareStr("my-variable").WithDefault("Hello, there!");
}
You can set an index on the variable as follows:
public void threadFunction(WorkflowThread thread) {
WfRunVariable myVar = thread.declareStr("my-variable").WithDefault("Hello, there!").searchable();
}
And you can mark the Variable
as Required as follows:
public void threadFunction(WorkflowThread thread) {
WfRunVariable myVar = thread.declareStr("my-variable").WithDefault("Hello, there!").required();
}
You can also use the addVariable
method to pass the VariableType
as an argument:
thread.addVariable("my-variable", VariableType.STR)
This will be useful to advanced users who want to dynamically generate WfSpec
variables.
To define a LittleHorse Variable in Go, just use any of the following methods:
DeclareStr
DeclareInt
DeclareDouble
DeclareBool
DeclareJsonObj
DeclareJsonArr
DeclareBytes
The only argument of a declare
method is the name of the variable.
func myThreadFunc(thread *littlehorse.WorkflowThread) {
myVar := thread.DeclareStr("my-variable")
}
You can assign a default value to the Variable
using the WithDefault
method.
func myThreadFunc(thread *littlehorse.WorkflowThread) {
nameVar := thread.DeclareStr("my-variable").WithDefault("Ahsoka Tano")
}
You can add an index on a WfRunVariable
to make the variable searchable.
func myThreadFunc(thread *littlehorse.WorkflowThread) {
nameVar := thread.DeclareStr("my-variable").WithDefault("Ahsoka Tano").Searchable()
}
You can also use the AddVariable
method to pass the VariableType
as an argument:
thread.AddVariable("my-variable", lhproto.VariableType_STR)
This will be useful to advanced users who want to dynamically generate WfSpec
variables.
To define a LittleHorse Variable in Python, just use any of the following methods:
declare_str
declare_int
declare_double
declare_bool
declare_json_obj
declare_json_arr
declare_bytes
The first argument of a declare
method is the name of the variable.
def thread_function(thread: WorkflowThread) -> None:
the_name = thread.declare_str("input-name")
You can optionally pass a second argument, which assigns the variable a default value.
def thread_function(thread: WorkflowThread) -> None:
the_name = thread.declare_str("input-name", "The Mandalorian")
You can set an index on the variable as follows:
def thread_function(thread: WorkflowThread) -> None:
the_name = thread.declare_str("input-name").searchable()
# optionally make the variable a Required variable
the_name.required()
You can also use the add_variable
method to pass the VariableType
as an argument:
thread.add_variable("my-variable", VariableType.STR)
This will be useful to advanced users who want to dynamically generate WfSpec
variables.
To define a LittleHorse Variable in C#, just use any of the following methods:
DeclareStr
DeclareInt
DeclareDouble
DeclareBool
DeclareJsonObj
DeclareJsonArr
DeclareBytes
The only argument of a declare
method is the name of the variable.
public void ThreadFunction(WorkflowThread thread)
{
WfRunVariable myVar = thread.DeclareStr("my-variable");
}
You can assign a default value to the Variable
using the WithDefault
method.
public void ThreadFunction(WorkflowThread thread)
{
WfRunVariable myVar = thread.DeclareStr("my-variable").WithDefault("Hello, there!");
}
You can set an index on the variable as follows:
public void ThreadFunction(WorkflowThread thread)
{
WfRunVariable myVar = thread.DeclareStr("my-variable").Searchable();
}
And you can mark the Variable
as Required as follows:
public void ThreadFunction(WorkflowThread thread)
{
WfRunVariable myVar = thread.DeclareStr("my-variable").Required();
}
You can also use the AddVariable
method to pass the VariableType
as an argument:
thread.AddVariable("my-variable", VariableType.STR)
This will be useful to advanced users who want to dynamically generate WfSpec
variables.
Masked Variables
In certain situations, you may need to mask the contents of a particular variable, ensuring that it remains hidden from users.
You can achieve this by using masked variables, which will only allow access to the variable's content within your WfRun
.
- Java
- Go
- Python
- C#
public void threadFunction(WorkflowThread thread) {
WfRunVariable myVar = thread.declareStr("my-masked-variable").masked();
}
func myThreadFunc(thread *littlehorse.WorkflowThread) {
myVar := thread.DeclareStr("my-masked-variable")
myVar.Masked()
}
def thread_function(thread: WorkflowThread) -> None:
the_name = thread.declare_str("input-name").masked()
public void ThreadFunction(WorkflowThread thread)
{
WfRunVariable myVar = thread.DeclareStr("my-masked-variable").Masked();
}
Executing a TASK
Node
The WorkflowThread::execute()
method can be used to execute a Task. It is required that the TaskDef
is already registered with the LittleHorse Server, and that you have a Task Worker that is polling for those tasks.
It is perfectly acceptable for a WfSpec
written in one language to execute tasks that are defined and run in other languages.
To execute the foo
task, you simply do the following:
- Java
- Go
- Python
- C#
public void myWf(WorkflowThread thread) {
NodeOutput output = thread.execute("foo");
}
func myThreadFunc(thread *littlehorse.WorkflowThread) {
taskOutput := thread.Execute("foo")
}
def thread_function(thread: WorkflowThread) -> None:
thread.execute("foo")
static void MyWf(WorkflowThread thread)
{
NodeOutput output = thread.Execute("foo");
}
Task Input Variables
You can pass input variables to a Task. Let's say, for example, I have a Python Task Function as follows:
async def my_task(some_str: str, some_int: int) -> str:
return f"Inputs were {some_str} and {some_int}"
The resulting TaskDef
has two input variables, one of type STR
and another of type INT
.
You can hard-code the input variables in a call to that TaskDef
as follows:
- Java
- Go
- Python
- C#
String inputStrVal = "input string value!";
int inputIntVal = 54321;
thread.execute("foo", inputStrVal, inputIntVal);
inputStrVal := "input string value!"
inputIntVal := 54321
thread.Execute("foo", inputStrVal, inputIntVal)
str_val = "input string value!"
int_val = 54321
thread.execute("foo", str_val, int_val)
string inputStrVal = "input string value!";
int inputIntVal = 54321;
thread.Execute("foo", inputStrVal, inputIntVal);
Alternatively, if you have a WfRunVariable
, you can use it as input:
- Java
- Go
- Python
- C#
public void threadFunction(WorkflowThread thread) {
WfRunVariable myStr = thread.declareStr("my-str");
WfRunVariable myInt = thread.declareInt("my-int");
thread.execute("foo", myStr, myInt);
}
func threadFunction(thread *littlehorse.WorkflowThread) {
myStr := thread.DeclareStr("my-str")
myInt := thread.DeclareInt("my-int")
thread.Execute("foo", myStr, myInt)
}
def thread_function(thread: WorkflowThread) -> None:
my_str = thread.declare_str("my-str")
my_int = thread.declare_str("my-int")
thread.execute("foo", my_str, my_int)
public void ThreadFunction(WorkflowThread thread)
{
WfRunVariable myStr = thread.DeclareStr("my-str");
WfRunVariable myInt = thread.DeclareStr("my-int");
thread.Execute("foo", myStr, myInt);
}
You can also define an input masked variables by adding some metadata to the task argument or return values. Using the same Python example:
async def my_task(some_str: Annotated[str, LHType(name="some_str", masked=True)], some_int: int) -> str:
return f"Inputs were {some_str} and {some_int}"
Any type of variable can be masked, not limited to string types.
Setting Retention Hours
You can use the Workflow::withRetentionHours()
method to set how long a WfRun
should stay on the system. Remember that our default system hosts WfRun
s for 168 hours (7 days). For example, if the WfSpec
has a retention period of 2 hours, a WfRun
will be deleted 2 hours after it is COMPLETED
or ERROR
:
- Java
- Go
- Python
Workflow wf = new WorkflowImpl(...)
wf.withRetentionHours(23);
wf.register(...);
client := ...;
wf := littlehorse.NewWorkflow(basic.MyWorkflow, "my-workflow")
putWf, _ := wf.Compile()
hours := int32(23)
putWf.WithRetentionHours(&hours)
resp, err := client.PutWfSpec(putWf)
wf = Workflow("my-wf", thread_function)
wf.retention_hours = 23
littlehorse.create_workflow_spec(wf, config)