4

Not that long ago I answered this question: Executing Dependent tasks in parallel in Java But using future.get() is blocking current thread, and there is the possibility that the thread pool runs out of threads if too many gets() are called at the one time. How does one compose futures from futures in Java?

Community
  • 1
  • 1
Derrops
  • 7,651
  • 5
  • 30
  • 60

3 Answers3

5

I thought I would answer this question myself, one can use CompletableFutures in java instead of Futures. CompletableFutures allow for composition via the thenCombine method, which is similiar to scalas flatMap. Now there is no blocking happening and only 3 threads are needed to achieve the fastest time.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class Barrista
{

    // number of threads used in executor
    static final int NOTHREADS = 3;

    // time of each task
    static final int HEATWATER = 1000;
    static final int GRINDBEANS = 1000;
    static final int FROTHMILK = 1000;
    static final int BREWING = 1000;
    static final int COMBINE = 1000;

    // method to simulate work (pause current thread without throwing checked exception)
    public static void pause(long t)
    {
        try
        {
            Thread.sleep(t);
        }
        catch(Exception e)
        {
            throw new Error(e.toString());
        }
    }

    // task to heat some water
    static class HeatWater implements Supplier<String>
    {
        @Override
        public String get()
        {
            System.out.println("Heating Water");
            pause(HEATWATER);
            return "hot water";
        }
    }

    // task to grind some beans
    static class GrindBeans implements Supplier<String>
    {
        @Override
        public String get()
        {
            System.out.println("Grinding Beans");
            pause(GRINDBEANS);
            return "grinded beans";
        }
    }

    // task to froth some milk
    static class FrothMilk implements Supplier<String>
    {
        @Override
        public String get()
        {
            System.out.println("Frothing some milk");
            pause(FROTHMILK);
            return "some milk";
        }
    }

    // task to brew some coffee
    static class Brew implements BiFunction<String,String, String>
    {
        @Override
        public String apply(String groundBeans, String heatedWater)
        {
            System.out.println("Brewing coffee with " + groundBeans + " and " + heatedWater);
            pause(BREWING);
            return "brewed coffee";
        }
    }

    // task to combine brewed coffee and milk
    static class Combine implements BiFunction<String,String, String>
    {
        @Override
        public String apply(String frothedMilk, String brewedCoffee)
        {
            System.out.println("Combining " + frothedMilk + " "+ brewedCoffee);
            pause(COMBINE);
            return "Final Coffee";
        }
    }

    public static void main(String[] args)
    {
        ExecutorService executor = Executors.newFixedThreadPool(NOTHREADS);

        long startTime = System.currentTimeMillis();

        try
        {
            // create all the tasks and let the executor handle the execution order
            CompletableFuture<String> frothMilk =       CompletableFuture.supplyAsync(new FrothMilk(), executor);
            CompletableFuture<String> heatWaterFuture = CompletableFuture.supplyAsync(new HeatWater(), executor);
            CompletableFuture<String> grindBeans =      CompletableFuture.supplyAsync(new GrindBeans(), executor);

            CompletableFuture<String> brew = heatWaterFuture.thenCombine(grindBeans, new Brew());
            CompletableFuture<String> coffee =          brew.thenCombine(frothMilk,  new Combine());

            // final coffee
            System.out.println("Here is the coffee:" + coffee.get());

            // analyzing times:
            System.out.println("\n\n");
            System.out.println("Actual time: \t\t\t\t" + (System.currentTimeMillis() - startTime)/1000.0);

            // compute the quickest possible time:
            long path1 = Math.max(GRINDBEANS, HEATWATER)+ BREWING + COMBINE;
            long path2 = FROTHMILK + COMBINE;
            System.out.println("Quickest time multi-threaded:\t\t" + Math.max(path1, path2)/1000.0);

            // compute the longest possible time:
            long longestTime = HEATWATER + GRINDBEANS + FROTHMILK + BREWING + COMBINE;
            System.out.println("Quickest time single-threaded thread:\t" + longestTime/1000.0);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            executor.shutdown();
        }

    }
}
Derrops
  • 7,651
  • 5
  • 30
  • 60
  • This is a very nice solution with a caveat found: In practice, jobs can fail (in the form of status, say, terminated SUCCESSFUL, or terminated with FAILURE) but I still want to get as SOON and MANY jobs done as possible before debugging those failed jobs all at once. So what will be a good exception handling strategy here? – cpchung Dec 10 '19 at 20:04
  • Yes you are correct @cpchung! One way to deal with this is using Monads and mapping over containers is an elegant solution. In Java you almost always have `if(success)` statements to achieve the same thing although with a bit of extra effort you could do something a bit nicer. – Derrops Dec 10 '19 at 22:51
2

Java 8 introduces CompletableFuture, where you do not require particularly block a get call, except you trigger a callback depend upon completion stage.

A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.

Read more on documentation

Before java 8, this concept is available with google groovy library, read more on documentation and spring library too.

Subhrajyoti Majumder
  • 40,646
  • 13
  • 77
  • 103
  • Thanks yes I have used these in my answer. I did sort of have the answer to this question ready when asking the question but I thought it would be useful to people. I have had a very hard time finding how to do this. – Derrops May 25 '16 at 07:19
1

Dexecutor to the rescue here.

Disclaimer I am the owner

Dependent task execution, easily done with dexecutor.

Sequential

enter image description here

DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();
//Building
executor.addDependency(1, 2);
executor.addDependency(2, 3);
executor.addDependency(3, 4);
executor.addDependency(4, 5);
//Execution
executor.execute(ExecutionConfig.TERMINATING);

Parallel

enter image description here

DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();
//Building
executor.addIndependent(1);
executor.addIndependent(2);
executor.addIndependent(3);
executor.addIndependent(4);
//Execution
executor.execute(ExecutionConfig.TERMINATING);

Or even mix

enter image description here

DefaultDexecutor<Integer, Integer> executor = newTaskExecutor();

executor.addDependency(1, 2);
executor.addDependency(1, 2);
executor.addDependency(1, 3);
executor.addDependency(3, 4);
executor.addDependency(3, 5);
executor.addDependency(3, 6);

executor.addDependency(2, 7);
executor.addDependency(2, 9);
executor.addDependency(2, 8);
executor.addDependency(9, 10);
executor.addDependency(12, 13);
executor.addDependency(13, 4);
executor.addDependency(13, 14);
executor.addIndependent(11);

executor.execute(new ExecutionConfig().immediateRetrying(2));

Refer How Do I for more details

craftsmannadeem
  • 2,665
  • 26
  • 22