Now that the dust is slowly settling after the release of Java 21, with its slew of new features and the possibilities they offer, it seems like a good time to look towards the future as people experiment ever more with virtual threads and new concurrency options.

While this is certainly a boon, the new and evolving paradigms make proper, swift observability increasingly vital. After all, we need insight into the performance impact of our changes and into possible (new) bottlenecks, such as pinning, that impede our move forward.

One of the new functionalities that has entered its second preview in Java 22 is Structured Concurrency (JEP 462, the follow-up to JEP 453).

As the name implies, this proposal aims to bring more structure to concurrent programming by treating related tasks that run in different threads as a single unit of work. That makes it easier for us to manage their state and to keep an eye on what’s happening.

Note: we won’t be diving into the synchronized keyword or the Lock interfaces, which offer additional capabilities compared to synchronized. They’re certainly worth looking into as well, but which flavour of lock fits best depends on your thread-safety and performance requirements.


What are the current challenges?

In the “traditional” manner, using an ExecutorService, if we’re planning a trip and want to fetch some information to plan things, we could do:

GarderobeSelectionInput handle(String userId) throws ExecutionException, InterruptedException {
    Future<Person> personFuture = executorService.submit(() -> findPerson(userId));
    Future<Weather> weatherFuture = executorService.submit(() -> fetchWeather());
    Future<Activity> activityFuture = executorService.submit(() -> findActivity(userId));
    Person person = personFuture.get();   // Join findPerson
    Weather weather = weatherFuture.get(); // Join fetchWeather
    Activity activity = activityFuture.get(); // Join findActivity
    return new GarderobeSelectionInput(person, weather, activity);
}

Whilst this is nice from a latency point of view, especially if these tasks go to different sources (database, web service, …), it does pose some risks:

  • thread leakage: if our 3 calls are started but the weather service is unavailable, handle() fails when it calls weatherFuture.get() and returns, while the other tasks keep running in their own threads, potentially indefinitely, as orphans
  • wasted resources: these tasks are related, do we really need to keep a potentially resource-intensive task running if one of the other, cheaper requests fails?

Furthermore, this relationship only exists in our minds; nothing in the code relates these tasks to one another.

This makes troubleshooting a bit of a challenge when issues happen: when observing what’s going on, for example in a thread dump, these tasks show up on the call stacks of unrelated threads, with no indication that they are in fact related.

Now of course, some of these risks can be mitigated by wrapping our tasks in try-catch-finally and invoking the cancel methods of the other tasks’ futures in the catch block of the failing task.

But as you can imagine, when there are a lot of tasks this will very quickly become very cumbersome, tricky to keep track of, and error-prone.
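To make that manual mitigation concrete, here is a minimal sketch using only the classic ExecutorService API. The class name, the helper joinAllOrCancel and the two stand-in tasks are hypothetical and not part of the original example; they only illustrate the "cancel the siblings in the catch block" idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ManualCancellation {

    /** Joins the futures in order; on the first failure, cancels all of them and rethrows. */
    static <T> List<T> joinAllOrCancel(List<Future<T>> futures)
            throws ExecutionException, InterruptedException {
        List<T> results = new ArrayList<>();
        try {
            for (Future<T> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (ExecutionException | InterruptedException e) {
            // cancel(true) interrupts tasks that are still running;
            // it has no effect on tasks that already completed.
            futures.forEach(f -> f.cancel(true));
            throw e;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Hypothetical tasks: the weather lookup fails fast, the activity lookup is slow.
            Callable<String> fetchWeather = () -> {
                throw new IllegalStateException("weather service unavailable");
            };
            Callable<String> findActivity = () -> {
                Thread.sleep(10_000);
                return "hiking";
            };
            Future<String> weather = executor.submit(fetchWeather);
            Future<String> activity = executor.submit(findActivity);
            try {
                joinAllOrCancel(List.of(weather, activity));
            } catch (ExecutionException e) {
                System.out.println("failed: " + e.getCause().getMessage());
                System.out.println("activity cancelled: " + activity.isCancelled());
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Note that this sketch still joins the futures in submission order: had the slow task been first in the list, the failure of the fast one would only be noticed after the slow one finished. That is part of what makes the manual approach tricky.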

Additionally, as this is all unstructured, there are no constraints upon the threads involved. 

So the following things can happen (and are certainly not recommended):

  • a thread can submit work, and a completely different thread can await the result.
  • a different thread can join in on the fun if it has a reference to the Future by calling get()

Now how can we mitigate this?

The Structured Concurrency API allows us to bind blocks of concurrent code to a scope, which preserves the relationship between the different tasks. Thanks to this, all unfinished subtasks can be shut down when the scope finishes (be it in success or failure).

Subtasks are forked individually, each executing in its own thread, and are then joined (and possibly cancelled) as a unit; the result, failure or not, is handled by the parent.

Structured concurrency is based upon the premise that if a task splits into concurrent subtasks then they all return to the same place, namely the task’s code block.

Subtasks perform an action on behalf of a task, and the task awaits the results of the subtasks and observes them for failures. 

This means that:

  • the entry and exit point of blocks of code is clear
  • there is a clear nesting of operations
  • lifetime of concurrent subtasks is restricted to the syntactic block of its parent

Thanks to this, sibling (sub)tasks can be treated as a unit and, given the hierarchical nature, the runtime can reify them into a tree. This makes it possible to apply policies to a subtree and allows observability tools to present subtasks as subordinate to their parent, which increases clarity considerably.

So let’s implement this again:

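GarderobeSelectionInput handle(String userId) throws ExecutionException {
    try (final var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // Each fork() starts a subtask in its own thread, bound to the scope
        Supplier<Person> personTask =
                scope.fork(() -> findPerson(userId));
        Supplier<Weather> weatherTask =
                scope.fork(this::fetchWeather);
        Supplier<Activity> activityTask =
                scope.fork(() -> findActivity(userId));

        // Wait for all subtasks, then rethrow the first failure (if any)
        scope.join().throwIfFailed();

        final var person = personTask.get();
        final var weather = weatherTask.get();
        final var activity = activityTask.get();

        return new GarderobeSelectionInput(person, weather, activity);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        throw new RuntimeException(e);
    }
}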


Keep in mind, each scope.fork() creates a subtask that is bound to the scope.

As you can see, this allows us to clearly reflect the structure of the tasks that should be completed together, which makes it easier to observe their behaviour and manage their state. Ideally, this makes it all more approachable. Given we’re using ShutdownOnFailure, if one of our subtasks fails (say the weather service is unavailable), the other subtasks will be cancelled if they haven’t finished yet.

Some API information

You can use StructuredTaskScope directly, but quite often you’ll use one of the two subclasses that come with a predefined shutdown policy (you can certainly write your own, but these 2 cover the most common use cases).

ShutdownOnSuccess: captures the result of the first subtask to complete successfully; useful when the result of any one subtask will do.

ShutdownOnFailure: captures the exception of the first subtask to fail; useful when we need the results of all subtasks, so any failure makes the rest pointless.

In both cases, the scope is then shut down: the other subtasks are interrupted and the owner is woken up (both are examples of short-circuiting patterns). These classes implement AutoCloseable, so there’s no need to close them ourselves when we use them in a try-with-resources statement.

To recap, StructuredTaskScope facilitates:

  • error handling with short-circuiting
  • cancellation propagation
  • clarity thanks to the clear scope
  • observability thanks to the clear task hierarchy in thread dumps

Virtual threads?

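By default, StructuredTaskScope forks each subtask in a new virtual thread (VT). If our subtasks are database- or network-related, a VT releases its carrier thread whilst we wait on the result. This saves us resources and allows us to service more requests without a major increase in resource usage.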
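As a small illustration of blocking work on virtual threads outside of a StructuredTaskScope, the sketch below uses Executors.newVirtualThreadPerTaskExecutor(), which is a final API since Java 21 and needs no preview flag. The class and method names are hypothetical, and Thread.sleep merely stands in for a database or network call.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class VirtualThreadDemo {

    /** Runs taskCount simulated blocking calls and counts how many ran on a virtual thread. */
    static long countTasksRunOnVirtualThreads(int taskCount)
            throws ExecutionException, InterruptedException {
        // The executor starts one new virtual thread per submitted task;
        // close() (via try-with-resources) waits for the tasks to finish.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<Boolean>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(executor.submit(() -> {
                    Thread.sleep(Duration.ofMillis(100)); // stand-in for blocking I/O
                    return Thread.currentThread().isVirtual();
                }));
            }
            long virtual = 0;
            for (Future<Boolean> future : futures) {
                if (future.get()) {
                    virtual++;
                }
            }
            return virtual;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("tasks on virtual threads: " + countTasksRunOnVirtualThreads(1_000));
    }
}
```

While a task sleeps, its virtual thread is unmounted from the carrier thread, so a handful of platform threads can serve all of the tasks.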

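Note: Structured Concurrency is still a preview feature. It can already be used, so that the development team can gather feedback to improve the API.

Note: to use this new functionality you need to pass `--release 21 --enable-preview` to javac when compiling your class (use the release number of your JDK, e.g. 22 for JEP 462), and `--enable-preview` to java when running it.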

Observability

Whilst this new approach does showcase that concurrency will become easier with this new API, it is also apparent that the application landscape is rapidly evolving, and that our applications are becoming increasingly complex.

Proper attention therefore needs to be paid to gathering insights into what is going on in our applications.

Virtual threads (VTs) are fully integrated with the JDK Flight Recorder, which emits events when a VT starts, ends, fails to start, or blocks while pinned (jdk.VirtualThreadStart, jdk.VirtualThreadEnd, jdk.VirtualThreadSubmitFailed and jdk.VirtualThreadPinned). The JSON thread-dump format additionally shows StructuredTaskScope’s grouping of threads in a hierarchy.
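As a rough sketch of how these Flight Recorder events could be captured programmatically, the example below uses the jdk.jfr API and assumes a JDK 21 or later runtime. The class and method names are hypothetical. The jdk.VirtualThreadStart event is disabled by default, so the recording enables it explicitly.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import jdk.jfr.Recording;
import jdk.jfr.consumer.RecordingFile;

final class VirtualThreadJfrDemo {

    /** Starts threadCount virtual threads while recording and counts the start events seen. */
    static long countVirtualThreadStartEvents(int threadCount)
            throws IOException, InterruptedException {
        Path file = Files.createTempDirectory("vt-jfr").resolve("recording.jfr");
        try (Recording recording = new Recording()) {
            recording.enable("jdk.VirtualThreadStart"); // off by default
            recording.start();
            for (int i = 0; i < threadCount; i++) {
                Thread thread = Thread.ofVirtual().start(() -> { });
                thread.join();
            }
            recording.stop();
            recording.dump(file); // write the recorded events to disk
        }
        return RecordingFile.readAllEvents(file).stream()
                .filter(event -> event.getEventType().getName().equals("jdk.VirtualThreadStart"))
                .count();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("start events: " + countVirtualThreadStartEvents(5));
    }
}
```

In practice you would more likely start a recording with `jcmd <pid> JFR.start` or the `-XX:StartFlightRecording` option and inspect it in JDK Mission Control; the programmatic route is shown here only because it is self-contained.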

We can observe what’s going on in a StructuredTaskScope and its forked subtasks by using: 

jcmd <pid> Thread.dump_to_file -format=json <output_file>

In the generated output we can then see the StructuredTaskScope within the threadContainers segment and the threads of its forked subtasks:

{
  "threadDump": {
    ...
    "threadContainers": [
      {
        "container": "<root>",
        "parent": null,
        "owner": null,
        "threads": [
          ...
        ],
        ...
      },
      ...
      {
        "container": "java.util.concurrent.StructuredTaskScope$ShutdownOnFailure@4f923272",
        "parent": "<root>",
        "owner": "1",
        "threads": [
          ... (subtasks)
        ],
        "threadCount": "3"
      }
    ]
  }
}
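The same JSON dump can also be triggered from inside the application, which may be handy in tests or diagnostics endpoints. The sketch below assumes JDK 21 or later, where com.sun.management.HotSpotDiagnosticMXBean gained a dumpThreads method; the class name and the dumpTo helper are hypothetical.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import com.sun.management.HotSpotDiagnosticMXBean.ThreadDumpFormat;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

final class JsonThreadDump {

    /** Writes a JSON thread dump to a fresh file in the given directory and returns its path. */
    static Path dumpTo(Path directory) throws IOException {
        // dumpThreads requires an absolute path to a file that does not exist yet.
        Path target = directory.toAbsolutePath()
                .resolve("threads-" + System.nanoTime() + ".json");
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.dumpThreads(target.toString(), ThreadDumpFormat.JSON);
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path dump = dumpTo(Files.createTempDirectory("thread-dump"));
        System.out.println("wrote " + dump);
    }
}
```

Any StructuredTaskScope that is open at the moment of the dump should then appear as its own entry under threadContainers, as in the excerpt above.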

Of course, we can’t forget about OpenTelemetry, which makes it easy to track what’s going on in your applications by emitting the proper signals, which can then be collected and interpreted. Ideally, one should strive for continuous feedback and interpret this information as soon as possible to determine how changes are impacting the overall behaviour of the application. One tool for this is Digma, which integrates with IntelliJ IDEA.

References:

Structured Concurrency – https://openjdk.org/jeps/462

Virtual threads – https://openjdk.org/jeps/444

OpenTelemetry – https://opentelemetry.io/

JDK Flight Recorder – https://docs.oracle.com/javacomponents/jmc-5-4/jfr-runtime-guide/about.htm#JFRUH170

Conclusion: Structured Concurrency

This article was contributed to the Digma blog by Simon Verhoeven, a Senior Software Engineer, and Java Consultant with a particular focus on Cloud quality and Maintainability.
