Kotlin Flow implementation in C#








Hello!







In recent years I have been developing for Android in Kotlin. Not long ago, because RxJava is not available in Kotlin Multiplatform, we started using coroutines and Flow, Kotlin's out-of-the-box cold streams. Before Android I spent many years with C#, which has had its own coroutines for a very long time, although they are not called that. But I had never heard of a Flow analogue built on async/await. The main tool for reactive programming there is Rx.NET (in fact, Rx was born here). So I decided to indulge in some nostalgia and try to reinvent the wheel.







From here on, I assume the reader is familiar with the things mentioned in the previous paragraph. For the impatient, here is a link to the repository right away.







Disclaimer: this code is not intended to be used in production. This is a concept, nothing more. Something may not work exactly as intended.







IFlow and IFlowCollector



Let's start by rewriting the Flow and FlowCollector interfaces in C# head-on.

Before (Kotlin):







interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

interface FlowCollector<in T> {
    suspend fun emit(value: T)
}





After (C#):







public interface IFlow<out T>
{
    Task Collect(IFlowCollector<T> collector);
}

public interface IFlowCollector<in T>
{
    Task Emit(T item);
}





I believe the differences are self-explanatory: they come from how each language implements asynchrony (suspend functions in Kotlin, methods returning Task in C#).







To use these interfaces, we need to implement them. Here is what I came up with:







internal class Flow<T> : IFlow<T>
{
    private readonly Func<IFlowCollector<T>, Task> _emitter;

    public Flow(Func<IFlowCollector<T>, Task> emitter)
    {
        _emitter = emitter;
    }

    public Task Collect(IFlowCollector<T> collector)
    {
        return _emitter(collector);
    }
}

internal class FlowCollector<T> : IFlowCollector<T>
{
    private readonly Func<T, Task> _handler;

    public FlowCollector(Func<T, Task> handler)
    {
        _handler = handler;
    }

    public Task Emit(T item)
    {
        return _handler(item);
    }
}





To the flow's constructor we pass a function that emits values, and to the collector's constructor a function that processes each emitted value.







You can use it like this:







var flow = new Flow<int>(async collector =>
{
    await collector.Emit(1);
    await Task.Delay(1000);
    await collector.Emit(2);
    await Task.Delay(1000);
    await collector.Emit(3);
});

var collector = new FlowCollector<int>(async item => Console.WriteLine(item));

await flow.Collect(collector);





I think everything in the code above is clear. First we create a Flow, then a collector (a handler for each element). Then we start the Flow by "subscribing" the collector to it. With a little sugar (see GitHub), we get something like this:







await Flow<int>(async collector =>
    {
        await collector.Emit(1);
        await Task.Delay(1000);
        await collector.Emit(2);
        await Task.Delay(1000);
        await collector.Emit(3);
    })
    .Collect(Console.WriteLine);





In Kotlin it looks like this:







scope.launch {
    flow {
        emit(1)
        delay(1000)
        …
    }.collect { println(it) }
}





Personally, what I dislike most about the C# version is having to specify the element type explicitly when creating a flow. But the point here is not that type inference in Kotlin is much more powerful. The flow function looks like this:







 public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
      
      





As we can see, the block parameter is marked with the BuilderInference annotation, which tells the compiler to infer the type from this parameter. Does anyone know whether something similar could be built for C# on Roslyn?
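Without touching the compiler, a partial workaround already exists in C#: generic method inference can take T from an explicitly typed lambda parameter. Below is a minimal sketch of that idea. The FlowFactory.Flow helper, LambdaFlow and ListCollector are hypothetical names written for this illustration and are not taken from the repository.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IFlowCollector<in T> { Task Emit(T item); }
public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector); }

// Hypothetical helper types, written only for this illustration.
internal sealed class LambdaFlow<T> : IFlow<T>
{
    private readonly Func<IFlowCollector<T>, Task> _emitter;
    public LambdaFlow(Func<IFlowCollector<T>, Task> emitter) => _emitter = emitter;
    public Task Collect(IFlowCollector<T> collector) => _emitter(collector);
}

internal sealed class ListCollector<T> : IFlowCollector<T>
{
    public List<T> Items { get; } = new List<T>();
    public Task Emit(T item) { Items.Add(item); return Task.CompletedTask; }
}

public static class FlowFactory
{
    public static IFlow<T> Flow<T>(Func<IFlowCollector<T>, Task> emitter)
        => new LambdaFlow<T>(emitter);
}

public static class InferenceDemo
{
    public static async Task<List<int>> RunAsync()
    {
        // T is inferred as int from the explicitly typed lambda parameter.
        // With an untyped parameter (collector => ...) the compiler cannot
        // infer T, and Flow<int>(...) has to be spelled out.
        var flow = FlowFactory.Flow(async (IFlowCollector<int> collector) =>
        {
            await collector.Emit(1);
            await collector.Emit(2);
        });

        var sink = new ListCollector<int>();
        await flow.Collect(sink);
        return sink.Items;
    }

    public static async Task Main()
        => Console.WriteLine(string.Join(", ", await RunAsync()));
}
```

This only moves the type annotation from the method call to the lambda parameter, so it is a matter of taste rather than a real equivalent of BuilderInference.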







CancellationToken



In Rx there is a subscription that you can unsubscribe from. In Kotlin Flow, cancellation is handled by the Job returned by the builder, or by the CoroutineScope. We definitely need a tool that lets a Flow finish early as well. In C#, asynchronous operations are cancelled using, and I am not afraid of the word, the Cancellation Token pattern. CancellationToken is a struct whose value tells an asynchronous operation that it has been cancelled. It is passed into the asynchronous operation at startup, and the operation itself watches the token's state. The state is changed from the outside.
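The pattern described above can be sketched in isolation. This is a minimal illustration using only standard library types; CountUntilCancelledAsync is a made-up name for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // The operation receives the token at startup and observes it itself.
    public static async Task<int> CountUntilCancelledAsync(CancellationToken token)
    {
        var ticks = 0;
        try
        {
            while (true)
            {
                // Task.Delay throws TaskCanceledException once the token is cancelled.
                await Task.Delay(10, token);
                ticks++;
            }
        }
        catch (OperationCanceledException)
        {
            // TaskCanceledException derives from OperationCanceledException.
            return ticks;
        }
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        var work = CountUntilCancelledAsync(cts.Token);

        await Task.Delay(100);
        cts.Cancel(); // the state is changed from the outside

        Console.WriteLine($"Ticks before cancellation: {await work}");
    }
}
```

The caller owns the CancellationTokenSource and the operation only sees the token, so the operation can observe cancellation but cannot trigger it.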







In short, we need to pass a CancellationToken into our Flow and FlowCollector.







public interface IFlow<out T>
{
    Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default);
}

public interface IFlowCollector<in T>
{
    Task Emit(T item, CancellationToken cancellationToken = default);
}





I will not include the implementation here; see GitHub.
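For readers who do not want to open the repository, here is one way the token-aware classes and the "sugar" could look. This is a sketch under my own assumptions, not the code from GitHub: the shape of FlowFactory.Flow and of the Collect overload taking an Action<T> is guessed from how they are called in this article.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public interface IFlow<out T>
{
    Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default);
}

public interface IFlowCollector<in T>
{
    Task Emit(T item, CancellationToken cancellationToken = default);
}

// Assumed implementation: the emitter simply receives the token.
internal sealed class Flow<T> : IFlow<T>
{
    private readonly Func<IFlowCollector<T>, CancellationToken, Task> _emitter;
    public Flow(Func<IFlowCollector<T>, CancellationToken, Task> emitter) => _emitter = emitter;

    public Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default)
        => _emitter(collector, cancellationToken);
}

// Assumed implementation: refuse to handle items once cancellation is requested.
internal sealed class FlowCollector<T> : IFlowCollector<T>
{
    private readonly Func<T, CancellationToken, Task> _handler;
    public FlowCollector(Func<T, CancellationToken, Task> handler) => _handler = handler;

    public Task Emit(T item, CancellationToken cancellationToken = default)
        => cancellationToken.IsCancellationRequested
            ? Task.FromCanceled(cancellationToken)
            : _handler(item, cancellationToken);
}

public static class FlowFactory
{
    public static IFlow<T> Flow<T>(Func<IFlowCollector<T>, CancellationToken, Task> emitter)
        => new Flow<T>(emitter);

    // Hypothetical sugar: collect with a plain synchronous action.
    public static Task Collect<T>(this IFlow<T> flow, Action<T> handler,
        CancellationToken cancellationToken = default)
        => flow.Collect(
            new FlowCollector<T>((item, _) => { handler(item); return Task.CompletedTask; }),
            cancellationToken);
}

public static class CancellableFlowDemo
{
    public static async Task<(List<int> Items, bool Cancelled)> RunAsync()
    {
        using var cts = new CancellationTokenSource();
        var items = new List<int>();

        var flowTask = FlowFactory.Flow<int>(async (collector, token) =>
            {
                await collector.Emit(1, token);
                await Task.Delay(Timeout.Infinite, token); // waits until cancelled
                await collector.Emit(2, token);            // never reached
            })
            .Collect(item => items.Add(item), cts.Token);

        cts.Cancel();
        try { await flowTask; return (items, false); }
        catch (OperationCanceledException) { return (items, true); }
    }

    public static async Task Main()
    {
        var (items, cancelled) = await RunAsync();
        Console.WriteLine($"{string.Join(", ", items)}; cancelled: {cancelled}");
    }
}
```

Whether the token should be checked inside Emit, as done here, or only inside the emitter function is a design choice; the real implementation may differ.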

The test now looks like this:







var cts = new CancellationTokenSource();

var flowTask = Flow<int>(async (collector, cancellationToken) =>
    {
        await collector.Emit(1);
        await Task.Delay(2000, cancellationToken);
        await collector.Emit(2);
        await Task.Delay(2000, cancellationToken);
        await collector.Emit(3);
    })
    .Collect(item => Log(item), cts.Token);

var cancelationTask = Task.Run(async () =>
{
    await Task.Delay(3000);
    cts.Cancel();
});

await flowTask;





The idea is this. In parallel with the Flow, we start an operation that cancels it after 3 seconds. As a result, the Flow does not get to emit the third element and ends with a TaskCanceledException, which is the required behavior.







A bit of practice



Let's try to put the result to practical use. For example, let's wrap an event in our Flow. Rx.NET even has a library method for this, FromEventPattern.







To avoid dealing with a real UI, I wrote a ClicksEmulator class that generates simulated mouse-button clicks at random intervals.







public class ClicksEmulator
{
    public enum Button { Left, Right }

    public class ClickEventArgs : EventArgs
    {
        //…
        public int X { get; }
        public int Y { get; }
        public Button Button { get; }
    }

    public event EventHandler<ClickEventArgs> ButtonClick;

    public async Task Start(CancellationToken cancellationToken = default) {… }
}





I omitted the implementation because it is not very important here. The main thing is the ButtonClick event, which we want to turn into a Flow. For this we write an extension method:







public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator)
{
    return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) =>
    {
        void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args);

        emulator.ButtonClick += clickHandler;
        cancellationToken.Register(() => { emulator.ButtonClick -= clickHandler; });

        await Task.Delay(-1, cancellationToken);
    });
}





First, we declare an event handler that does nothing but pass the event argument to the collector. Then we subscribe to the event and register an unsubscribe for when the flow is cancelled (completed). After that we wait indefinitely, listening to ButtonClick events, until the cancellationToken fires.







If you have used callbackFlow or channelFlow in Kotlin, or created a cold Observable from listeners in Rx, you will notice that the code structure is very similar in all cases. That is fine, but a question arises: how is a Flow better than a raw event here? All the power of reactive streams lies in the operators that perform various transformations on them: filtering, mapping, and many other, more complex ones. But we don't have them yet. Let's try to do something about it.







Filter, Map, OnNext



Let's start with one of the simplest operators, Filter. As the name implies, it filters the flow's elements according to a given predicate. It will be an extension method that is applied to the original flow and returns a flow containing only the filtered elements. So we need to take each element from the original flow, check it, and emit it further if the predicate returns true. Let's do that:







public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) =>
    FlowFactory.Flow<T>((collector, cancellationToken) =>
        source.Collect(item =>
        {
            if (predicate(item))
                collector.Emit(item);
        }, cancellationToken)
    );





Now, if we only need clicks of the left mouse button, we can write this:







emulator
    .Clicks()
    .Filter(click => click.Button == ClicksEmulator.Button.Left)
    .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token);





By analogy, we write the Map and OnNext operators. The first converts each element of the original flow into another one using the supplied mapper function. The second returns a flow with the same elements as the original, but performs an action on each one (usually logging).





public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) =>
    FlowFactory.Flow<R>((collector, cancellationToken) =>
        source.Collect(
            item => collector.Emit(mapper(item)),
            cancellationToken
        )
    );

public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) =>
    FlowFactory.Flow<T>((collector, cancellationToken) =>
        source.Collect(item =>
        {
            action(item);
            collector.Emit(item);
        }, cancellationToken)
    );





And a usage example:







emulator
    .Clicks()
    .OnNext(click => Log($"{click.Button} {click.X} {click.Y}"))
    .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1)
    .Collect(item => Log($"{item}"), cts.Token);





In general, a great many operators have been invented for reactive streams; they can be found, for example, here .







And nothing prevents us from implementing any of them for IFlow.
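As an illustration, here is a sketch of one more operator, Scan (a running accumulation, as in Rx), built on the same two interfaces. The Flow and FlowCollector classes below are minimal stand-ins written for this example, on the assumption that the real ones accept similar delegates; the Scan operator itself is not part of the article's repository as far as I know.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public interface IFlow<out T>
{
    Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default);
}

public interface IFlowCollector<in T>
{
    Task Emit(T item, CancellationToken cancellationToken = default);
}

// Hypothetical minimal implementations, standing in for the ones on GitHub.
internal sealed class Flow<T> : IFlow<T>
{
    private readonly Func<IFlowCollector<T>, CancellationToken, Task> _emitter;
    public Flow(Func<IFlowCollector<T>, CancellationToken, Task> emitter) => _emitter = emitter;
    public Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default)
        => _emitter(collector, cancellationToken);
}

internal sealed class FlowCollector<T> : IFlowCollector<T>
{
    private readonly Func<T, CancellationToken, Task> _handler;
    public FlowCollector(Func<T, CancellationToken, Task> handler) => _handler = handler;
    public Task Emit(T item, CancellationToken cancellationToken = default)
        => _handler(item, cancellationToken);
}

public static class ScanOperator
{
    // Emits the running result of applying the accumulator to each element.
    public static IFlow<TAcc> Scan<T, TAcc>(
        this IFlow<T> source, TAcc seed, Func<TAcc, T, TAcc> accumulator)
        => new Flow<TAcc>((collector, cancellationToken) =>
        {
            var state = seed; // fresh state for every Collect call, so the flow stays cold
            return source.Collect(new FlowCollector<T>((item, token) =>
            {
                state = accumulator(state, item);
                // Returning the Task lets the upstream await the downstream collector.
                return collector.Emit(state, token);
            }), cancellationToken);
        });
}

public static class ScanDemo
{
    public static async Task<List<int>> RunAsync()
    {
        IFlow<int> numbers = new Flow<int>(async (collector, token) =>
        {
            for (var i = 1; i <= 4; i++) await collector.Emit(i, token);
        });

        var sums = new List<int>();
        await numbers
            .Scan(0, (sum, item) => sum + item)
            .Collect(new FlowCollector<int>((item, _) =>
            {
                sums.Add(item);
                return Task.CompletedTask;
            }));
        return sums; // 1, 3, 6, 10
    }

    public static async Task Main()
        => Console.WriteLine(string.Join(", ", await RunAsync()));
}
```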







Those familiar with Rx.NET know that, in addition to new operators specific to IObservable, it uses extension methods modeled on LINQ to Objects, which lets you treat streams as "collections of events" and manipulate them with the usual LINQ methods. So why not, instead of writing the operators ourselves, try putting IFlow on LINQ?







IAsyncEnumerable



C# 8 introduced an asynchronous version of IEnumerable: IAsyncEnumerable, a collection interface that can be iterated asynchronously. The fundamental difference between IAsyncEnumerable and reactive streams (IObservable and IFlow) is this. IAsyncEnumerable, like IEnumerable, is a pull model: we iterate over the collection as much and whenever we need, pulling elements out of it ourselves. Streams are push: we subscribe to events and "react" to them as they arrive, which is why they are called reactive. However, push-like behavior can be achieved with a pull model. This is called long polling https://en.wikipedia.org/wiki/Push_technology#Long_polling . The idea is this: we iterate over the collection, request its next element, and wait as long as it takes for the collection to return it, i.e. until the next event arrives. IAsyncEnumerable, unlike IEnumerable, lets us wait asynchronously. In short, we need to somehow stretch IAsyncEnumerable over IFlow.
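To make the pull model concrete, here is a minimal sketch using a plain async iterator (no IFlow involved). The Events method is a made-up event source for this example; the delay stands in for waiting until the next event arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PullDemo
{
    // An async iterator: the producer only advances when the consumer
    // asks for the next element.
    public static async IAsyncEnumerable<int> Events()
    {
        for (var i = 1; i <= 3; i++)
        {
            await Task.Delay(50); // stands in for "waiting for the next event"
            yield return i;
        }
    }

    public static async Task<List<int>> RunAsync()
    {
        var seen = new List<int>();
        // Each iteration awaits MoveNextAsync, i.e. performs one "long poll".
        await foreach (var item in Events())
        {
            seen.Add(item);
        }
        return seen;
    }

    public static async Task Main()
        => Console.WriteLine(string.Join(", ", await RunAsync())); // prints "1, 2, 3"
}
```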







As you know, the IAsyncEnumerator interface is responsible for returning the current element of an IAsyncEnumerable collection and moving to the next one. In our case the elements have to be taken from an IFlow, and that is IFlowCollector's job. So we end up with an object that implements both interfaces:







internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T>
{
    private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1);
    private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1);

    private bool _isFinished;

    public T Current { get; private set; }

    public async ValueTask DisposeAsync() { }

    public async Task Emit(T item, CancellationToken cancellationToken)
    {
        await _backpressureSemaphore.WaitAsync(cancellationToken);
        Current = item;
        _longPollingSemaphore.Release();
    }

    public async Task Finish()
    {
        await _backpressureSemaphore.WaitAsync();
        _isFinished = true;
        _longPollingSemaphore.Release();
    }

    public async ValueTask<bool> MoveNextAsync()
    {
        _backpressureSemaphore.Release();
        await _longPollingSemaphore.WaitAsync();
        return !_isFinished;
    }
}





The main methods here are Emit, Finish, and MoveNextAsync.

Emit first waits for the moment when the next item is requested from the collection, i.e. it does not emit an item until one is required. This is called backpressure, hence the name of the semaphore. Then it sets the current item and signals that the long-polling request can receive its result.

MoveNextAsync is called when the next item is pulled from the collection. It releases _backpressureSemaphore and waits for the Flow to emit the next element. Then it returns a flag telling whether the collection still has elements (false once it has ended). This flag is set by the Finish method.







Finish works on the same principle as Emit, except that instead of the next element it sets the end-of-collection flag.
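The handshake between the two semaphores is easier to see in isolation. Here is a sketch of the same idea reduced to a single-slot hand-off between one producer and one consumer. Handoff, Put and Take are illustrative names, not part of the article's code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Single-slot hand-off: the producer may not write until the consumer has
// asked for an item, and the consumer waits until the item is written.
public sealed class Handoff<T>
{
    private readonly SemaphoreSlim _requested = new SemaphoreSlim(0, 1); // "backpressure"
    private readonly SemaphoreSlim _delivered = new SemaphoreSlim(0, 1); // "long polling"
    private T _slot = default!;

    public async Task Put(T item)
    {
        await _requested.WaitAsync(); // wait until somebody wants an item
        _slot = item;
        _delivered.Release();         // wake the waiting consumer
    }

    public async Task<T> Take()
    {
        _requested.Release();         // announce demand for one item
        await _delivered.WaitAsync(); // wait for the producer to fill the slot
        return _slot;
    }
}

public static class HandoffDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var handoff = new Handoff<int>();

        var producer = Task.Run(async () =>
        {
            for (var i = 1; i <= 3; i++) await handoff.Put(i);
        });

        var received = new List<int>();
        for (var i = 0; i < 3; i++) received.Add(await handoff.Take());

        await producer;
        return received;
    }

    public static async Task Main()
        => Console.WriteLine(string.Join(", ", await RunAsync())); // prints "1, 2, 3"
}
```

Like the enumerator above, this sketch assumes a single consumer that calls Take sequentially; concurrent Take calls would try to release the first semaphore past its maximum count of 1.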







Now we need to use this class.







public static class AsyncEnumerableExtensions
{
    public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default)
    {
        var collector = new FlowCollectorEnumerator<T>();

        flow
            .Collect(collector, cancellationToken)
            .ContinueWith(_ => collector.Finish(), cancellationToken);

        return new FlowEnumerableAdapter<T>(collector);
    }
}

internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T>
{
    private readonly IAsyncEnumerator<T> _enumerator;

    public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator)
    {
        _enumerator = enumerator;
    }

    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        return _enumerator;
    }
}





The CollectEnumerable extension method for IFlow creates a FlowCollectorEnumerator and subscribes it to the flow; when the flow completes, the Finish() method is called. The method returns a FlowEnumerableAdapter, the simplest possible implementation of IAsyncEnumerable, which uses the FlowCollectorEnumerator as its IAsyncEnumerator.

Let's try out the result.







var clicks = emulator
    .Clicks()
    .OnNext(item => Log($"{item.Button} {item.X} {item.Y}"))
    .CollectEnumerable(cts.Token)
    .Where(click => click.Button == ClicksEmulator.Button.Right)
    .Select(click => click.Y < 540 ? "TOP" : "BOTTOM");

await foreach (var click in clicks)
{
    Log($"Clicked at: {click}");
}





Here we take the Clicks() flow, log each click, and then turn the IFlow into an IAsyncEnumerable. Then we apply the well-known LINQ operators: we keep only the right clicks and determine in which part of the screen each was made.







Next, consider a more complicated example. We will replace each right click with a double left click. That is, we need to map each element not to a single other element but to a collection, or rather to a Flow converted into a collection.







var clicks = emulator
    .Clicks()
    .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}"))
    .CollectEnumerable(cts.Token)
    .Select(click => click.Button == ClicksEmulator.Button.Left
        ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click))
        : Flow<ClicksEmulator.ClickEventArgs>(async collector =>
        {
            var changedClick = new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left);
            await collector.Emit(changedClick);
            await Task.Delay(200);
            await collector.Emit(changedClick);
        })
    )
    .SelectMany(flow => flow.CollectEnumerable());

await foreach (var click in clicks)
{
    Log($"Changed: {click.Button} {click.X} {click.Y}");
}





LINQ has the SelectMany operator for this. Its counterpart in reactive streams is FlatMap. First, we map every click to an IFlow: for a left click, a Flow with that single click; for a right click, a Flow of two left clicks with a delay between them. Then, in SelectMany, we turn each IFlow into an IAsyncEnumerable.







And it works! That is, many operators do not have to be implemented for IFlow: you can use LINQ.







Conclusion



Rx.NET was and remains the main tool for working with asynchronous sequences of events in C#. But it is a fairly large library in terms of code. As we have seen, similar functionality can be obtained much more simply: just two interfaces plus some glue code. This is possible thanks to a language feature, async/await. When Rx was born, C# did not have this feature yet.







Thanks for your attention!







