Workflows and Tasks
The code examples in the videos are in Java. The same examples are also available in Python and Go in the instruction notes.
LittleHorse is a developer-friendly workflow engine. In LittleHorse, users define workflow specifications in a programming language, such as Java, Python, Go and .Net. It delivers the durable execution of application code and automatically handles failures as if they didn’t exist. Developers can then focus on the business logic and purposes that applications deliver.
Workflows: WfSpec
and WfRun
Workflow is a series of different well-defined tasks or events to accomplish your business outcome. A workflow or process could consist of various tasks, systems and devices, cross-functional teams, and human interaction.
In LittleHorse, a workflow specification (WfSpec
) is a blueprint that defines the configuration of your workflow. It is a programming script that you write to tell the LittleHorse Engine how to create a workflow, the order of running different tasks and services, and how to handle system failure and business process-level exception.
Here is an example of a workflow. This workflow has one task, which prints a "hello" message.
A Workflow Example in LH Dashboard
In LittleHorse, developers can write the following WfSpec
to define such a workflow.
- Java
- Python
package io.littlehorse.quickstart;
import io.littlehorse.sdk.common.proto.VariableType;
import io.littlehorse.sdk.wfsdk.WfRunVariable;
import io.littlehorse.sdk.wfsdk.Workflow;
import io.littlehorse.sdk.wfsdk.WorkflowThread;
public class QuickstartWorkflow {
public static final String WF_NAME = "quickstart";
public static final String GREET = "hello-task";
public void quickstartWf(WorkflowThread wf) {
WfRunVariable name = wf.declareStr("input-name").searchable();
wf.execute(GREET, name);
}
public Workflow getWorkflow() {
return Workflow.newWorkflow(WF_NAME, this::quickstartWf);
}
}
import logging
import littlehorse
from littlehorse.config import LHConfig
from littlehorse.model.common_enums_pb2 import VariableType
from littlehorse.workflow import Workflow, WorkflowThread
from worker import greeting
logging.basicConfig(level=logging.INFO)
# The logic for our WfSpec (worfklow) lives in this function!
def get_workflow() -> Workflow:
def quickstart_workflow(wf: WorkflowThread) -> None:
# Define an input variable
the_name = wf.add_variable("input-name", VariableType.STR).searchable()
# Execute the 'greet' task and pass in the variable as an argument.
wf.execute("greet", the_name)
# Provide the name of the WfSpec and a function which has the logic.
return Workflow("quickstart", quickstart_workflow)
def main() -> None:
logging.info("Registering WfSpec and TaskDef")
# Configuration loaded from environment variables
config = LHConfig()
wf = get_workflow()
littlehorse.create_task_def(greeting, "greet", config)
littlehorse.create_workflow_spec(wf, config)
if __name__ == "__main__":
main()
After you run a WfSpec
, a WfRun
will be created as a running instance. If you run the same WfSpec
with a different input name, then a different WfRun
will be created in the LittleHorse server.
Tasks: TaskDef
and TaskRun
In LittleHorse, a task worker, which resides on your application server, initiates a connection to the LittleHorse Server and listens to the task queue. It does the actual "work" and executes a task in a workflow.
A Task Definition (TaskDef
) is a LittleHorse API object to tell LittleHorse Engine that a certain task will be included and executed in the workflow. Once a TaskDef
is executed, a TaskRun
is created in the LittleHorse Server, making the task available for execution in LittleHorse.
TaskDef to TaskRun
The task worker then executes the task in the workflow and reports the result back to the TaskRun
in the LittleHorse Server.
TaskWorker runs task
Here's a simple example of Java code with the LHTaskMethod
. In reality, this is your application code or a business service.
- Java
- Python
package io.littlehorse.quickstart;
import io.littlehorse.sdk.worker.LHTaskMethod;
public class Greet {
@LHTaskMethod("hello-task")
public String greet(String name) {
String message = "Hello " + name;
System.out.println(message);
return message;
}
}
import asyncio
import logging
import littlehorse
from littlehorse.config import LHConfig
from littlehorse.worker import WorkerContext, LHTaskWorker
logging.basicConfig(level=logging.INFO)
async def greeting(name: str, ctx: WorkerContext) -> str:
msg = f"Hello {name}!. WfRun {ctx.wf_run_id}"
print(msg, flush=True)
return msg
The LHTaskMethod
tells the LittleHorse SDK to call the greet()
method when there is a TaskDef
with the name "hello-task".
Let's write the code to register and run the Task Worker to executes the greet()
method.
- Java
- Python
package io.littlehorse.quickstart;
import java.io.IOException;
import io.littlehorse.quickstart.Greet;
import io.littlehorse.quickstart.QuickstartWorkflow;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.worker.LHTaskWorker;
public class AppWorker {
// This method registers the TaskDef and WfSpec to the LH Server
public static void main(String[] args) throws IOException, InterruptedException {
LHConfig config = new LHConfig();
// Generate the TaskDef
Greet executable = new Greet();
LHTaskWorker greetWorker = new LHTaskWorker(executable, "hello-task", config);
Runtime.getRuntime().addShutdownHook(new Thread(greetWorker::close));
greetWorker.registerTaskDef();
greetWorker.start();
// Register the WfSpec
QuickstartWorkflow quickstart = new QuickstartWorkflow();
quickstart.getWorkflow().registerWfSpec(config.getBlockingStub());
}
}
import asyncio
import logging
import littlehorse
from littlehorse.config import LHConfig
from littlehorse.worker import WorkerContext, LHTaskWorker
logging.basicConfig(level=logging.INFO)
async def greeting(name: str, ctx: WorkerContext) -> str:
msg = f"Hello {name}!. WfRun {ctx.wf_run_id}"
print(msg, flush=True)
return msg
async def main() -> None:
logging.info("Starting Task Worker!")
# Configuration loaded from environment variables
config = LHConfig()
await littlehorse.start(LHTaskWorker(greeting, "greet", config))
if __name__ == '__main__':
asyncio.run(main())
The LHTaskWorker
object, which is from the LittleHorse SDK, helps you create a TaskDef
for the task hello-task
, which, in our example, refers to the greet()
method.
Once you register the WfSpec, you can execute your workflow by lhctl
or through the LittleHorse Dashboard. After you run the task worker and start a workflow with an input-name, you will get a WfRun
and all the related information about the variables, tasks and workflow that you define in LittleHorse.
I encourage you to also check the quickstart course "Getting-started", which walks you through the LittleHorse installation process and runs a simple "hello" message step by step.
Wrapping Up
In this session, we covered the concepts of tasks and workflows: WfSpec
, WfRun
, TaskDef
and TaskRun
.
In the next video, we will get some hands-on practice to write a WfSpec
and a Task Worker with TaskDef
to add a second task into our workflow. We will also check the LittleHorse CLI and Dashboard in detail.
In the meantime, if you haven't done so already:
- Join the LittleHorse Slack Community
- Give us a star on GitHub
- Check out our documentation