Developing Task Workers
Each LittleHorse SDK provides an LHTaskWorker object or struct which lets you turn an arbitrary function or method into a LittleHorse Task.
Quickstart
The LHTaskWorker object allows you to create and start a Task Worker in our SDKs. Below, you will find compiler-ready programs that you can run which will:
- Register a TaskDef called greet which takes in one STR variable as input.
- Start a Task Worker to poll the LittleHorse Cluster asking for a task to execute.
- Java
- Go
- Python
- C#
To create a Task Worker, you need to do the following:
- Create an LHConfig (see this configuration documentation).
- Write a Task Worker class with an annotated @LHTaskMethod method.
- Create an LHTaskWorker object with your config and Task Worker Object.
- Register the TaskDef with worker.registerTaskDef().
- And finally call .start().
Let's build a Task Worker for a TaskDef named greet that takes in a String and returns a String. First, the Task Worker Object:
package io.littlehorse.quickstart;

import java.io.IOException;

import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.worker.LHTaskMethod;
import io.littlehorse.sdk.worker.LHTaskWorker;

class MyWorker {

    @LHTaskMethod("greet")
    public String greeting(String firstName) {
        String result = "Hello there, " + firstName + "!";
        System.out.println(result);
        return result;
    }
}

public class Main {

    public static void main(String[] args) throws IOException, InterruptedException {
        LHConfig config = new LHConfig();
        MyWorker executable = new MyWorker();
        LHTaskWorker greetWorker = new LHTaskWorker(executable, "greet", config);

        // Close the worker cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(greetWorker::close));

        greetWorker.registerTaskDef();
        greetWorker.start();
    }
}
Java does not preserve method parameter names after compilation. This means that, by default, the above code will generate a TaskDef with the input_var labeled arg0 instead of firstName.
If you want your input_vars to match your Java method definition, use the -parameters flag in your Java compiler args.
- Gradle
- Maven
With Gradle, add the following configuration to your build.gradle:
compileJava {
    options.compilerArgs << '-parameters'
}
With Maven, use the maven-compiler-plugin:
<project>
  [...]
  <build>
    [...]
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.13.0</version>
        <configuration>
          <compilerArgs>
            <arg>-parameters</arg>
          </compilerArgs>
        </configuration>
      </plugin>
    </plugins>
    [...]
  </build>
  [...]
</project>
To create a Task Worker, you need to do three things:
- Create an LHConfig, for example with littlehorse.NewConfigFromEnv() (see this configuration documentation).
- Write a Go func which you will use as your Task Function.
- Use the littlehorse.NewTaskWorker() function to create an LHTaskWorker with your config and Task Function.
At this point, you can use your LHTaskWorker to register your TaskDef and to start executing tasks.
package main

import (
    "log"

    "github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
)

func Greet(firstName string) string {
    return "Hello there, " + firstName + "!"
}

func main() {
    config := littlehorse.NewConfigFromEnv()

    // Fail fast instead of discarding the error from NewTaskWorker.
    worker, err := littlehorse.NewTaskWorker(config, Greet, "greet")
    if err != nil {
        log.Fatal(err)
    }

    worker.RegisterTaskDef()
    worker.Start()
}
To create a Task Worker, you need to do the following:
- Create an LHConfig (see this configuration documentation).
- Write an async Python function which you will use as your Task Function.
- Create and start an LHTaskWorker with that function.
Here is an example:
import asyncio

import littlehorse
from littlehorse.config import LHConfig
from littlehorse.worker import LHTaskWorker


async def greeting(first_name: str) -> str:
    msg = f"Hello there, {first_name}!"
    print(msg)
    return msg


async def main() -> None:
    config = LHConfig()
    worker = LHTaskWorker(greeting, "greet", config)
    worker.register_task_def()

    await asyncio.sleep(1.0)

    await littlehorse.start(worker)


if __name__ == "__main__":
    asyncio.run(main())
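Because the Task Function is an ordinary async Python function, it can be exercised on its own before a worker is wired up. The following is a minimal sketch that uses only the standard library: it repeats the greeting function from the example above and calls it directly, without a LittleHorse Cluster. The run_locally helper is a hypothetical name introduced for illustration and is not part of the SDK.

```python
import asyncio


async def greeting(first_name: str) -> str:
    # Same body as the Task Function in the quickstart example.
    msg = f"Hello there, {first_name}!"
    print(msg)
    return msg


def run_locally(first_name: str) -> str:
    # Hypothetical helper: drives the coroutine to completion
    # without a Task Worker or a cluster.
    return asyncio.run(greeting(first_name))


if __name__ == "__main__":
    run_locally("Obi-Wan")
```

A check like this only covers the function's own logic; registering the TaskDef and polling for work still require a running cluster.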
To create a Task Worker, you need to do the following:
- Create an LHConfig (see this configuration documentation).
- Write a Task Worker class with a method marked with the LHTaskMethod attribute.
- Create an LHTaskWorker object with your config and Task Worker Object.
- Register the TaskDef with worker.RegisterTaskDef().
- And finally call worker.Start().
Let's build a Task Worker for a TaskDef named greet that takes in a String and returns a String. First, the Task Worker Object:
using LittleHorse.Sdk;
using LittleHorse.Sdk.Worker;

namespace Examples.BasicExample
{
    public class MyWorker
    {
        [LHTaskMethod("greet")]
        public string Greeting(string name)
        {
            var message = $"Hello there {name}!";
            Console.WriteLine("Executing task greet: " + message);
            return message;
        }
    }

    public class Program
    {
        static void Main(string[] args)
        {
            var config = new LHConfig();
            MyWorker executable = new MyWorker();
            var worker = new LHTaskWorker<MyWorker>(executable, "greet", config);

            worker.RegisterTaskDef();

            Thread.Sleep(1000);

            worker.Start();
        }
    }
}
Advanced Usage
The Task Worker library has some features that make advanced use cases easier.
Throwing Workflow EXCEPTIONs
As described in our Failure Handling Concept Docs, LittleHorse distinguishes between technical ERRORs and business EXCEPTIONs:
- A technical ERROR denotes a technological failure, such as a Timeout caused by a network outage, or an unexpected error returned by your Task Worker.
- A Business EXCEPTION represents an unhappy-path case in your business logic, such as when an item is out of stock or a credit card got declined.
If your Task Worker throws an uncaught error (depending on your language), then it is treated as a LittleHorse ERROR with the error code LHErrorType.TASK_FAILURE. However, sometimes your Task Worker notices that a business process-level failure (what LittleHorse calls an EXCEPTION) has occurred. For example, the Task Worker could notice that a credit card got declined. In this case, you can make the TaskRun throw a LittleHorse EXCEPTION by using the LHTaskException object.
The LittleHorse EXCEPTION result is NOT retryable. That means that if your Task Method throws an LHTaskException, it will not be retried. If it throws any error/exception other than the LHTaskException, it will be treated as a LittleHorse ERROR, which is retryable.
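The rule above can be illustrated with a small, self-contained sketch. It is not how the SDK is implemented: the LHTaskException class below is a local stand-in so the code runs without LittleHorse installed, and Outcome and classify are hypothetical names introduced only to show which kind of failure each raised error maps to.

```python
from dataclasses import dataclass
from typing import Callable


class LHTaskException(Exception):
    """Local stand-in for the SDK class (assumption, for illustration only)."""

    def __init__(self, name: str, message: str) -> None:
        super().__init__(message)
        self.name = name
        self.message = message


@dataclass
class Outcome:
    kind: str  # "SUCCESS", "EXCEPTION" or "ERROR"
    name: str  # failure name; empty on success
    retryable: bool


def classify(task: Callable[[], object]) -> Outcome:
    """Hypothetical helper mirroring the rule described in the text."""
    try:
        task()
    except LHTaskException as exc:
        # Business EXCEPTION: reported under its user-defined name, not retried.
        return Outcome("EXCEPTION", exc.name, retryable=False)
    except Exception:
        # Any other error is a technical ERROR (TASK_FAILURE), eligible for retry.
        return Outcome("ERROR", "TASK_FAILURE", retryable=True)
    return Outcome("SUCCESS", "", retryable=False)


def declined() -> None:
    raise LHTaskException("card-declined", "The credit card was declined")


def crashed() -> None:
    raise RuntimeError("connection reset")
```

Here classify(declined) yields an EXCEPTION named card-declined that is not retryable, while classify(crashed) yields a retryable ERROR.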
In the following example, we will throw the out-of-stock user-defined business EXCEPTION if the item is out of stock.
- Java
- Go
- Python
- C#
package io.littlehorse.quickstart;

import io.littlehorse.sdk.common.exception.LHTaskException;
import io.littlehorse.sdk.worker.LHTaskMethod;

class MyWorker {

    @LHTaskMethod("ship-item")
    public String shipItem(String itemSku) {
        if (isOutOfStock(itemSku)) {
            throw new LHTaskException("out-of-stock", "Some human readable message");
        }
        return "Item " + itemSku + " successfully shipped!";
    }
}
package quickstart

import (
    "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"
    "github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
)

const TaskDefName string = "ship-item"

func ShipItem(itemSku string) (string, error) {
    if isOutOfStock(itemSku) {
        return "", &littlehorse.LHTaskException{
            Name:    "out-of-stock",
            Message: "some human readable message",
            Content: &lhproto.VariableValue{},
        }
    }
    return "Item " + itemSku + " successfully shipped!", nil
}
from littlehorse.exceptions import LHTaskException


async def ship_item(item_sku: str) -> str:
    if is_out_of_stock(item_sku):
        raise LHTaskException("out-of-stock", "some descriptive message")

    return f"successfully shipped {item_sku}!"
using LittleHorse.Sdk.Exceptions;
using LittleHorse.Sdk.Worker;

namespace Examples.BasicExample
{
    public class MyWorker
    {
        [LHTaskMethod("ship-item")]
        public string ShipItem(string itemSku)
        {
            if (IsOutOfStock(itemSku))
            {
                throw new LHTaskException("out-of-stock", "Some human readable message");
            }
            return $"Item {itemSku} shipped!";
        }
    }
}
The first argument to the LHTaskException constructor is the name of the EXCEPTION we are going to throw. This is useful if you want to be able to catch specific EXCEPTIONs with specific types in your Failure Handlers. The second argument is a human-readable error message that shows up on the NodeRun's output as the error_message field, which is useful for debugging purposes.
If you want to throw a Failure that has content which can be caught in your Failure Handler using the INPUT variable name, you can use a third argument named content. It is optional in Python and is available in an overloaded method signature in Java. Below is an example of how you might throw such an EXCEPTION:
- Java
- Go
- Python
- C#
package io.littlehorse.quickstart;

import io.littlehorse.sdk.common.exception.LHTaskException;
import io.littlehorse.sdk.worker.LHTaskMethod;

class MyWorker {

    @LHTaskMethod("ship-item")
    public String shipItem(String itemSku) {
        if (isOutOfStock(itemSku)) {
            int daysUntilBackInStock = calculateDaysUntilBackInStock(itemSku);

            // The `content` of the `Failure` that is thrown will be an INT variable containing
            // the number of days until the item is expected to be back in stock.
            throw new LHTaskException(
                    "out-of-stock",
                    "Some human readable message",
                    daysUntilBackInStock);
        }
        return "Item " + itemSku + " successfully shipped!";
    }
}
package quickstart

import (
    "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"
    "github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
)

const WorkflowName string = "basic-workflow"
const TaskDefName string = "ship-item"

func ShipItem(itemSku string) (string, error) {
    if isOutOfStock(itemSku) {
        daysUntilBackInStock := calculateDaysUntilBackInStock(itemSku)
        return "", &littlehorse.LHTaskException{
            Name:    "out-of-stock",
            Message: "some human readable message",
            Content: &lhproto.VariableValue{
                Value: &lhproto.VariableValue_Int{
                    Int: daysUntilBackInStock,
                },
            },
        }
    }
    return "Item " + itemSku + " successfully shipped!", nil
}
from littlehorse import to_variable_value
from littlehorse.exceptions import LHTaskException


async def ship_item(item_sku: str) -> str:
    if is_out_of_stock(item_sku):
        days_until_back_in_stock = get_days_until_back_in_stock(item_sku)
        # The content of the Failure will carry the number of days until restock.
        failure_content = to_variable_value(days_until_back_in_stock)
        raise LHTaskException("out-of-stock", "some descriptive message", content=failure_content)

    return f"successfully shipped {item_sku}!"
using LittleHorse.Sdk.Common.Proto;
using LittleHorse.Sdk.Worker;
using LHTaskException = LittleHorse.Sdk.Exceptions.LHTaskException;

namespace Examples.BasicExample
{
    class MyWorker
    {
        [LHTaskMethod("ship-item")]
        public string ShipItem(string itemSku)
        {
            if (IsOutOfStock(itemSku))
            {
                int daysUntilBackInStock = CalculateDaysUntilBackInStock(itemSku);
                VariableValue content = new VariableValue
                {
                    Int = daysUntilBackInStock
                };

                // The `content` of the `Failure` that is thrown will be an INT variable containing
                // the number of days until the item is expected to be back in stock.
                throw new LHTaskException(
                    "out-of-stock",
                    "Some human readable message",
                    content);
            }
            return $"Item {itemSku} successfully shipped!";
        }
    }
}
JSON Deserialization
In some SDKs, LittleHorse will automatically deserialize JSON variables into objects or structs for you.
- Java
- Go
- Python
- C#
Let's say we have a class MyCar as follows:
class MyCar {
    String make;
    String model;

    public MyCar(String make, String model) {
        this.make = make;
        this.model = model;
    }

    // getters, setters omitted
}
And one of the Variables (for example, my-obj) in our WfSpec is of type JSON_OBJ.
Let's say there's a TaskDef called json-example with one input variable of type JSON_OBJ. We can have a Task Worker defined as follows:
class MyWorker {

    @LHTaskMethod("json-example")
    public void executeTask(MyCar input) {
        System.out.println(input.getMake());
        System.out.println(input.getModel());
    }
}
The Library will deserialize the JSON from something like: {"make": "Ford", "model": "Explorer"} to an actual MyCar object.
Let's say we have a struct MyCar as follows:
type MyCar struct {
    Make  string `json:"make"`
    Model string `json:"model"`
}
And one of the Variables (for example, my-obj) in our WfSpec is of type JSON_OBJ.
Let's say there's a TaskDef called json-example with one input variable of type JSON_OBJ. We can have a Task Function that looks like:
func MyTaskFunc(car *MyCar) string {
    return "the make of your car is " + car.Make + "!"
}
The Library will deserialize the JSON from something like: {"make": "Ford", "model": "Explorer"} to an actual MyCar struct.
Let's say we have a Python Task Function as follows:
from typing import Any


async def describe_car(car: dict[str, Any]) -> str:
    msg = f"You drive a {car['brand']} model {car['model']}"
    return msg
The Library will deserialize the JSON from something like: {"brand": "Ford", "model": "Explorer"} to a Python dict.
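To make the dict-based deserialization concrete, the sketch below feeds the sample JSON to the same Task Function by hand. It uses json.loads as a stand-in for whatever the library does internally, which is an assumption; simulate_delivery is a hypothetical helper name and is not part of the SDK.

```python
import asyncio
import json
from typing import Any


async def describe_car(car: dict[str, Any]) -> str:
    return f"You drive a {car['brand']} model {car['model']}"


def simulate_delivery(raw_json: str) -> str:
    # Stand-in for the library's behavior (assumption): the JSON text becomes
    # a plain dict, which is then passed to the Task Function as its argument.
    car = json.loads(raw_json)
    return asyncio.run(describe_car(car))


if __name__ == "__main__":
    print(simulate_delivery('{"brand": "Ford", "model": "Explorer"}'))
```

Since the function receives a plain dict, a missing key such as brand surfaces as a KeyError inside the Task Function, so validating required keys up front may be worthwhile.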
Let's say we have a class MyCar as follows:
public class MyCar
{
    public string Make { get; set; }
    public string Model { get; set; }
}
And one of the Variables (for example, my-obj) in our WfSpec is of type JSON_OBJ.
Let's say there are two TaskDefs, show-car and print-car, each with one input variable of type JSON_OBJ. We can have a Task Worker defined as follows:
class MyWorker
{
    // Deserialization from json string to custom object
    [LHTaskMethod("show-car")]
    public void ShowCar(MyCar car)
    {
        Console.WriteLine($"Make: {car.Make}");
        Console.WriteLine($"Model: {car.Model}");
    }

    // Deserialization from json string to dictionary object
    [LHTaskMethod("print-car")]
    public void PrintCar(Dictionary<string, string> car)
    {
        Console.WriteLine($"Make: {car["Make"]}");
        Console.WriteLine($"Model: {car["Model"]}");
    }
}
The Library will deserialize the JSON from something like: {"Make": "Ford", "Model": "Explorer"} to an actual MyCar or Dictionary object.
Accessing Metadata
Sometimes, your Task Worker needs to know something about where the TaskRun came from. Each LittleHorse SDK offers a WorkerContext object or struct that exposes this metadata to the Task Worker.
- Java
- Go
- Python
- C#
If you need to access metadata about the Task Run that is being executed, you can add a WorkerContext parameter to the end of your method signature for the Task Method.
Let's say you have a TaskDef with one input parameter of type INT. You can access the WorkerContext by doing the following:
class SomeWorker {

    @LHTaskMethod("my-task")
    public void doTask(long inputLong, WorkerContext context) {
        String wfRunId = context.getWfRunId();
        TaskRunId taskRunId = context.getTaskRunId();
        NodeRunId nodeRunId = context.getNodeRunId();
        Date timeWhenTaskWasScheduled = context.getScheduledTime();

        context.log(
            "This is a message that gets sent to the log output on the scheduler"
        );

        int attemptNumber = context.getAttemptNumber();
        if (attemptNumber == 0) {
            // then this is the first time this Task Run has been attempted.
        } else {
            // then this is a retry.
        }

        // This is a constant value between all attempts for this TaskRun.
        // Useful to allow retries to third-party APIs that accept idempotency
        // keys, such as Stripe.
        String idempotencyKey = context.getIdempotencyKey();
    }
}
If you need to access metadata about the Task Run that is being executed, you can add a WorkerContext parameter to the end of your method signature for the Task Method.
Let's say you have a TaskDef with one input parameter of type INT. You can access the WorkerContext by doing the following:
func DoTask(inputLong int64, context *littlehorse.WorkerContext) {
    wfRunId := context.GetWfRunId()
    taskRunId := context.GetTaskRunId()
    nodeRunId := context.GetNodeRunId()
    timeWhenTaskWasScheduled := context.GetScheduledTime()

    context.Log(
        "This is a message that gets sent to the log output on the scheduler",
    )

    attemptNumber := context.GetAttemptNumber()
    if attemptNumber == 0 {
        // then this is the first time this Task Run has been attempted.
    } else {
        // then this is a retry.
    }

    // This is a constant value between all attempts for this TaskRun.
    idempotencyKey := context.GetIdempotencyKey()

    // Go rejects unused local variables, so discard the ones this example only reads.
    _, _, _, _, _ = wfRunId, taskRunId, nodeRunId, timeWhenTaskWasScheduled, idempotencyKey
}
If you need to access metadata about the Task Run that is being executed, you can add an LHWorkerContext parameter to the end of your method signature for the Task Method.
Let's say you have a TaskDef with one input parameter of type STR. You can access the LHWorkerContext by doing the following:
async def greeting(name: str, ctx: LHWorkerContext) -> str:
    task_run_id = ctx.task_run_id
    node_run_id = ctx.node_run_id
    wf_run_id = ctx.wf_run_id
    time_task_was_scheduled = ctx.scheduled_time

    attempt_number = ctx.attempt_number
    if attempt_number > 0:
        # this is a retry
        pass
    else:
        # this is not a retry
        pass

    idempotency_key = ctx.idempotency_key

    return f"Hello there, {name}!"
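A Task Function that reads from the context can be unit-tested with a stand-in object that exposes the same attributes. The sketch below is one way to do that under stated assumptions: fake_context is a hypothetical test double built on types.SimpleNamespace, not an SDK class, and it only provides the attempt_number and idempotency_key attributes used in the example above.

```python
import asyncio
from types import SimpleNamespace
from typing import Any


async def greeting(name: str, ctx: Any) -> str:
    # Only attributes shown in the example above are read from the context.
    if ctx.attempt_number > 0:
        return f"Hello again, {name}! (attempt {ctx.attempt_number})"
    return f"Hello there, {name}!"


def fake_context(attempt_number: int) -> SimpleNamespace:
    # Hypothetical test double; it is not part of the LittleHorse SDK.
    return SimpleNamespace(attempt_number=attempt_number, idempotency_key="test-key")


if __name__ == "__main__":
    print(asyncio.run(greeting("Ana", fake_context(0))))
    print(asyncio.run(greeting("Ana", fake_context(2))))
```

Typing the parameter as Any keeps the sketch importable without the SDK; in a real worker the annotation would be the SDK's context type.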
If you need to access metadata about the Task Run that is being executed, you can add an LHWorkerContext parameter to the end of your method signature for the Task Method.
Let's say you have a TaskDef with one input parameter of type INT. You can access the LHWorkerContext by doing the following:
public class MyWorker
{
    [LHTaskMethod("task")]
    public void ProcessTask(long requestTime, LHWorkerContext context)
    {
        context.Log("ProcessPayment");
        Console.WriteLine($"Processing request time: {requestTime}");
        Console.WriteLine($"The Workflow Run Id is: {context.GetWfRunId()}");
        Console.WriteLine($"The Node Run Id is: {context.GetNodeRunId()}");
        Console.WriteLine($"The Task Run Id is: {context.GetTaskRunId()}");
        Console.WriteLine($"The Idempotency Key is: {context.GetIdempotencyKey()}");
        Console.WriteLine($"The Attempt Number is: {context.GetAttemptNumber()}");
        Console.WriteLine($"The Scheduled Time is: {context.GetScheduledTime()}");
        Console.WriteLine($"The User Group is: {context.GetUserGroup()}");
        Console.WriteLine($"The User Id is: {context.GetUserId()}");
    }
}
Best Practices
Client ID
Every Task Worker instance should have a unique LHC_CLIENT_ID set in its configuration. This is important so that you can audit which client executed which Task, and also so that the LittleHorse Kernel can efficiently assign partitions of work to your Task Workers.
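One way to give every instance a distinct ID is to derive it at startup. The sketch below is an illustration under assumptions, not a prescribed scheme: build_client_id and ensure_client_id are hypothetical helpers, the naming pattern is arbitrary, and it assumes the configuration object picks up LHC_CLIENT_ID from the process environment when it is created afterwards.

```python
import os
import socket
import uuid


def build_client_id(worker_name: str) -> str:
    # Hypothetical naming scheme: worker name + host name + short random suffix,
    # so two instances on the same host still get different IDs.
    suffix = uuid.uuid4().hex[:8]
    return f"{worker_name}-{socket.gethostname()}-{suffix}"


def ensure_client_id(worker_name: str) -> str:
    # Respect an ID already provided by the deployment environment;
    # otherwise generate one and export it for the config to read (assumption).
    return os.environ.setdefault("LHC_CLIENT_ID", build_client_id(worker_name))


if __name__ == "__main__":
    print(ensure_client_id("greet-worker"))
```

A random suffix changes on every restart; if you want audit trails to be stable across restarts, a deployment-assigned identifier such as a pod name may be a better source.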
Idempotence
With all workflow engines, it is best when your tasks are idempotent. You can use the NodeRunId from WorkerContext::getNodeRunId() as an idempotency key, or the value returned by getIdempotencyKey(), which stays constant across all attempts of a TaskRun.
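The sketch below shows why a stable key makes retries safe. Everything in it is hypothetical: PaymentGateway is an in-memory stand-in for a third-party API that de-duplicates on an idempotency key, and charge_card receives the key as a plain argument, whereas a real Task Function would read it from the worker context.

```python
from typing import Dict


class PaymentGateway:
    """Hypothetical third-party API that de-duplicates on an idempotency key."""

    def __init__(self) -> None:
        self._charges: Dict[str, str] = {}
        self.calls_processed = 0

    def charge(self, idempotency_key: str, amount_cents: int) -> str:
        # A repeated key returns the stored result instead of charging again.
        if idempotency_key in self._charges:
            return self._charges[idempotency_key]
        self.calls_processed += 1
        receipt = f"charged {amount_cents} cents"
        self._charges[idempotency_key] = receipt
        return receipt


def charge_card(gateway: PaymentGateway, amount_cents: int, idempotency_key: str) -> str:
    # The key is passed explicitly here to keep the sketch self-contained;
    # in a real Task Function it would come from the worker context.
    return gateway.charge(idempotency_key, amount_cents)


if __name__ == "__main__":
    gw = PaymentGateway()
    charge_card(gw, 500, "key-1")
    charge_card(gw, 500, "key-1")  # simulated retry with the same key
    print(gw.calls_processed)
```

Both calls return the same receipt and the gateway processes the charge once, which is the property a retried TaskRun relies on.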