Let's pretend I have something like this:

<TextBox Text="{Binding Text, Mode=TwoWay}" />

And something like this:

public class MyViewModel : INotifyPropertyChanged
{
    public MyViewModel()
    {
        // run DoWork() when this.Text changes
        Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
            .Where(x => x.EventArgs.PropertyName.Equals("Text"))
            .Subscribe(async x => await DoWork());
    }

    private async Task DoWork()
    {
        await Task.Delay(this.Text.Length * 100);
    }

    public event PropertyChangedEventHandler PropertyChanged;

    private string _Text = "Hello World";
    public string Text
    {
        get { return _Text; }
        set
        {
            _Text = value;
            if (this.PropertyChanged != null)
                this.PropertyChanged(this, new PropertyChangedEventArgs("Text"));
        }
    }
}

In this scenario, the user could be typing very quickly. I need:

  1. DoWork() must not run while DoWork() is already running

  2. The user may type in spurts, some changes, pause, some changes

  3. DoWork() is NOT required for every change, only the last change

  4. There is no need for DoWork() to be called more often than once per second

  5. DoWork() cannot wait for the last change if a spurt lasts longer than 1 second

  6. DoWork() should not be called while the system is idle

  7. The duration of DoWork() varies based on the length of this.Text

The question isn't if Rx can do this. I know it can. What's the proper syntax?

Jerry Nixon
  • Freaky, I'm doing this exact thing now and was wondering the exact thing(s) –  Feb 18 '14 at 18:14
  • It is not a requirement that this be a pure RX solution. – Jerry Nixon Feb 18 '14 at 19:27
  • Come up with a better title lol – Ana Betts Feb 18 '14 at 19:28
  • None of the presented solutions are easy. I think the fact that the most experienced Rx people on SO clearly had to work hard to get working solutions shows that this is patently *not* pretty easy. I don't think any of the code here is easy to read, let alone write. That's why I still gravitate towards doing this outside of the stream. In my experience, managing slow consumers outside the monad is just easier all round. – James World Feb 19 '14 at 09:26
  • IMHO, it certainly has the least and most readable code of all solutions presented! – James World Feb 19 '14 at 09:32
  • @JamesWorld Almost any RX code that does something non-trivial is hard to read for people less familiar with RX. But still, if the usage is easy, who cares? In my solution, it is a one-liner - clicks.WorkSequencer(DoWork).Subscribe(); BTW, this is exactly the point of RX - instead of thinking about the details of the algorithm, you're combining macro "concepts" to achieve the desired result. – Sergey Aldoukhov Feb 23 '14 at 02:07
  • Yes, none of that is news to me. My comments were in response to "It's actually pretty easy to write an extension for this..." which talks to the difficulty of arriving at a solution, not the ease of use of it. – James World Feb 23 '14 at 10:55

5 Answers

You may be surprised how hard this is as a pure RX solution. It's subtly different from the similar (and typical Rx 101) example of submitting a throttled search in response to textbox changes - in that case, it's ok to fire off concurrent searches, cancelling all but the latest one.

In this case, once DoWork() is off and running it can't be replaced or interrupted.

The problem is that Rx streams flow in one direction and can't "talk backwards" - so events queue up against slow consumers. To drop events due to slow consumers is quite hard in Rx.

It's much easier in a world where DoWork() can be cancelled and replaced when a new (probably throttled) event arrives.

First I present a pure Rx solution. Then at the end, a simpler approach where the slow consumer is dealt with by a dispatching mechanism outside of Rx.

For the pure approach, you'll need this helper extension method to drop events queued against a slow consumer which you can read about here:

public static IObservable<T> ObserveLatestOn<T>(
    this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;

        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool wasNotAlreadyActive;
            lock (gate)
            {
                wasNotAlreadyActive = !active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (wasNotAlreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}

With this available you can then do something like:

// run DoWork() when this.Text changes
Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
          .Where(x => x.EventArgs.PropertyName.Equals("Text"))
          .Sample(TimeSpan.FromSeconds(1)) // get the latest event in each second
          .ObserveLatestOn(Scheduler.Default) // drop all but the latest event
          .Subscribe(x => DoWork().Wait()); // block to avoid overlap

Remarks

To be honest, you are probably better off avoiding the pure Rx solution here and NOT calling DoWork() directly from a subscriber. I would wrap it with an intermediate dispatching mechanism, called from the Subscribe method, that handles not calling it if it's already running - the code would be way simpler to maintain.
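
One possible shape for such a dispatching mechanism is sketched below. It is an illustration only, not code from Rx or any other library: the class name `LatestOnlyDispatcher` and its `RequestAsync` method are hypothetical. It assumes that remembering a single "something changed" flag is enough, because DoWork() reads this.Text itself when it runs, so only the most recent state matters.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not part of Rx): runs one async action at a time and
// remembers at most one request made while the action is busy.
public sealed class LatestOnlyDispatcher
{
    private readonly Func<Task> _work;
    private readonly object _gate = new object();
    private bool _running;   // true while _work is executing
    private bool _pending;   // true if a request arrived while running

    public LatestOnlyDispatcher(Func<Task> work)
    {
        if (work == null) throw new ArgumentNullException(nameof(work));
        _work = work;
    }

    // Call this from Subscribe(...). If work is already running, the request
    // is only recorded and the call returns immediately.
    public async Task RequestAsync()
    {
        lock (_gate)
        {
            if (_running)
            {
                _pending = true;
                return;
            }
            _running = true;
        }

        try
        {
            bool runAgain;
            do
            {
                await _work();
                lock (_gate)
                {
                    // Run once more if anything was requested meanwhile,
                    // so the last change is never dropped.
                    runAgain = _pending;
                    _pending = false;
                    if (!runAgain) _running = false;
                }
            } while (runAgain);
        }
        catch
        {
            // Reset the state so a failure does not block later requests.
            lock (_gate)
            {
                _running = false;
                _pending = false;
            }
            throw;
        }
    }
}
```

With a single instance created as `new LatestOnlyDispatcher(DoWork)`, the subscriber would only call `RequestAsync()` and ignore the returned task. Requests made while DoWork() is running collapse into one follow-up run, which is what a plain "is running" flag on its own would not give you.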

EDIT:

After thinking on this for a few days, I didn't do any better than some of the other answers here - I'll leave the above for interest, but I think I like Filip Skakun's approach the best.

James World
  • It's actually pretty easy to write an extension for this in a few lines of pure RX. No boolean gates or thread locking required. See my answer for some combinators to help. – bradgonesurfing Feb 19 '14 at 08:57
  • Not sure which comment you mean. You are suggesting that to drop events in RX due to a slow consumer is hard to do. It's not really. If that is all you want to do then it is pretty easy. You just have to create a Replay(1) buffer and consume it with a sampler triggered by the completion of the consumer. – bradgonesurfing Feb 19 '14 at 09:34
  • As well your suggestion using the ``_running`` flag will not work. It will miss the last event. – bradgonesurfing Feb 19 '14 at 09:36
  • The comment starting "None of the above...". You have lost touch with the reality of grokking Rx if you think what you just said is going to appear easy to the average person!!! – James World Feb 19 '14 at 09:37
  • Ah yes, it was a requirement... I'll fix that! – James World Feb 19 '14 at 09:39
  • You will have to maintain some more state and I think it will get a bit ugly. I'm curious. – bradgonesurfing Feb 19 '14 at 09:48
  • Well actually, if you look at from far enough away, every solution presented really has the same idea of having to statefully track the running task. Since any combinator is just subscribing upstream anyway, I'm sure I'll end up repacking this into something similar. Got to get on with the day job right now, but I'll revisit this soon. Your solution is growing on me though! – James World Feb 19 '14 at 09:51
  • Hi James, chanced on this answer (http://stackoverflow.com/a/11040837) on another question related to ObserverLatestOn, where the 'slack' is introduced with a special overload of Subscribe. It seems a lot simpler (to write in any case). Very difficult to test though! – Benjol Oct 23 '15 at 06:37

While I kind of agree with James World, I think you can Do Better, if we use just a bit of mutable state. What if DoWork looked like this:

IObservable<Unit> doingWork;
public IObservable<Unit> DoWork()
{
    if (doingWork != null) return doingWork;

    doingWork = Observable.Start(() => {
        // XXX: Do work
        Thread.Sleep(1000);

        // We only kick off this 1sec timeout *after* we finish doing work
        Observable.Timer(TimeSpan.FromSeconds(1.0), DispatcherScheduler.Instance)
            .Subscribe(_ => doingWork = null);
    });

    return doingWork;
}

Now, DoWork debounces itself Automagically™, and we can get rid of this await-in-Subscribe silliness; we set the throttle to 250ms to be Quick-But-Not-Too-Quick.

This initially appears to violate requirement #5 above, but we've ensured that anyone calling DoWork too quickly just gets the previous run's results. The effect is that DoWork will be called many times but will not necessarily do anything. It also ensures that, if we aren't doing work, we won't have a 1sec delay after the user stops typing, as Throttle(1.seconds) would:

    Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
        .Where(x => x.EventArgs.PropertyName.Equals("Text"))
        .Throttle(TimeSpan.FromMilliseconds(250), DispatcherScheduler.Instance)
        .SelectMany(_ => DoWork())
        .Catch<Unit, Exception>(ex => {
            Console.WriteLine("Oh Crap, DoWork failed: {0}", ex);
            return Observable.Empty<Unit>();
        })
        .Subscribe(_ => Console.WriteLine("Did work"));
Ana Betts
  • That's a nice trick - although on balance I think I prefer the "mess" in the subscriber. It's still going in my playbook though :). Btw - I think you want `Sample` instead of `Throttle` (see point 5 in OP.). – James World Feb 18 '14 at 20:45
  • I actually do want Throttle here - the trick is, the SelectMany will fire DoWork > 1x per 1sec interval, but *some* of those subscriptions won't do anything, because they'll just get the previous DoWork's AsyncSubject. – Ana Betts Feb 18 '14 at 20:59
  • Actually, there's some stuff to fix here. Neither `Throttle` nor `Sample` work. If you use `Throttle`, a steady stream of events at less than 250ms intervals will suppress the event continuously won't it? Also, shouldn't `doingWork` just be declared as `IObservable`? (To be sure I wasn't going mad, I did run the code hooked up to a button click and pressed that button like crazy and no events came out...) – James World Feb 18 '14 at 21:17

I think a simpler and reusable way to solve your problem might actually be async/await-based rather than RX-based. Check out the single threaded EventThrottler class implementation I got as an answer to my 'Is there such a synchronization tool as “single-item-sized async task buffer”?' question. With that you can rewrite your DoWork() method as simply:

private void DoWork()
{
    EventThrottler.Default.Run(async () =>
    {
        await Task.Delay(1000);
        //do other stuff
    });
}

and call it every time your text changes. No RX required. Also, if you are already using WinRT XAML Toolkit - the class is in there.

Here's a copy of the throttler class code as a quick reference:

public class EventThrottler
{
    private Func<Task> next = null;
    private bool isRunning = false;

    public async void Run(Func<Task> action)
    {
        if (isRunning)
            next = action;
        else
        {
            isRunning = true;

            try
            {
                await action();

                while (next != null)
                {
                    var nextCopy = next;
                    next = null;
                    await nextCopy();
                }
            }
            finally
            {
                isRunning = false;
            }
        }
    }

    private static Lazy<EventThrottler> defaultInstance =
        new Lazy<EventThrottler>(() => new EventThrottler());
    public static EventThrottler Default
    {
        get { return defaultInstance.Value; }
    }
}
Filip Skakun
  • This is a pretty neat solution. Again similar in approach when viewed from a distance, but I think makes the most "sense" from the perspective of dealing with back pressure. – James World Feb 23 '14 at 11:01

Here is what I have (the code is tested, btw). It is based on the event throttling extension I created a few years ago. I think a good name for it would be Ouroboros. The major thing about it is that, unlike Throttle, it starts the work immediately if the cooldown has passed.

public static IObservable<TResult> CombineVeryLatest<TLeft, TRight, TResult>(
    this IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource, 
    Func<TLeft, TRight, TResult> selector)
{
    return Observable.Defer(() =>
    {
        int l = -1, r = -1;
        return Observable.CombineLatest(
            leftSource.Select(Tuple.Create<TLeft, int>),
            rightSource.Select(Tuple.Create<TRight, int>),
                (x, y) => new { x, y })
            .Where(t => t.x.Item2 != l && t.y.Item2 != r)
            .Do(t => { l = t.x.Item2; r = t.y.Item2; })
            .Select(t => selector(t.x.Item1, t.y.Item1));
    });
}

public static IObservable<TWork> WorkSequencer<T, TWork>(
    this IObservable<T> source, Func<Task<TWork>> work)
{
    return source.Publish(src =>
    {
        var fire = new Subject<T>();
        var fireCompleted = fire.SelectMany(x => work()).Publish();
        fireCompleted.Connect();
        var whenCanFire = fireCompleted.StartWith(default(TWork));

        var subscription = src
            .CombineVeryLatest(whenCanFire, (x, flag) => x)
            .Subscribe(fire);

        return fireCompleted.Finally(subscription.Dispose);
    });
}

Then usage would be:

    private int _counter;

    public MainWindow()
    {
        InitializeComponent();
        var clicks = Observable
            .FromEventPattern(TestBn, "Click")
            .Do(_ =>
            {
                Console.WriteLine("click");
                _counter++;
            });
        clicks.WorkSequencer(DoWork).Subscribe();
    }

    private async Task<int> DoWork()
    {
        var workNumber = _counter;
        Console.WriteLine("Work Start " + workNumber);
        await Task.WhenAll(Task.Delay(_counter*100), Task.Delay(1000));
        Console.WriteLine("Work Done " + workNumber);
        return _counter;
    }
Sergey Aldoukhov

I have a couple of combinators called SubscribeWithoutOverlap that I use in the UI for this purpose. All incoming events except the last one are discarded until the work is finished. When the work is finished, the event buffer is asked for the next event.

    /// <summary>
    /// Subscribe to the observable whilst discarding all events that are
    /// received whilst the action is being processed. Can be
    /// used to improve responsiveness of UIs, for example.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="source"></param>
    /// <param name="action"></param>
    /// <param name="scheduler"></param>
    /// <returns></returns>
    public static IDisposable SubscribeWithoutOverlap<T>
    (this IObservable<T> source, Action<T> action, IScheduler scheduler = null)
    {
        var sampler = new Subject<Unit>();
        scheduler = scheduler ?? Scheduler.Default;
        var p = source.Replay(1);

        var subscription = sampler.Select(x=>p.Take(1))
            .Switch()
            .ObserveOn(scheduler)
            .Subscribe(l =>
            {
                action(l);
                sampler.OnNext(Unit.Default);
            });

        var connection = p.Connect();
        sampler.OnNext(Unit.Default);

        return new CompositeDisposable(connection, subscription);
    }

and

    public static IDisposable SubscribeWithoutOverlap<T>
    (this IObservable<T> source, Func<T,Task> action, IScheduler scheduler = null)
    {
        var sampler = new Subject<Unit>();
        scheduler = scheduler ?? Scheduler.Default;
        var p = source.Replay(1);

        var subscription = sampler.Select(x=>p.Take(1))
            .Switch()
            .ObserveOn(scheduler)
            .Subscribe(async l =>
            {
                await action(l);
                sampler.OnNext(Unit.Default);
            });

        var connection = p.Connect();
        sampler.OnNext(Unit.Default);

        return new CompositeDisposable(connection, subscription);
    }

so the following should then meet your requirements.

IObservable<string> source;

source
   .Throttle(TimeSpan.FromMilliseconds(100))
   .Merge(source.Sample(TimeSpan.FromSeconds(1)))
   .SubscribeWithoutOverlap(DoWork);

Note the mix of Throttle and Sample to get both timing behaviors asked for in the question.
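
To see why the two operators are combined, here is a small virtual-time sketch. It assumes the System.Reactive and Microsoft.Reactive.Testing packages are referenced; the class name `ThrottleAndSampleDemo`, the `Run` method and the 70 ms typing cadence are made up for illustration. It simulates one long burst of changes and records when each operator emits.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

// Hypothetical demo class, not part of the answer's combinators.
public static class ThrottleAndSampleDemo
{
    // Simulates a change every 70 ms for 2.8 s in virtual time and records,
    // in milliseconds of virtual time, when each pipeline emits a value.
    public static void Run(
        List<double> throttled, List<double> sampled, List<double> merged)
    {
        var scheduler = new TestScheduler();
        var source = new Subject<int>();

        Func<double> nowMs =
            () => TimeSpan.FromTicks(scheduler.Clock).TotalMilliseconds;

        var throttle = source.Throttle(TimeSpan.FromMilliseconds(100), scheduler);
        var sample = source.Sample(TimeSpan.FromSeconds(1), scheduler);

        throttle.Subscribe(_ => throttled.Add(nowMs()));
        sample.Subscribe(_ => sampled.Add(nowMs()));
        throttle.Merge(sample).Subscribe(_ => merged.Add(nowMs()));

        // The burst: each change arrives before the 100 ms throttle window ends.
        for (int i = 1; i <= 40; i++)
        {
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(70).Ticks);
            source.OnNext(i);
        }

        // Let the trailing Throttle and Sample timers fire after the burst.
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500).Ticks);
    }
}
```

Under these assumptions, Throttle on its own should stay silent until the burst ends, while Sample keeps emitting roughly once per second during it. Merging the two gives the quick reaction after a pause plus the periodic updates during a long spurt; SubscribeWithoutOverlap then takes care of not running the work concurrently.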

With regard to some of the other answers: if you find yourself putting complex RX logic into your business logic, then extract it into a custom combinator that has a clear purpose. You will thank yourself later when trying to understand what it does.

bradgonesurfing
  • This is similar to what I have in my answer. I might be mistaken, but this one has a bug: the last action request might get "stuck" if it arrives while the previous action is still executing. In my solution, the last work is guaranteed after the cooldown. – Sergey Aldoukhov Feb 19 '14 at 08:13
  • That is interesting. Instead of throwing all the events away I should buffer the last one. I'll take a look and update. – bradgonesurfing Feb 19 '14 at 08:25
  • I think it is just a matter of changing ``var p = source.Publish()`` to ``var p = source.Replay(1)``. The last event from source is now buffered. You think that will fix it? – bradgonesurfing Feb 19 '14 at 08:30
  • Fixed the code. The logic should now be that the ``latest`` event from source is available as soon as the subscriber is finished. If nothing is available then the next one will be processed. – bradgonesurfing Feb 19 '14 at 08:33
  • Plus one from me, even though I'm still not convinced it's best (see OP comments) - this is a very interesting and useful approach. – James World Feb 19 '14 at 09:34
  • I think you will see something like this from the RX team eventually. If you scan the googleverse you will find the concept of ``backpressure`` is floating around. It's not a solved problem so it isn't in the library yet but I expect it to be there sometime. – bradgonesurfing Feb 19 '14 at 09:50