Variables
A WfSpec in LittleHorse gives you distributed versions of the primitives you use when writing code that runs on a single machine. This includes variables.
Concepts
In LittleHorse, a Variable is defined by a WfSpec and is created when a WfRun is run. A Variable can be used inside a WfRun and it can also be fetched as a LittleHorse API Object.
Variables allow you to:
- Pass different inputs into different WfRun instances (via the rpc RunWf request).
- Pass data between different nodes in the WfSpec (e.g. pass data between tasks).
- Search for specific WfRun instances based on variable values.
Variables are defined in the WfSpec. Each variable definition has a name and a type. When you run your WfSpec and create a WfRun, LittleHorse creates a Variable object for each variable definition in the WfSpec. These Variables are then used by the WfRun and can also be inspected and fetched from the LittleHorse API.
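The relationship between a variable definition and the Variable objects created per WfRun can be pictured with a small toy model. This is only an illustrative sketch in plain Python: `VariableDef`, `Variable`, and `ToyWfSpec` are hypothetical names invented here, not LittleHorse SDK classes, and the real server does far more than this.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class VariableDef:
    """A variable definition inside a spec: just a name and a type."""
    name: str
    type: str  # e.g. "STR", "INT", "JSON_OBJ"


@dataclass
class Variable:
    """A concrete variable that belongs to exactly one run."""
    wf_run_id: str
    name: str
    type: str
    value: Any = None


@dataclass
class ToyWfSpec:
    """Hypothetical stand-in for a WfSpec; holds only variable definitions."""
    name: str
    variable_defs: list[VariableDef] = field(default_factory=list)

    def run(self, wf_run_id: str, inputs: dict[str, Any]) -> dict[str, Variable]:
        # One Variable object is created per definition, per run.
        return {
            d.name: Variable(wf_run_id, d.name, d.type, inputs.get(d.name))
            for d in self.variable_defs
        }


spec = ToyWfSpec("demo", [VariableDef("item-id", "STR"), VariableDef("quantity", "INT")])
run_a = spec.run("run-a", {"item-id": "abc"})
run_b = spec.run("run-b", {"item-id": "xyz", "quantity": 3})
print(run_a["item-id"].value, run_b["item-id"].value)
```

Each call to `run` produces its own set of variables, so two runs of the same spec never share state.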
Declaring Variables
Just like in programming languages, LittleHorse supports different types of variables that allow you to store various kinds of data and handle a wide range of tasks. Before you can use a Variable, you need to declare it. The following shows how to define a variable of each type in your WfSpec.
- Java
- Python
- Go
- C#
WfRunVariable itemId = wf.declareStr("item-id");
WfRunVariable quantity = wf.declareInt("quantity");
WfRunVariable price = wf.declareDouble("price");
WfRunVariable isPaid = wf.declareBool("is-paid");
WfRunVariable order = wf.declareJsonObj("order");
WfRunVariable items = wf.declareJsonArr("items");
itemId = wf.declare_str("item-id")
quantity = wf.declare_int("quantity")
price = wf.declare_double("price")
isPaid = wf.declare_bool("is-paid")
order = wf.declare_json_obj("order")
items = wf.declare_json_arr("items")
itemId := wf.DeclareStr("item-id")
quantity := wf.DeclareInt("quantity")
price := wf.DeclareDouble("price")
isPaid := wf.DeclareBool("is-paid")
order := wf.DeclareJsonObj("order")
items := wf.DeclareJsonArr("items")
WfRunVariable itemId = wf.DeclareStr("item-id");
WfRunVariable quantity = wf.DeclareInt("quantity");
WfRunVariable price = wf.DeclareDouble("price");
WfRunVariable isPaid = wf.DeclareBool("is-paid");
WfRunVariable order = wf.DeclareJsonObj("order");
WfRunVariable items = wf.DeclareJsonArr("items");
Modifying Variables
When you execute a Task in LittleHorse, the SDK returns a NodeOutput object, which is a handle to the output of the task. You can pass this handle to the WfRunVariable's assign() method to set the value of a variable in the WfSpec.
- Java
- Python
- Go
- C#
// The .required() makes it a required input variable
WfRunVariable itemId = wf.declareStr("item-id").required();
WfRunVariable price = wf.declareDouble("price");
NodeOutput priceOutput = wf.execute("get-price", itemId);
price.assign(priceOutput);
# The .required() makes it a required input variable
itemId = wf.declare_str("item-id").required()
price = wf.declare_double("price")
priceOutput = wf.execute("get-price", itemId)
price.assign(priceOutput)
// The .Required() makes it a required input variable
itemId := wf.DeclareStr("item-id").Required()
price := wf.DeclareDouble("price")
priceOutput := wf.Execute("get-price", itemId)
price.Assign(priceOutput)
// The .Required() makes it a required input variable
WfRunVariable itemId = wf.DeclareStr("item-id").Required();
WfRunVariable price = wf.DeclareDouble("price");
NodeOutput priceOutput = wf.Execute("get-price", itemId);
price.Assign(priceOutput);
You can form complex expressions with LittleHorse Variables. Check out the .add(), .jsonPath(), and other methods on the WfRunVariable 😃.
JSON Variables
LittleHorse's SDK has advanced support for JSON. The .jsonPath() method in the WfRunVariable allows you to access and modify sub-fields of a JSON_ARR or JSON_OBJ variable. You can also use .jsonPath() on a NodeOutput (i.e. the output of a Task that returns a JSON Object) to access fields within the output of a Task.
- Java
- Python
- Go
- C#
WfRunVariable itemId = wf.declareStr("item-id").required();
WfRunVariable item = wf.declareJsonObj("item");
NodeOutput itemOutput = wf.execute("fetch-item", itemId);
item.assign(itemOutput);
// Accessing a sub-field of a JSON Object
wf.execute("charge-credit-card", item.jsonPath("$.price"));
// Mutate a subfield of a JSON Object
item.jsonPath("$.isSold").assign(true);
itemId = wf.declare_str("item-id").required()
item = wf.declare_json_obj("item")
itemOutput = wf.execute("fetch-item", itemId)
item.assign(itemOutput)
# Accessing a sub-field of a JSON Object
wf.execute("charge-credit-card", item.with_json_path("$.price"))
# Mutate a sub-field of a JSON Object
item.with_json_path("$.is_sold").assign(True)
itemId := wf.DeclareStr("item-id").Required()
item := wf.DeclareJsonObj("item")
itemOutput := wf.Execute("fetch-item", itemId)
item.Assign(itemOutput)
// Accessing a sub-field of a JSON Object
wf.Execute("charge-credit-card", item.JsonPath("$.price"))
// Mutate a sub-field of a JSON Object
isSold := item.JsonPath("$.is_sold")
isSold.Assign(true)
WfRunVariable itemId = wf.DeclareStr("item-id").Required();
WfRunVariable item = wf.DeclareJsonObj("item");
NodeOutput itemOutput = wf.Execute("fetch-item", itemId);
item.Assign(itemOutput);
// Accessing a sub-field of a JSON Object
wf.Execute("charge-credit-card", item.WithJsonPath("$.Price"));
// Mutate a subfield of a JSON Object
item.WithJsonPath("$.IsSold").Assign(true);
JSON variables in LittleHorse do not have type information or schemas. This can make debugging with them quite difficult. In order to address this limitation, we are working on adding a new type of variable to LittleHorse, called a Struct. For more information, follow this GitHub Issue.
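To make the JSON path examples above concrete, here is a minimal plain-Python sketch of what reading `$.price` and mutating `$.isSold` mean on a JSON object. It is not how the LittleHorse SDK or server evaluates paths: `get_path` and `set_path` are hypothetical helpers written for this illustration, and they only handle the simple `$.a.b` form.

```python
from typing import Any


def _keys(path: str) -> list[str]:
    # Only the "$.a.b" form is handled here; real JSONPath is much richer.
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    return path[2:].split(".")


def get_path(doc: dict[str, Any], path: str) -> Any:
    """Read the sub-field that the path points at."""
    current: Any = doc
    for key in _keys(path):
        current = current[key]
    return current


def set_path(doc: dict[str, Any], path: str, value: Any) -> None:
    """Overwrite the sub-field that the path points at, in place."""
    *parents, last = _keys(path)
    current: Any = doc
    for key in parents:
        current = current[key]
    current[last] = value


item = {"price": 19.99, "isSold": False, "seller": {"name": "watto"}}
price = get_path(item, "$.price")
set_path(item, "$.isSold", True)
print(price, item["isSold"], get_path(item, "$.seller.name"))
```

Because nothing here checks the shape of `item`, a misspelled field name only fails when the path is evaluated, which is the kind of debugging difficulty the note above describes.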
Variable Modifiers
Just like Java (private, public, final, static, etc), variables in LittleHorse can have modifiers. Modifiers in LittleHorse include:
- .required(), which makes a Variable a required input to the workflow (specifically, the ThreadSpec).
- .searchable(), which allows you to search for instances of the variable by their value.
- .public(), which marks the Variable as part of the WfSpec's public API. This is important for upcoming (very advanced) LittleHorse features such as the Output Topic.
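As a rough illustration of what the required modifier implies, the sketch below rejects a run request that omits a required input. It is a toy model in plain Python, not the LittleHorse implementation: `ToyVarDef` and `check_inputs` are hypothetical names, and the error text simply mirrors the wording the server uses for a missing required input.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ToyVarDef:
    """Hypothetical variable definition carrying a 'required' flag."""
    name: str
    type: str
    required: bool = False


def check_inputs(var_defs: list[ToyVarDef], inputs: dict[str, Any]) -> None:
    """Reject a run request that omits any required input variable."""
    for var_def in var_defs:
        if var_def.required and var_def.name not in inputs:
            raise ValueError(
                f"Must provide required input variable {var_def.name} of type {var_def.type}"
            )


defs = [ToyVarDef("user-id", "STR", required=True), ToyVarDef("age", "INT")]

try:
    check_inputs(defs, {})
    error = None
except ValueError as exc:
    error = str(exc)

print(error)
check_inputs(defs, {"user-id": "anakin"})  # passes: the required input is present
```

Variables that are not marked required, such as `age` here, can be left out of the request without an error.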
In Practice
In order to use Variables in a workflow, you need to:
- Register a WfSpec that defines some Variables.
- Run a WfRun, passing in those variables (if they are required as input variables).
After this, you can observe the result on the LittleHorse Dashboard.
Registering the WfSpec
In this example, we will build a WfSpec that models a simple user notification workflow. The WfSpec requires as input a user-id and a message. It then loads information about the user (this is a TaskRun that most likely hits a user microservice) and uses that information to formulate a message to send via the email TaskDef.
Background: the Tasks
We'll create two TaskDefs for this example. The first one will take in a String userId and return a User object (which will be serialized as a JSON_OBJ variable value in LittleHorse). The second takes in an email address and a message and sends an email.
- Java
- Python
- Go
- C#
package io.littlehorse.quickstart;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.common.exception.LHTaskException;
import io.littlehorse.sdk.worker.LHTaskMethod;
import io.littlehorse.sdk.worker.LHTaskWorker;
class User {
public String email;
public String title;
public int age;
public User() {}
public User(String email, String title, int age) {
this.email = email;
this.title = title;
this.age = age;
}
}
class MyTasks {
// This Task Method returns a POJO, which is automatically serialized to a JSON_OBJ variable
// value in LittleHorse.
@LHTaskMethod("fetch-user")
public User fetchUser(String userId) {
if (userId.equals("obiwan")) {
return new User("obiwan@jedi.temple", "Master Kenobi", 37);
} else if (userId.equals("anakin")) {
return new User("anakin@jedi.temple", "Padawan Skywalker (not Master)", 22);
} else {
throw new LHTaskException("user-not-found", "Could not find specified user");
}
}
@LHTaskMethod("send-email")
public String sendEmail(String toAddress, String message) {
String result = "sent email " + message + " to address " + toAddress;
System.out.println(result);
return result;
}
}
public class Main {
public static void main(String[] args) throws Exception {
LHConfig config = new LHConfig();
MyTasks taskFuncs = new MyTasks();
LHTaskWorker emailer = new LHTaskWorker(taskFuncs, "send-email", config);
LHTaskWorker userService = new LHTaskWorker(taskFuncs, "fetch-user", config);
emailer.registerTaskDef();
userService.registerTaskDef();
Runtime.getRuntime().addShutdownHook(new Thread(emailer::close));
Runtime.getRuntime().addShutdownHook(new Thread(userService::close));
emailer.start();
userService.start();
}
}
import littlehorse
from littlehorse import create_task_def
from littlehorse.exceptions import LHTaskException
from littlehorse.config import LHConfig
from littlehorse.worker import LHTaskWorker
import asyncio
from typing import Any
config = LHConfig()
async def fetch_user(user_id: str) -> dict[str, Any]:
    if user_id == "obiwan":
        return {"email": "obiwan@jedi.temple", "title": "Master Kenobi", "age": 37}
    elif user_id == "anakin":
        return {"email": "anakin@jedi.temple", "title": "Padawan Anakin", "age": 22}
    else:
        # Raise (not just construct) the exception so the task fails with "user-not-found"
        raise LHTaskException("user-not-found", "Could not find specified user")
async def send_email(to_address: str, message: str) -> str:
    result = "sent email " + message + " to address " + to_address
    print(result)
    return result
async def main():
    workers = [
        LHTaskWorker(fetch_user, "fetch-user", config),
        LHTaskWorker(send_email, "send-email", config)
    ]
    await littlehorse.start(*workers)
if __name__ == "__main__":
    create_task_def(fetch_user, "fetch-user", config)
    create_task_def(send_email, "send-email", config)
    asyncio.run(main())
import (
"errors"
"fmt"
"github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
)
type User struct {
Email string `json:"email"`
Title string `json:"title"`
Age int `json:"age"`
}
func FetchUser(userId string) (*User, error) {
if userId == "obiwan" {
return &User{
Email: "obiwan@jedi.temple",
Title: "Master Kenobi",
Age: 37}, nil
} else if userId == "anakin"{
return &User{
Email: "anakin@jedi.temple",
Title: "Padawan Skywalker (not Master)",
Age: 22}, nil
} else {
return nil, errors.New("user-not-found: Could not find specified user")
}
}
func SendEmail(toAddress string, message string) string {
result := "sent email " + message + " to " + toAddress
fmt.Println(result)
return result
}
func main() {
config := littlehorse.NewConfigFromEnv()
emailer, _ := littlehorse.NewTaskWorker(config, SendEmail, "send-email")
userService, _ := littlehorse.NewTaskWorker(config, FetchUser, "fetch-user")
//Register TaskDef
emailer.RegisterTaskDef()
userService.RegisterTaskDef()
//Start Task Worker
go func() {
emailer.Start()
}()
userService.Start()
}
using LittleHorse.Sdk;
using LittleHorse.Sdk.Exceptions;
using LittleHorse.Sdk.Worker;
namespace Quickstart;
class User
{
public string Email;
public string Title;
public int Age;
public User(string email, string title, int age)
{
Email = email;
Title = title;
Age = age;
}
}
class MyTasks
{
// This Task Method returns a plain object (POCO), which is automatically serialized to a JSON_OBJ variable
// value in LittleHorse.
[LHTaskMethod("fetch-user")]
public User FetchUser(string userId)
{
if (userId.Equals("obiwan"))
{
return new User("obiwan@jedi.temple", "Master Kenobi", 37);
}
if (userId.Equals("anakin"))
{
return new User("anakin@jedi.temple", "Padawan Skywalker (not Master)", 22);
}
throw new LHTaskException("user-not-found", "Could not find specified user");
}
[LHTaskMethod("send-email")]
public string SendEmail(string toAddress, string message)
{
string result = "sent email " + message + " to address " + toAddress;
Console.WriteLine(result);
return result;
}
}
public abstract class Program
{
static void Main(string[] args)
{
var config = new LHConfig();
MyTasks myFuncs = new MyTasks();
LHTaskWorker<MyTasks> emailer = new LHTaskWorker<MyTasks>(myFuncs, "send-email", config);
LHTaskWorker<MyTasks> userService = new LHTaskWorker<MyTasks>(myFuncs, "fetch-user", config);
emailer.RegisterTaskDef();
userService.RegisterTaskDef();
emailer.Start();
userService.Start();
}
}
At this point, you should have two new TaskDefs available in your LittleHorse cluster.
The WfSpec
We'll create the following variables in our WfSpec:
- user-id, of type STR, which is required as input.
- email, of type STR, which is searchable.
- age, of type INT, which is just used internally.
- Java
- Python
- Go
- C#
package io.littlehorse.quickstart;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.wfsdk.LHFormatString;
import io.littlehorse.sdk.wfsdk.NodeOutput;
import io.littlehorse.sdk.wfsdk.WfRunVariable;
import io.littlehorse.sdk.wfsdk.Workflow;
import io.littlehorse.sdk.wfsdk.WorkflowThread;
import io.littlehorse.sdk.common.proto.VariableType;
public class Main {
public static String WF_NAME = "variables-example";
public static void wfLogic(WorkflowThread wf) {
WfRunVariable userId = wf.declareStr("user-id").required().searchable();
WfRunVariable userObject = wf.declareJsonObj("user-obj");
WfRunVariable age = wf.declareInt("age");
// This task returns a json blob (the `User` class)
NodeOutput userOutput = wf.execute("fetch-user", userId);
// You can take a Json output from a task and just copy it into a JSON_OBJ variable
userObject.assign(userOutput);
// You can also use specific fields of the task output to edit a non-json variable
age.assign(userOutput.jsonPath("$.age"));
// This is somewhat advanced; we create an expression which combines a few
// strings together.
LHFormatString message = wf.format(
"Hello there, {0}! You are {1} years old",
userObject.jsonPath("$.title"),
age);
wf.execute("send-email", userObject.jsonPath("$.email"), message);
}
public static void main(String[] args) throws Exception {
LHConfig config = new LHConfig();
Workflow wfGenerator = Workflow.newWorkflow(WF_NAME, Main::wfLogic);
wfGenerator.registerWfSpec(config.getBlockingStub());
}
}
from littlehorse.workflow import Workflow, WorkflowThread
from littlehorse import create_workflow_spec
from littlehorse.config import LHConfig
config = LHConfig()
def get_workflow() -> Workflow:
    def wfLogic(wf: WorkflowThread) -> None:
        user_id = wf.declare_str("user-id").required().searchable()
        user = wf.declare_json_obj("user")
        age = wf.declare_int("age")
        # This task returns a dict containing user attributes
        fetched_user = wf.execute("fetch-user", user_id)
        # A dict can be directly assigned to a LittleHorse JSON Object
        user.assign(fetched_user)
        # You can also use specific fields of the task output to edit a non-json variable
        age.assign(fetched_user.with_json_path("$.age"))
        # This is somewhat advanced; we create an expression which combines a few
        # strings together
        message = wf.format(
            "Hello there, {0}! You are {1} years old",
            user.with_json_path("$.title"),
            age
        )
        wf.execute("send-email", user.with_json_path("$.email"), message)
    return Workflow("variables-example", wfLogic)
if __name__ == "__main__":
    create_workflow_spec(get_workflow(), config)
import (
"context"
"log"
"github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
)
func Wflogic(wf *littlehorse.WorkflowThread) {
userId := wf.DeclareStr("user-id").Required().Searchable()
userObject := wf.DeclareJsonObj("user-obj")
age := wf.DeclareInt("age")
// This task returns a json blob (the `User` class)
userOutput := wf.Execute("fetch-user", userId)
// You can take a Json output from a task and just copy it into a JSON_OBJ variable
userObject.Assign(userOutput)
// You can also use specific fields of the task output to edit a non-json variable
age.Assign(userOutput.Output.JsonPath("$.age"))
// This is somewhat advanced; we create an expression which combines a few
// strings together.
message := wf.Format("Hello there, {0}! You are {1} years old",
userId, age)
wf.Execute("send-email", userObject.JsonPath("$.email"), message)
}
func main() {
// Get a client
config := littlehorse.NewConfigFromEnv()
client, _ := config.GetGrpcClient()
workflowGenerator := littlehorse.NewWorkflow(Wflogic, "variables-example")
request, err := workflowGenerator.Compile()
if err != nil {
log.Fatal(err)
}
(*client).PutWfSpec(context.Background(), request)
}
using LittleHorse.Sdk;
using LittleHorse.Sdk.Workflow.Spec;
namespace Quickstart;
public abstract class Program
{
public static string WfName = "variables-example";
public static void WfLogic(WorkflowThread wf)
{
WfRunVariable userId = wf.DeclareStr("user-id").Required().Searchable();
WfRunVariable userObject = wf.DeclareJsonObj("user-obj");
WfRunVariable age = wf.DeclareInt("age");
// This task returns a json blob (the `User` class)
NodeOutput userOutput = wf.Execute("fetch-user", userId);
// You can take a Json output from a task and just copy it into a JSON_OBJ variable
userObject.Assign(userOutput);
// You can also use specific fields of the task output to edit a non-json variable
age.Assign(userOutput.WithJsonPath("$.Age"));
// This is somewhat advanced; we create an expression which combines a few
// strings together.
LHFormatString message = wf.Format(
"Hello there, {0}! You are {1} years old",
userObject.WithJsonPath("$.Title"),
age);
wf.Execute("send-email", userObject.WithJsonPath("$.Email"), message);
}
static void Main(string[] args)
{
var config = new LHConfig();
Workflow wfGenerator = new Workflow(WfName, WfLogic);
wfGenerator.RegisterWfSpec(config.GetGrpcClientInstance());
}
}
At this point, you should see the WfSpec in the dashboard. It will look something like this:

Running the WfRun
Now let's run the workflow!
lhctl run variables-example
2025/03/08 11:32:31 rpc error: code = InvalidArgument desc = Must provide required input variable user-id of type STR
Oops, that didn't go too well. The LittleHorse Kernel got grumpy with us because our WfSpec has a required variable user-id and we didn't pass it in. Let's fix that:
lhctl run variables-example user-id anakin
Now, if we go to the Dashboard, we can see the WfRun is COMPLETED. When we click on the WfRun, we can easily inspect the ending state of the variables after all of the mutations happened:

If you inspect the user-obj variable, you'll notice it exactly matches the output of the fetch-user task, which is just the User object serialized to JSON.
{
"email": "anakin@jedi.temple",
"title": "Padawan Skywalker (not Master)",
"age": 22
}
Have some fun and explore! Look at what was passed into each of your TaskRuns and compare that to our WfSpec code. This will help you understand the parallels between LittleHorse Variables + WfSpecs and Java variables + programs.
Searching by Variables
Before we start searching, let's run one more WfRun:
lhctl run variables-example user-id obiwan
You can search for a Variable using rpc SearchVariable, or via lhctl as follows:
lhctl search variable --wfSpecName variables-example --name user-id --varType STR --value anakin
The output looks like:
{
"results": [
{
"wfRunId": {
"id": "52357bdec63241ed9b3dfef2b5f4a1d4"
},
"threadRunNumber": 0,
"name": "user-id"
}
]
}
The command above searches for Variables with a specific name, type, and value. Each result is a VariableId, which includes a WfRunId, so you can easily find the associated WfRun.
Let's look at the actual Variable:
lhctl get variable 52357bdec63241ed9b3dfef2b5f4a1d4 0 user-id
The output shows that the value is indeed anakin!
{
"id": {
"wfRunId": {
"id": "52357bdec63241ed9b3dfef2b5f4a1d4"
},
"threadRunNumber": 0,
"name": "user-id"
},
"value": {
"str": "anakin"
},
"createdAt": "2025-03-08T19:33:45.431Z",
"wfSpecId": {
"name": "variables-example",
"majorVersion": 0,
"revision": 0
},
"masked": false
}
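If you want to script the two steps above, one possible approach is to parse the search output and build the matching `lhctl get variable` command for each result. The sketch below uses only the Python standard library and the JSON shape shown above; `get_variable_commands` is a hypothetical helper, and the sketch only builds the command strings rather than running lhctl.

```python
import json
import shlex

# The search output shown above, pasted in as a string for this sketch.
search_output = """
{
  "results": [
    {
      "wfRunId": {"id": "52357bdec63241ed9b3dfef2b5f4a1d4"},
      "threadRunNumber": 0,
      "name": "user-id"
    }
  ]
}
"""


def get_variable_commands(raw: str) -> list[str]:
    """Turn each VariableId in the search results into an lhctl command line."""
    commands = []
    for variable_id in json.loads(raw)["results"]:
        args = [
            "lhctl", "get", "variable",
            variable_id["wfRunId"]["id"],
            str(variable_id["threadRunNumber"]),
            variable_id["name"],
        ]
        commands.append(shlex.join(args))
    return commands


commands = get_variable_commands(search_output)
print(commands[0])
```

For the result above, this produces the same `lhctl get variable` command that was typed by hand.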
Lastly, you can search for variables on the dashboard, by going to the WfSpec page and clicking Search By Variable.
Please note that, while Obi-Wan Kenobi is indeed a Jedi Master, Anakin Skywalker is not.
Gotchas
LittleHorse Variables are designed to be as close to programming language variables as possible. However, there are some key differences to be aware of.
Why WfRunVariable is Special
It's important to note that your Workflow Function (using the WorkflowThread) is not executed when you run a WfRun. Rather, it is executed once to compile a WfSpec using the LittleHorse DSL, and gets translated into a WfSpec protobuf object which is a directed graph of nodes and edges.
The WorkflowThread#declareStr() method tells LittleHorse to create a LittleHorse Variable inside the WfSpec. At runtime, when an instance of the workflow (WfRun) is started, LittleHorse schedules and executes the tasks defined in the WfSpec, and the LittleHorse variables in the WfSpec will also be created and stored in the LittleHorse Kernel.
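The compile-time versus run-time distinction can be illustrated with a toy recorder. In this sketch the workflow function is called exactly once, and calling it only records variables and nodes; no task code runs. `ToyThread` and `compile_spec` are hypothetical names made up for this illustration and are not part of the LittleHorse SDK.

```python
from typing import Callable


class ToyThread:
    """Records variable declarations and nodes instead of running anything."""

    def __init__(self) -> None:
        self.variables: list[str] = []
        self.nodes: list[tuple[str, tuple[str, ...]]] = []

    def declare_str(self, name: str) -> str:
        self.variables.append(name)
        return name  # a handle that refers to the variable by name

    def execute(self, task_name: str, *args: str) -> None:
        # Only a node is recorded here; the task itself is never invoked.
        self.nodes.append((task_name, args))


def compile_spec(wf_logic: Callable[[ToyThread], None]) -> ToyThread:
    thread = ToyThread()
    wf_logic(thread)  # the only time the workflow function is called
    return thread


calls = 0


def wf_logic(wf: ToyThread) -> None:
    global calls
    calls += 1
    user_id = wf.declare_str("user-id")
    wf.execute("fetch-user", user_id)


spec = compile_spec(wf_logic)
print(calls, spec.variables, spec.nodes)
```

The handle returned by `declare_str` carries no run-time value, which is why ordinary language constructs such as `if user_id == "anakin"` inside a workflow function cannot inspect what a particular WfRun will contain.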
Variable Payload Size
Variables are great for passing small pieces of data around (and even work for JSON payloads up to a few dozen KB). However, if you need to pass around large files or data, you should store the payload in an external system and use a LittleHorse Variable to pass around a reference to it (e.g. an S3 URL). LittleHorse Variables have a hard size limit of 250KB.
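One way to apply this advice in a task worker is to measure the serialized payload and return a reference instead of the data when it is too large. The sketch below is an assumption-heavy illustration: `SOFT_LIMIT_BYTES` is an arbitrary cutoff chosen here, `to_variable_value` is a hypothetical helper, and `fake_store` stands in for whatever external storage (such as S3) you actually use.

```python
import json
from typing import Any, Callable

# Arbitrary "few dozen KB" cutoff chosen for this sketch; it is well below
# the 250KB hard limit mentioned above.
SOFT_LIMIT_BYTES = 32 * 1024


def to_variable_value(payload: Any, store_externally: Callable[[bytes], str]) -> Any:
    """Return the payload itself if small, otherwise a reference to it."""
    encoded = json.dumps(payload).encode("utf-8")
    if len(encoded) <= SOFT_LIMIT_BYTES:
        return payload
    # Too large for a variable: keep the bytes elsewhere, pass the pointer around.
    return {"ref": store_externally(encoded), "sizeBytes": len(encoded)}


def fake_store(data: bytes) -> str:
    # Hypothetical stand-in for an upload; a real version would write `data`
    # to external storage and return its URL.
    return "s3://example-bucket/payloads/1"


small = to_variable_value({"order": 1}, fake_store)
large = to_variable_value({"blob": "x" * 100_000}, fake_store)
print(small, large["ref"])
```

Downstream tasks then receive the small reference object and fetch the real data themselves when they need it.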
Further Resources
Congrats on learning how to use Variables in your LittleHorse workflows! There's a lot more you can do with Variables, though, so keep on reading:
- Check out our WfSpec Development Guide.
- Join the LittleHorse Slack Community
- Give us a star on GitHub