
I have a class with the following two methods.

public class Test1 {
    public Mono<String> blah1() {
        Mono<String> blah = Mono.just("blah1");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("blah1 done");
        return blah;
    }
    
    public Mono<String> blah2() {
        Mono<String> blah = Mono.just("blah2");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("blah2 done");
        return blah;
    }
}

I have the following JUnit test:

@Test
public void blah1Test() {
    Flux<Tuple2<String, String>> s = Flux.zip(test1.blah1(), test1.blah2());    
}

My result is as follows:

blah1 done
blah2 done

I expected blah2 to finish before blah1, so I believe the two calls are being processed in a blocking way rather than a non-blocking one. What do I need to change so that the output is blah2 done followed by blah1 done? In other words, why are these not processed in parallel?

Thanks in advance for your time!

1 Answer


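Both sleep and println are executed outside the reactive pipeline: they run on the calling thread as soon as blah1() and blah2() are invoked, before Flux.zip receives either Mono. Therefore, both blah1() and blah2() behave like regular, non-reactive methods and run one after the other.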
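The difference between code that runs when the method is called and code that runs inside the returned pipeline can be sketched without Reactor. The example below is only an analogy: it uses a plain java.util.function.Supplier as a stand-in for a lazy Mono, and the names eager, deferred and demo are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class EagerVsDeferred {

    // Eager: the side effect runs when the method is called,
    // like the sleep/println in the question's blah1() and blah2().
    static Supplier<String> eager(String name, List<String> log) {
        log.add(name + " body ran");
        return () -> name;
    }

    // Deferred: the side effect is captured inside the returned Supplier
    // and runs only when get() is called (the analogue of subscribing).
    static Supplier<String> deferred(String name, List<String> log) {
        return () -> {
            log.add(name + " body ran");
            return name;
        };
    }

    // Records the order in which the side effects happen.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Supplier<String> a = eager("blah1", log);
        Supplier<String> b = deferred("blah2", log);
        log.add("both methods returned");
        a.get();
        b.get();
        return log;
    }

    public static void main(String[] args) {
        // prints [blah1 body ran, both methods returned, blah2 body ran]
        System.out.println(demo());
    }
}
```

The eager body has already run by the time the caller gets anything back, which is why the combining step has no chance to overlap the two methods.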

Try this:

public Mono<String> blah1() {
    System.out.println("blah1 start");
    return Mono.just("blah1")
        .delayElement(Duration.ofMillis(5000))
        .doOnNext(e -> System.out.println("blah1 done"));
}

public Mono<String> blah2() {
    System.out.println("blah2 start");
    return Mono.just("blah2")
        .delayElement(Duration.ofMillis(1000))
        .doOnNext(e -> System.out.println("blah2 done"));
}

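Here we get the expected result because the delay and the "done" message are now part of the reactive pipeline. Note that the pipeline only runs once something subscribes to it, so the test has to subscribe to (or block on) the zipped Flux; the "start" messages are still printed at call time.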

Output:

blah1 start
blah2 start
blah2 done
blah1 done
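The test in the question builds the zipped Flux but never subscribes to it, so a test along the following lines would be needed to see the "done" messages. This is a minimal sketch, assuming reactor-core is on the classpath; the helper delayed, the class name, and the shortened delays (500 ms and 100 ms instead of 5000 ms and 1000 ms) are illustrative choices, not part of the original code.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class ZipSubscribeSketch {

    // Hypothetical helper: emits `name` after `millis` and records completion in `log`.
    static Mono<String> delayed(String name, long millis, List<String> log) {
        return Mono.just(name)
            .delayElement(Duration.ofMillis(millis))
            .doOnNext(v -> log.add(name + " done"));
    }

    static List<String> run() {
        // Thread-safe list, because doOnNext runs on Reactor's timer threads.
        List<String> log = new CopyOnWriteArrayList<>();

        // Assembling the pipeline does not start either delay.
        Flux<Tuple2<String, String>> zipped =
            Flux.zip(delayed("blah1", 500, log), delayed("blah2", 100, log));

        // blockLast subscribes and waits for completion (with a safety timeout).
        // Blocking like this is acceptable in a test, not in reactive production code.
        zipped.blockLast(Duration.ofSeconds(5));
        return log;
    }

    public static void main(String[] args) {
        // prints [blah2 done, blah1 done]
        System.out.println(run());
    }
}
```

In a JUnit test, StepVerifier from reactor-test is a common alternative to blockLast for asserting on the emitted tuple.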
  • I am trying to slowly migrate a large application to Spring reactive over time. Is there a way to process methods whose code sits outside the reactive pipeline while still returning a Mono/Flux? I have a lot of CompletableFutures, but would rather change them to Mono/Flux for now and change the code inside the methods later. – Brian Nov 15 '21 at 00:18
  • I may have answered my own question. It appears I can put code in .doOnNext and it will be processed within the reactive pipeline. Is that accurate? – Brian Nov 15 '21 at 00:39
  • @Brian Of course `doOnNext` will work but not for all cases. It's not a good idea to change the application state or execute an async operation there. – lkatiforis Nov 15 '21 at 05:04
  • The majority of my application is already async using CompletableFutures, but not all of it. Thanks for all the great help! – Brian Nov 15 '21 at 13:30
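For the migration scenario raised in the comments, one commonly used approach (not something the answer above prescribes) is to wrap existing blocking code with Mono.fromCallable and move it onto Schedulers.boundedElastic(), and to wrap existing CompletableFutures with Mono.fromFuture. The sketch below assumes reactor-core is on the classpath; legacyBlocking, wrapBlocking and wrapFuture are hypothetical names, and the delays are shortened for illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingBridgeSketch {

    // Hypothetical stand-in for legacy blocking code that has not been migrated yet.
    static String legacyBlocking(String name, long millis, List<String> log) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.add(name + " done");
        return name;
    }

    // Defers the blocking call until subscription and runs it on a scheduler
    // intended for blocking work, so the subscribing thread is not held up.
    static Mono<String> wrapBlocking(String name, long millis, List<String> log) {
        return Mono.fromCallable(() -> legacyBlocking(name, millis, log))
            .subscribeOn(Schedulers.boundedElastic());
    }

    // Adapts a CompletableFuture; the Supplier form creates the future
    // only when the Mono is subscribed to.
    static Mono<String> wrapFuture(String name, long millis, List<String> log) {
        return Mono.fromFuture(() ->
            CompletableFuture.supplyAsync(() -> legacyBlocking(name, millis, log)));
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        // Both sources start on subscription and run concurrently.
        Mono.zip(wrapBlocking("blah1", 500, log), wrapFuture("blah2", 100, log))
            .block(Duration.ofSeconds(5));
        return log;
    }

    public static void main(String[] args) {
        // prints [blah2 done, blah1 done]
        System.out.println(run());
    }
}
```

Unlike doOnNext, which is meant for lightweight side effects, this keeps the blocking work itself as the source of the Mono, so the method bodies can be migrated later without changing the callers.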