
Threads

A single program can execute multiple things at once (in parallel) using threads. Likewise, a single WfRun can do multiple things at once using threads.

Concepts

Every NodeRun in LittleHorse is part of a ThreadRun, and every ThreadRun is part of a WfRun. Up until this point in our LittleHorse Concepts series, every WfSpec we have seen has only one ThreadSpec and every WfRun has only one ThreadRun. Using multiple threads in your workflow allows you to execute work in parallel. Some use-cases for threads include:

  • Parallelizing computation for performance reasons.
  • Executing different business processes in parallel (for example, requesting approval from different departments).

Thread Hierarchy

Every WfSpec has a special ThreadSpec called the Entrypoint thread. When you run a WfRun, you are actually starting the entrypoint ThreadSpec. No matter how many ThreadRuns you have in your WfRun, the status and lifecycle of the WfRun always reflect the status of your entrypoint ThreadRun.

Every ThreadRun in a WfRun except for the Entrypoint ThreadRun has a Parent ThreadRun. Any ThreadRun can create a Child by explicitly starting a child ThreadRun (we will see how to do that very soon). Crucially, every ThreadRun in your WfRun executes in parallel.

Variable Scoping

Previously, we saw how you can declare variables in your WfSpec. What we actually did was declare variables at the entrypoint ThreadSpec level. Every ThreadSpec can declare its own variables.

In LittleHorse, a ThreadRun can access (both read and write) its own variables and also any variables that its parent has access to (this is recursive all the way up to the Entrypoint ThreadRun). However, a Parent ThreadRun cannot access the variables of its child ThreadRuns.
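
The scoping rule can be pictured as a lookup that walks up the parent chain. The sketch below is a toy model in plain Java, not LittleHorse code: `ThreadScope` and all of its methods are hypothetical names invented for illustration, and the real engine may implement scoping differently.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the scoping rule described above. NOT the LittleHorse SDK.
final class ThreadScope {
    private final ThreadScope parent; // null for the entrypoint
    private final Map<String, String> variables = new HashMap<>();

    ThreadScope(ThreadScope parent) {
        this.parent = parent;
    }

    void declare(String name, String value) {
        variables.put(name, value);
    }

    // Walk from this scope up to the entrypoint until a scope declares the variable.
    private ThreadScope ownerOf(String name) {
        for (ThreadScope scope = this; scope != null; scope = scope.parent) {
            if (scope.variables.containsKey(name)) {
                return scope;
            }
        }
        return null;
    }

    boolean canAccess(String name) {
        return ownerOf(name) != null;
    }

    String read(String name) {
        ThreadScope owner = ownerOf(name);
        if (owner == null) {
            throw new IllegalArgumentException("Not in scope: " + name);
        }
        return owner.variables.get(name);
    }

    // Writes land in the scope that declared the variable, so a child can
    // change a value that its parent will later read.
    void write(String name, String value) {
        ThreadScope owner = ownerOf(name);
        if (owner == null) {
            throw new IllegalArgumentException("Not in scope: " + name);
        }
        owner.variables.put(name, value);
    }

    public static void main(String[] args) {
        ThreadScope entrypoint = new ThreadScope(null);
        entrypoint.declare("parent-var", "initial");

        ThreadScope child = new ThreadScope(entrypoint);
        child.declare("child-input", "hello");

        child.write("parent-var", "set by child");
        System.out.println(entrypoint.read("parent-var"));       // set by child
        System.out.println(entrypoint.canAccess("child-input")); // false
    }
}
```

The lookup only ever moves upward, which is why the parent in this model has no way to reach `child-input`.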

Waiting for Threads

If a ThreadRun starts a Child ThreadRun, it can deliberately wait for that Child to terminate (either complete or fail). As we will see in the Exception Handling section, any failure thrown by the Child is received by the Parent when the Parent waits for the Child.

If a Parent ThreadRun reaches the EXIT node before all of its Children have terminated, the EXIT NodeRun will remain RUNNING until all Children have terminated. This guarantees that when a ThreadRun is terminated, then all of its children are terminated.

note

By extension, when an Entrypoint ThreadRun is terminated, we know that all Children of the Entrypoint are terminated, which is why we use the Entrypoint's status as the WfRun's status.
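
To make the EXIT rule concrete, here is a minimal toy model in plain Java. `ThreadRunModel`, `spawnChild()`, and `reachExit()` are hypothetical names that do not exist in the LittleHorse SDK, and the model deliberately ignores failures: it only tracks whether a thread has completed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "a ThreadRun only terminates once all of its children have".
// NOT the LittleHorse SDK; failure handling is left out on purpose.
final class ThreadRunModel {
    enum Status { RUNNING, COMPLETED }

    private final List<ThreadRunModel> children = new ArrayList<>();
    private boolean reachedExit = false;

    ThreadRunModel spawnChild() {
        ThreadRunModel child = new ThreadRunModel();
        children.add(child);
        return child;
    }

    // The thread's own logic has arrived at its EXIT node.
    void reachExit() {
        reachedExit = true;
    }

    // A thread stays RUNNING at EXIT until every child is COMPLETED.
    Status status() {
        if (!reachedExit) {
            return Status.RUNNING;
        }
        for (ThreadRunModel child : children) {
            if (child.status() != Status.COMPLETED) {
                return Status.RUNNING;
            }
        }
        return Status.COMPLETED;
    }

    public static void main(String[] args) {
        ThreadRunModel entrypoint = new ThreadRunModel();
        ThreadRunModel child = entrypoint.spawnChild();

        entrypoint.reachExit();
        System.out.println(entrypoint.status()); // RUNNING

        child.reachExit();
        System.out.println(entrypoint.status()); // COMPLETED
    }
}
```

Because the check is recursive, the entrypoint's status in this model summarizes the whole tree, which mirrors why the Entrypoint's status can stand in for the WfRun's status.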

In Practice

In this example, we will define a fictitious WfSpec that illustrates some useful concepts about how threads work in LittleHorse. Our WfSpec's Entrypoint thread will create a Child ThreadRun, sleep a little bit, execute a TaskRun, and then wait for the Child to finish.

note

In this example, we will show how you can deliberately start a Child ThreadRun in your WfRun. However, Child ThreadRuns can also be created in two other ways:

  1. Through Interrupts
  2. Through Exception Handling

warning

One potential pitfall when using threads in LittleHorse is the practical limit on the number of ThreadRuns in a single WfRun. Each ThreadRun shows up as part of the WfRun protobuf object (unlike NodeRuns, which have no such limit), so you should not go beyond 100-200 ThreadRuns within a single WfRun.
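
One way to stay under that ceiling when parallelizing over a large collection is to give each child thread a batch of items rather than a single item. The helper below is a hypothetical, plain-Java sketch: `ThreadBudget`, `partition()`, and the `MAX_CHILD_THREADS` value of 100 are assumptions made for illustration and are not part of the LittleHorse SDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split work into a bounded number of batches so that
// one child thread per batch stays within a chosen thread budget.
final class ThreadBudget {
    // Assumed ceiling, taken from the low end of the 100-200 guidance.
    static final int MAX_CHILD_THREADS = 100;

    static <T> List<List<T>> partition(List<T> items, int maxBatches) {
        if (maxBatches < 1) {
            throw new IllegalArgumentException("maxBatches must be at least 1");
        }
        List<List<T>> batches = new ArrayList<>();
        if (items.isEmpty()) {
            return batches;
        }
        // Ceiling division: the smallest batch size that fits in maxBatches batches.
        int batchSize = (items.size() + maxBatches - 1) / maxBatches;
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            items.add(i);
        }
        List<List<Integer>> batches = partition(items, MAX_CHILD_THREADS);
        System.out.println(batches.size());        // 100
        System.out.println(batches.get(0).size()); // 10
    }
}
```

Each batch would then be handed to one child thread, so 1,000 items cost 100 ThreadRuns instead of 1,000.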

Building the WfSpec

We will build a demonstration WfSpec that has complex interactions between a parent and child thread. Specifically, the parent and child will both utilize and modify a shared variable (owned by the Parent, of course). We will use the sleepSeconds() utility to make the ThreadRuns wait long enough for us to observe what's going on in the dashboard.
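
For readers who think in terms of ordinary program threads, the interaction we are about to build resembles the plain-Java sketch below. This is only an analogy under stated assumptions: `PlainJavaAnalogy` is a hypothetical class, the sleeps are scaled down from seconds to hundredths of a second, and none of this code uses LittleHorse.

```java
import java.util.concurrent.atomic.AtomicReference;

// Plain-Java analogy of the parent/child interaction. NOT LittleHorse code.
final class PlainJavaAnalogy {

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static String run() {
        // Plays the role of the variable owned by the parent.
        AtomicReference<String> parentVar = new AtomicReference<>("initial value");

        Thread child = new Thread(() -> {
            parentVar.set("set by the child"); // the child mutates the parent's variable
            sleepQuietly(450);                 // stands in for the child's 45-second sleep
        });

        child.start();     // the parent starts the child
        sleepQuietly(250); // stands in for the parent's 25-second sleep
        try {
            child.join();  // the parent waits for the child to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the child", e);
        }
        return parentVar.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // set by the child
    }
}
```

The analogy is loose: here the child sees the shared value through Java's lexical capture, whereas in LittleHorse access follows the parent/child ThreadRun hierarchy described earlier.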

Background: the Task

Our Task Worker will make use of the WorkerContext to print out exactly which ThreadRun is calling it. This will let us look at the Task Worker logs in order to easily see what's going on as we build our mental model of threads in LittleHorse.

package io.littlehorse.quickstart;

import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.worker.LHTaskMethod;
import io.littlehorse.sdk.worker.LHTaskWorker;
import io.littlehorse.sdk.worker.WorkerContext;

class MyWorker {

    @LHTaskMethod("my-task")
    public String myTask(String input, WorkerContext context) {
        // In this example, ThreadRun number 0 is treated as the parent (entrypoint)
        // thread and any other number as the child.
        int threadRunNumber = context.getNodeRunId().getThreadRunNumber();
        String threadName = (threadRunNumber == 0 ? "parent" : "child");
        String result = "Hello from the " + threadName + " thread: " + input;
        System.out.println(result);
        return result;
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        LHConfig config = new LHConfig();
        LHTaskWorker worker = new LHTaskWorker(new MyWorker(), "my-task", config);
        worker.registerTaskDef();
        Runtime.getRuntime().addShutdownHook(new Thread(worker::close));
        worker.start();
    }
}

The WfSpec Code

Here's the code for the WfSpec. We'll walk through everything that happens as we run it:

package io.littlehorse.quickstart;

import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.common.proto.VariableMutationType;
import io.littlehorse.sdk.wfsdk.SpawnedThread;
import io.littlehorse.sdk.wfsdk.SpawnedThreads;
import io.littlehorse.sdk.wfsdk.WfRunVariable;
import io.littlehorse.sdk.wfsdk.Workflow;
import io.littlehorse.sdk.wfsdk.WorkflowThread;
import java.util.Map;

class MyWorkflow {
    public static final String WF_NAME = "threads-example";

    // Put this variable here so that it's accessible in the child function.
    private WfRunVariable parentVariable;

    public void entrypointThreadLogic(WorkflowThread wf) {
        parentVariable = wf.declareStr("parent-var").withDefault("This is the parent variable's initial value");

        wf.execute("my-task", parentVariable);

        // Launch a child thread
        SpawnedThread child = wf.spawnThread(
                this::childThreadLogic, "child", Map.of("child-input", "This is the input to the child thread"));

        // Sleep 25 seconds (this gives time for the child to mutate our variable)
        wf.sleepSeconds(25);
        wf.execute("my-task", parentVariable);

        // Wait for the child!
        wf.waitForThreads(SpawnedThreads.of(child));
    }

    public void childThreadLogic(WorkflowThread wf) {
        // Child threads can take in input variables too! You must set them when
        // starting the child thread in the call to spawnThread()
        WfRunVariable childInput = wf.declareStr("child-input").required();
        wf.execute("my-task", childInput);

        // Child threads can use the parent's variables
        wf.execute("my-task", parentVariable);

        // Child threads can also mutate the parent's variables:
        wf.mutate(
                parentVariable,
                VariableMutationType.ASSIGN,
                "This is the value of the parent variable set by the child.");

        // Child will sleep 45 seconds before finishing
        wf.sleepSeconds(45);
    }

    public Workflow getWorkflowGenerator() {
        return Workflow.newWorkflow(WF_NAME, this::entrypointThreadLogic);
    }
}

public class Main {

    public static void main(String[] args) throws Exception {
        LHConfig config = new LHConfig();
        MyWorkflow mywf = new MyWorkflow();
        mywf.getWorkflowGenerator().registerWfSpec(config.getBlockingStub());
    }
}

Running the WfSpec

Open the WfSpec and run it from the dashboard. You won't have to pass any input variables, since we've hard-coded them into our WfSpec using default values.

So what happens? First, you'll notice at the top of the page that there are two ThreadRuns that you can look at:

A WfRun diagram showing the entrypoint thread with the option to choose the child above
Two Threads Now Visible

The parent ThreadRun is waiting at a SLEEP node, just after starting the child (that happened on the node that is a diamond with a big + sign).

If you look at the value of parent-var below the WfRun diagram, you'll see that the child has already modified it.

note

If you click on the output of the my-task node, you can clearly see that the value is set to "This is the parent variable's initial value". This is because that TaskRun was executed before the Child was started.

If we click on the child-1 thread (or on the START_THREADS node), we see the following:

A WfRun diagram showing the child threadrun
The Child ThreadRun

Note that the Child ThreadRun is waiting on a SLEEP node that will take 45 seconds to mature. The Parent's SLEEP node will take 25 seconds to mature, which means that the parent will advance to the WAIT_FOR_THREADS node and end up waiting for the child.

If we wait a few seconds longer, we'll notice that the Parent has gotten past the SLEEP node and is waiting for the child:

A WfRun diagram showing the parent waiting for the child
Parent Waiting for Child

Finally, you can see that the value of parent-var has been modified by the child thread (this is the wf.mutate() call we made from the childThreadLogic method):

A modal on the dashboard showing that the value of the parent's variable has been modified by the child
The Parent's Variable Modified by Child

Further Resources

Congrats on learning how threads work in LittleHorse!