Problem: I have a subscription to a never-ending messaging service. My code needs to check whether any message satisfies a condition. If one does, it should close the subscription without processing the remaining messages and return true. If all the messages have been processed and none satisfied the condition, it should close the subscription and return false.
For example, suppose the condition is foo = 5:
Message dataset, early success:
msg1: foo=1
msg2: foo=2
msg3: foo=5 <= condition satisfied, return true and stop processing
msg4: foo=6
Message dataset, failure:
msg1: foo=1
msg2: foo=2
msg3: foo=3
msg4: foo=4 <= no more messages, return false and stop processing
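Setting the subscription mechanics aside, the outcome expected for the two datasets above can be sketched as follows. This is only an illustration over an in-memory array; `ExpectedOutcome` and `AnySatisfies` are hypothetical names and are not part of the real code.

```csharp
using System;
using System.Linq;

public static class ExpectedOutcome
{
    // Hypothetical helper: returns true if any foo value equals the target.
    // Enumerable.Any stops enumerating at the first match, which mirrors
    // "return true and stop processing".
    public static bool AnySatisfies(int[] fooValues, int target) =>
        fooValues.Any(foo => foo == target);
}
```

With the early success dataset, `ExpectedOutcome.AnySatisfies(new[] { 1, 2, 5, 6 }, 5)` is true and msg4 is never inspected; with the failure dataset, `ExpectedOutcome.AnySatisfies(new[] { 1, 2, 3, 4 }, 5)` is false. The real difficulty is that the messages arrive through a callback rather than a finite collection.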
The subscription API I use exposes a synchronous Subscribe method, to which I have to pass an async EventHandler.
Here is my functioning code, which works for both scenarios. lastMessageReceivedDateTime tracks when a message was last received (to identify the end of the messages), and _conditionSatisfied tells me whether I've got my data:
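    private DateTime lastMessageReceivedDateTime;
    private bool _conditionSatisfied;

    public Task<bool> CheckSubscription(IThirdParyCode connection)
    {
        // Start the idle clock now so the loop below waits for the first message.
        lastMessageReceivedDateTime = DateTime.Now;

        var subscription = connection.Subscribe(async (obj, args) =>
        {
            // Record the arrival time so the loop can detect when messages stop.
            lastMessageReceivedDateTime = DateTime.Now;
            if (args.Message.foo == 5)
            {
                _conditionSatisfied = true;
            }
        });

        // Poll until the condition is met or no message has arrived for 1 second.
        while (lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now && !_conditionSatisfied)
        {
            Thread.Sleep(500);
        }

        subscription?.Unsubscribe();
        // The method is not async, so wrap the result to match Task<bool>.
        return Task.FromResult(_conditionSatisfied);
    }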
This works, but I wanted to know if there was a better solution.
Note: I can't simply await the async method, as it never returns/completes until I unsubscribe.
More info: The type of the connection is IStanConnection (from NATS), and the signature of Subscribe is:

    IStanSubscription Subscribe(string subject, StanSubscriptionOptions options,
        EventHandler<StanMsgHandlerArgs> handler);
I simplified the signature in my code above to focus on the part I had an issue with.
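One direction that might replace the Thread.Sleep polling is to have the handler complete a TaskCompletionSource and to await it together with an idle timeout. The following is a minimal sketch under stated assumptions, not a tested solution against NATS: `FakeSource` and `FakeMsgArgs` are hypothetical stand-ins for the connection and StanMsgHandlerArgs, and removing the event handler plays the role of Unsubscribe().

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the third-party types; this is NOT the NATS API.
public class FakeMsgArgs : EventArgs { public int Foo; }

public class FakeSource
{
    public event EventHandler<FakeMsgArgs> Received;
    public void Publish(int foo) => Received?.Invoke(this, new FakeMsgArgs { Foo = foo });
}

public static class SubscriptionCheck
{
    // Returns true on the first matching message, or false once no message
    // has arrived for idleTimeout.
    public static async Task<bool> CheckAsync(FakeSource source, int target, TimeSpan idleTimeout)
    {
        var matched = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        long lastTicks = DateTime.UtcNow.Ticks; // time of the most recent message

        EventHandler<FakeMsgArgs> handler = (sender, args) =>
        {
            Interlocked.Exchange(ref lastTicks, DateTime.UtcNow.Ticks);
            if (args.Foo == target) matched.TrySetResult(true);
        };

        source.Received += handler;
        try
        {
            while (true)
            {
                var last = new DateTime(Interlocked.Read(ref lastTicks), DateTimeKind.Utc);
                var remaining = idleTimeout - (DateTime.UtcNow - last);
                if (remaining <= TimeSpan.Zero)
                {
                    // Idle for too long: report whether a match slipped in meanwhile.
                    return matched.Task.IsCompleted;
                }

                // Wake up on a match, or when the current idle window would expire.
                var finished = await Task.WhenAny(matched.Task, Task.Delay(remaining));
                if (finished == matched.Task) return true;
            }
        }
        finally
        {
            source.Received -= handler; // stands in for subscription.Unsubscribe()
        }
    }
}
```

In this sketch the caller awaits `SubscriptionCheck.CheckAsync(source, 5, TimeSpan.FromSeconds(1))` instead of blocking a thread, and a match is reported as soon as the handler sees it rather than on the next 500 ms poll. Whether the same shape fits the real Subscribe method depends on how the library invokes the handler, which the sketch does not model.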