Skip to content

Workflow inner workings

Introduction

A published workflow can be synchronous or asynchronous.
The differences are in how the workflow is used via the API and the inner workings, described below, which affect performance and economics.

The possibility to publish a workflow in one way or another depends on:

Synchronous workflows

API behavior

A synchronous workflow responds to API requests in a synchronous way: the client software makes one call that sends the request, hangs while the workflow produces the response and unblocks when it receives the response.

A possible comparison is with buying an hot dog from a street vendor: the customer shows up, asks for his hot dog and waits until it is ready.

Synchronous mode is suitable for stream processing with simple workflows.

All that is needed to use a synchronous workflow is one API endpoint.

Architecture

If a runtime is set up to host synchronous workflows its Kubernetes cluster contains:

  • A pod for the API gateway
  • A pod for the workflow orchestration service, shared between all workflows, which also takes care of the execution of JavaScript Interpreter v. 1.0.0 blocks and operators' blocks.
  • For each published synchronous workflow, if there are model, processor or custom components' blocks, a pod for each replica of each of those blocks.

If a workflow consists only of JavaScript Interpreter v. 1.0.0 blocks and operators' blocks, there are no workflow specific pods and everything is in charge of the orchestration service which is one per runtime, shared between all workflows.

How they work

When a new request for a workflow execution arrives, the API gateway puts it in the work queue of the orchestration service. The orchestration service finds the request and starts executing the workflow. It directly takes care of executing blocks of these components:

When it encounters a model or processor block in the flow, it prepares the JSON input for it, following the rules of the input mapping or taking the implicit input, and then invokes the service inside the block. If the block has replicas, one is chosen randomly.
The orchestration service receives a response from the block and continues processing the flow with the next block or, if it is finished, it returns the overall workflow output to the API gateway.
At the first error it encounters while processing the flow, the orchestration service returns the response containing the error detail to the caller via the API gateway and does not proceed further.

Parallelism

The fact that the workflow is synchronous does not mean that it cannot handle multiple requests at the same time. Synchronous is just the way a client interacts with the workflow, but internally one block of the workflow can be engaged with one request while another block manages another.

The comparison is with a car assembly line: the various stations of the assembly line can each work on a different car. In the same way, in a workflow each block is comparable to an assembly station and therefore a workflow made of at least two blocks can handle multiple requests at the same time: while block B is dealing with request 1, block A can already be dealing with request 2: multiple requests can be in processing within the workflow at the same time.

Clearly a workflow made up of a single block cannot handle multiple requests at the same time, unless the block is replicated.
Having replicas of a block is like having replicas of an assembly station.
Let's say that the first station installs the rear suspension while the second installs the engine. Installing the engine takes longer, say 20 minutes, than installing the rear suspension, say 10 minutes. If the other stations take 10 minutes and the semi-finished products to assemble a car are ready every 10 minutes, what happens is that:

  1. A growing queue of semi-finished products accumulates at the assembly stations.
  2. The first station is blocked with a car it already processed for half the time.
  3. The stations downstream of the second are inactive for half the time.
  4. The assembly line churns out a car every 20 minutes, the time required by the slowest station.

A similar workflow might have the first block take 100 milliseconds on average, the second 200 milliseconds, and the remaining 100 milliseconds. If a request arrives every 100 milliseconds, this happens:

  1. Requests pile up at the workflow entrance.
  2. The first block wastes half its time waiting for the second block to become available.
  3. Blocks downstream from the second are idle half the time.
  4. The workflow produces a result every 200 milliseconds.

The secret of the assembly line is to have stations, possibly excluding the last one, that work more or less at the same speed, dividing the work into phases of similar duration. If this is not possible, the slower stations can be replicated.
With two replicas of the second station, this happens: the first station completes the first car in 10 minutes and passes it to replica 1 of the second station, which begins a 20-minute job. After 10 minutes, the first station has a second car ready and passes it to replica 2 of the second station. After another 10 minutes, the first station has a third car ready and can move on to replica 1 of the second station, which has just finished working on the first car. Thanks to the two replicas of the second station:

  1. No semi-finished products accumulate at the assembly stations.
  2. All assembly stations are always active: there are no blocks and no inactivity.
  3. The assembly line churns out a car every 10 minutes.

The same goes for the analogous workflow. By adding a replica of the second block, both the first block and the blocks downstream of the second work more and the workflow's response rate approaches the request rate.

Having replicas increases the workflow's ability to handle more requests at the same time, because while one replica of a block is busy with a request, another replica of the same block can work on another.
Adding replicas is useful, in case of multiple simultaneous clients, even with a single block: instead of all queuing on the single replica, concurrent requests are distributed among all replicas, reducing queuing, reducing response time and increasing throughput; a bit like having two identical workflows with a load balancer in front of them.

When determining the number of replicas, as for the assembly line, give more replicas to the slower blocks. To determine the speed of individual blocks, the debug mode in interactive tests is useful.
For example, if block A takes on average twice as long as block B, it makes sense for it to have twice as many replicas. How many replicas to give to block B depends on the number of simultaneous clients and the performance you want to achieve.

Further parallelism can be obtained:

  • By creating independent flows, in fact a workflow can contain multiple flows and the only synchronization moments are at the beginning and end of the flows.
  • Creating branches with Fork and Join operators, either explicit or implicit.

Performance ceiling

Performance does not improve infinitely by adding replicas: given the role of the orchestration service, which is multi-threaded, but not replicated, after a certain number of replicas performance does not improve and by adding more replicas it actually gets worse.
As a rough guide, the peak of cumulative multi-client throughput, with a factory configuration, is with 10 concurrent clients and 10 replicas for the slowest block, but throughput can be increased—and considerably—by giving more computational resources to the orchestration service.

Throughput can further grow by using two or more runtimes with the same workflow behind a load balancer. This technique also adds fault tolerance to the system, because if one runtime fails the remaining ones respond. The stateless nature of synchronous workflows makes this configuration quite easy to implement.

Cost

Keeping a workflow running, processing or ready to process, requires CPU and RAM, therefore computers that have a cost and consume energy.

If the NL Flow runtime cluster is on physical computers, typically the number of computers is established a priori and the computers are always on, whether the workflows are published or not. This setup has costs that we can define as fixed: the purchase or rental of the hardware, maintenance, the cost of energy and the network.

If instead, as is more frequent, the cluster is provided by Cloud services such as AWS EKS or Azure AKS, it can benefit from the Kubernetes autoscaler: new nodes—that is virtual computers—are procured on demand, turned on and added to the cluster when needed, then terminated when empty and no longer needed.

Publishing a synchronous workflow means requesting immediately and as long as the workflow remains published, the maximum number of replicas for each block and the allocation of all the CPU and RAM required by each replica. These are all settings decided by whoever designs the workflow.
This can lead to adding nodes to the cluster, if the existing nodes do not have sufficient resources.

The cost of the workflow can be calculated in required computers, dividing the computational footprint of the workflow by the capacity of a computer, which is chosen when installing the runtime.
For example, if a workflow has a computational footprint—visible in in the editor, on the left in the toolbar, and in the list of workflows—of 18000 thousandths of CPU and 40 GiB RAM and computers with 8 CPUs and 32 GiB of RAM are used, considering that a node has a quantity of resources available for workflows a little lower than its capacity due to the system software—operating system, Kubernetes—, three nodes are needed, one of which will remain partially empty or can be used in condominium with other workflows.

A published synchronous workflow costs the same whether it's used or not and this can be a waste of money and energy.
When a workflow is unpublished, all replicas of all blocks are deleted, freeing up computer resources. Following this and possible pod redistribution, one or more computers can become empty and the Kubernetes autoscaler then terminates them, zeroing their costs.
For this reason, if it is known that a workflow is not used on weekends, at night or in other time slots, it makes sense to agree with the installation administrator on the schedule of automatic unpublishing and republishing, or even shutdown and restart of the entire runtime cluster.

Asynchronous workflows

API Behavior

An asynchronous workflow responds to requests asynchronously: the client software sends a request to the task creation endpoint and immediately gets back a task ID and is free to do something else.
The workflow executes the task, taking as long as it needs. If the client software inquires about the task status during this phase, it will be RUNNING. After the task is finished, it will be COMPLETED.
The client software makes a special request to get back the result of COMPLETED requests, then another request to delete the task. Undeleted tasks are automatically deleted by the NL Flow runtime after 24 hours.

A possible comparison is that of an online purchase with delivery to a locker: the customer buys the goods and quickly receives confirmation of his order. Every now and then he checks for updates and when the goods are delivered he collects them from the locker.

Asynchronous is suitable for batch processing and complex workflows.

The API provides several endpoints for the workflow: one to create the task, one to inquire about its status, one to get the result, one to delete tasks and one to get the original request used to create the task, if desired.
Additional endpoints are also provided to get information about the tasks and delete them as a group, all or by status.

Architecture

If a runtime is set up to host asynchronous workflows, its Kubernetes cluster contains:

  • A pod for the API gateway
  • A pod for the task management service, shared between all workflows
  • A pod for the workflow orchestration service, shared between all workflows
  • For each published asynchronous workflow:

    If there are blocks of:

    a pod for each replica of each of these blocks.
    Then:

    • A pod for each replica of the service dedicated to producing the output.
    • A pod for each replica of the possible service that takes care of the execution of all the JavaScript Interpreter 2.0.0, Python Interpreter, Map and Switch blocks.
    • A pod for each replica of the service that takes care of the execution of all the blocks developed with legacy technology including JavaScript Interpreter v. 1.0.0.

The amount of replicas active at any given time for blocks and services is determined by the maximum number of replicas set by the designer, the possible activation of block autoscaling and, in that case, the instantaneous load and the autoscaling parameters.

How they work

Requests to execute a workflow are submitted via the API gateway to the task management service.
For each request received, the management service creates a new task, returns the task ID to the requester, stores the JSON input in a storage, writes a message to the queue of the workflow orchestration service and puts the task in the SUBMITTED status.

The workflow orchestration service reads the message from the task management service, writes a message to the queue of all the first blocks of the workflow flows and writes a message to the queue of the task management service to signal the fact that the workflow has started. In reaction to this, the task management service puts the task in the RUNNING status.

During the flow processing, the orchestration service, in addition to its main job of orchestrating the overall execution, directly takes care of the execution of the operator blocks, excluding Map and Switch that are executed by the shared script interpreter service as they are based on JavaScript.

Block and service replicas constantly poll their message queues.
As written above for the first blocks of the flows, to make a replica of a block work, the orchestration service writes a message to the queue of the block, that is a queue that all replicas access concurrently. The first replica that arrives takes the message, gets the input from storage, following the rules of the input mapping or taking the implicit input.
The replica does its job then writes the JSON output to storage and writes a message to the orchestration service queue to signal that it is done.
The orchestration service reads the messages in its queue and proceeds with the flow as above, possibly taking care of:

  • Sending messages to multiple downstream blocks when the flow branches (downstream of a Fork, a Join-Fork or possibly also a Switch when multiple conditions are satisfied).
  • Sending messages to the same block iteratively when the flow enters or is in a context generated by a splitter or remapper).
  • Wait for all upstream blocks to finish before proceeding, at convergence points (Join, Join-Fork, End Context and End Switch).

At the end of the flow or in case of errors exceeding the maximum number allowed, the orchestration service writes a message to the queue of the workflow output production service that stores the output itself in storage and communicates to the task management service to set the task status to COMPLETED or ERROR.

The task management service:

  1. Responds to requests on the status of tasks.
  2. Responds to requests to have the response of tasks.
  3. Can provide the initial request with which a task was created.
  4. Handles requests to delete tasks.
  5. Puts RUNNING tasks in the DROPPED status when task duration exceeds the workflow timeout or when the workflow is unpublished.

Parallelism

Like a synchronous workflow (see above), even in an asynchronous workflow the various blocks and services can be engaged at the same time on multiple different tasks, like an assembly line that works on multiple cars at the same time.
The difference here is that while in the synchronous workflow it is the orchestration service that "invokes" the replicas of the model and processor blocks, in the asynchronous workflow it is the replicas of blocks and services that procure work to do by themselves by reading the message queues (self-service mechanism).

Like synchronous workflows, asynchronous workflows can also be parallelized by creating independent flows and creating branches with explicit or implicit Fork and Join operators, but asynchronous workflows can then have a further type of parallelism with Switch and contexts.
In fact, a Switch block can provide that multiple downstream branches are followed if multiple conditions are satisfied, and this happens in parallel, similarly to a Join.

In a context, created with a splitter or a remapper, the part of the flow inside the context is executed with the maximum possible parallelism, based on the available replicas, to possibly process multiple items at the same time.
For example, if a PDF Splitter block produces 10 items, that is ten single-page PDFs, and in the context there is a TikaTesseract Converter block followed by a symbolic model, if these two blocks have a maximum of one replica each the workflow will only be able to process one item at a time, one after the other.
If instead the blocks have replicas, the items are distributed between them.
Continuing with the example, if the TikaTessercat Converter block has 2 replicas and the model block has 5, TikaTesseract Converter will be able to work on two PDF pages at the same time. When a replica produces the text of a page, any replica of the downstream model can work on that text, independently from the other pages. The 5 replicas of the model continuously poll the message queue of their block to grab page texts as they materialize, no matter in what order.
If TikaTesseract Converter is faster, on average, than the model, it makes sense to give more replicas to the model block because the page texts will be ready relatively quickly and the more replicas of the model can "consume" them the better for performance.
The end of the context represents a sync point: if the model replicas quickly finish 9 pages, perhaps there can be a page text that takes longer and the flow does not proceed downstream of the context until that is finished as well.
In principle, however, the more blocks are replicated within a context, the shorter the context lasts for a given workflow request, because more of the items produced by the splitter or remapper can be processed at the same time.

In general, the rule for determining the number of replicas is the same as for synchronous workflows: give more replicas to the slower blocks.

Another important difference with synchronous workflows is that the degree of parallelism of blocks and services of an asynchronous workflow can change dynamically based on the load by enabling workflow autoscaling.
At rest, the number of replicas of a block or service can be low or even zero. Then, as load materializes, a number of replicas that is proportional to the load is turned on, increasing the parallelism and hence the throughput.
When the load goes down, replicas are removed.

Performance ceiling

The ceiling phenomenon can occur for an asynchronous workflow as well as for a synchronous one, but in the case of less than elementary workflows, the asynchronous workflow has an advantage because the role of the orchestration service is reduced: the self-service capability of blocks and services reduces the couplings and performance ceiling tends to occur higher up.

By appropriately sizing the task management service, the workflow orchestration service, the replicas of blocks and services and, possibly, the autoscaling parameters based on the load, the phenomenon can be avoided altogether or moved relatively much higher up.

Due to the stateful nature of asynchronous workflows, the path of balancing multiple runtimes to scale performance and add fault tolerance is ess easy to follow than with synchronous workflows, but is still viable. The tricky part is configuring the affinity between the load balancer and the runtime, which is necessary for the correct management of tasks from the client perspective: the runtime that creates a task must be the same that knows about it status and can return its response.

Cost

In general, what is stated above about the cost of synchronous workflows is also true for asynchronous workflows. If workflow autoscaling is disabled, it's indeed the same: all the computers that are necessary to run all the replicas of every block and service are turned on immediately and kept on until the workflow is unpublished. This guarantees maximum readiness and performance, but is also the most expensive option and can lead to a waste of money and energy when the workflow is kept published but is not used.

With workflow autoscaling enabled there are interesting differences because the number of computers turned on can vary based on the load and theoretically drop to zero, if not considering the computers that host the basic software or other workflows.
This makes costs less preventable, but surely gives the opportunity for savings.
The choice between enabling or not enabling autoscaling and between the various autoscaling configurations is driven by the performance you want to achieve. Speed ​​has a price, so performance-oriented autoscaling settings will result in more computers turned on longer compared to an economy-oriented configuration that however is less responsive to load and has a lower average throughput.

Autoscaling

Overview

Autoscaling is a feature of asynchronous workflows and consists of dynamically increasing or decreasing the number of replicas of blocks and shared services as the load, that is processing requests, varies.

The user chooses whether or not to activate autoscaling when publishing the asynchronous workflow in a runtime.
The parameters that govern the autoscaling of replicas are chosen at that time and are the same for all blocks and shared services, however each block or service can scale differently because it can have a different load.
The instantaneous load on a block or service, in fact, depends on various variables: the point in the flow, the activity and speed of the block/service and of the upstream blocks, the quantity of requests that the workflow is managing at the same time. For example, a splitter can have a load of 1 when the blocks in its context have a load of n, with n equal to the number of items that the splitter produces. Also, a fast block has a lower load than a slow block because it clears its queue more quickly.

Autoscaling allows you to save money and energy because the computational footprint (CPU and RAM) of the workflow is reduced or even eliminated when there is no load. On the other hand, a workflow with autoscaling active cannot be as reactive as one without autoscaling that always has all the replicas of all the blocks and services always active.

If autoscaling is enabled, the system periodically measures the load on the single block or shared service that can have replicas.
Based on the measured load, it checks whether there are conditions to scale the replicas up or down.

Load measurement

Every time interval corresponding to the Polling Interval parameter, the system measures the load of each block or shared service that can have replicas.

If Mode is equal to Queue Length, the load is the number of messages in the queue of the block or service.
If Mode is equal to Message Rate, the load is the average amount of messages entered into the queue each second during the previous polling interval.

Number of required replicas

Once the load is measured, the system determines the number of required replicas.

The number of replicas cannot be less than Min Replicas and cannot be greater than the maximum value set by the designer for the block or service.

The target value is calculated by dividing the load by Value:

Target = load / Value

Considering the minimum threshold and the maximum value, the complete formula is:

Required replicas = MAX(Min Replicas, MIN(maximum set by designer, target))

Need, direction and confirmation of scaling

Need and direction of scaling—up or down—are determined by this rule:

  • Required replicas = replicas currently turned on? → no scaling
  • Required replicas > replicas currently turned on? → scale up
  • Required replicas < replicas currently turned on? → scale down

In the special case of scaling "from zero", that is with no replicas already turned on, the system evaluates the Activation Value parameter and confirms the scaling only if the load is higher than the value of this parameter.

Scale amount

If the scaling is confirmed, the replicas to be turned on or deleted are those that are needed to reach the number of required replicas.

In case of upscaling, the number of replicas to be turned on is:

Required replicas - replicas currently turned on

For example:

  • Required replicas: 5
  • Replicas currently turned on: 2
  • Replicas to be turned on: 3

In case of downscaling, the number of replicas to be deleted is:

Inverse of: required replicas - replicas currently turned on

For example:

  • Required replicas: 2
  • Replicas turned on: 5
  • Replicas to be deleted: inverse of 2 - 5 = inverse of -3 = 3

Performing the scaling

The system orders the upsacling immediately.
If there are replicas marked for deletion following a previous downscaling, the system unmarks a number of them equal to the smaller of the replicas to be turned on and the marked ones. For example:

  • If it has to turn on 3 replicas and finds 2 marked for deletion, it unmarks them all, because MIN (3, 2) is 2. It has only one replica left to turn on.
  • If it has to turn on 3 replicas and finds 6 marked, it unmarks 3, because MIN (3, 6) is 3. It does not need to turn on more replicas.

The unmarked replicas were turned on, they remain turned on and will no longer be deleted at the end of the cooldown period.

After that this calculation is done:

Replicas to turn on - unmarked replicas

and if the result is greater than zero, the system proceeds to ask the cluster to turn on that number of replicas.

Additional replicas are powered on immediately if there is space in the existing virtual computers. Otherwise, if the maximum number of computers set by the installation administrator has not been reached and the system is Cloud based, a number of virtual computers adequate to host the replicas that do not find space in those already active are procured and started. In this case the time to power on the replicas can be in the order of a few minutes.
Scaling remains pending if:

  • Adding the necessary computers would lead to exceeding the maximum number of computers of the runtime set by the installer.
  • The Cloud provider does not have the type of computer requested available.

Downscaling actually occurs after a cooling period determined by the Cooldown Period parameter. The replicas to be deleted are immediately marked for this fate, but if during the cooldown period the need to scale up is determined, one or more replicas marked for deletion—depending on the number of replicas to be powered on—are unmarked, thus reducing or avoiding the need to power on replicas (see above).
Replicas that remain marked until the end of the cooldown period are effectively deleted.

For example, the system determines that 3 replicas need to be deleted.
All are immediately marked for deletion and the cooldown period countdown begins.
For six consecutive polling intervals there is no change: the replicas remain marked for deletion.
At the seventh polling interval from the start of the countdown it is determined that scaling up is necessary and 2 replicas need to be turned on: 2 replicas marked for deletion are then unmarked.
In the subsequent polling intervals until the end of the cooldown period the only replica left marked for deletion remains unchanged, so that replica is eventually deleted. Deleting replicas frees up computational resources (CPU and RAM) in the virtual computers and this can lead to them becoming empty. The cluster autoscaler then takes care of terminating the empty computers.