Thursday, 24 February 2011

How to do Chaining with RX

Much of RX is focused on asynchronous operations or events happening in parallel, and the coordination of those results across multiple threads. Very little attention has been given to chaining – quite often you cannot make the next web service call until you know the result of the previous one or which button the user clicked.

The way we’ve done it in the past was typically in the callback of the first async operation, we invoke the second operation which has yet another callback and so on. The first chained call is not too hard but it quickly gets messy as you add more async operations.

Below is my (once again contrived) web service, and I’ve used the WCF RX extension template from my previous post to wrap up the WCF calls in IObservable<T> extension methods.

This is my service contract:

[ServiceContract]
public interface IBooksOnlineService
{
    [OperationContract]
    Book GetSpecial();
 
    [OperationContract]
    string GetBookReview(int bookId);
 
    [OperationContract]
    List<Book> GetBooksByAuthor(string author);
 
    [OperationContract]
    List<Book> GetBooksByGenre(string genre);
 
    [OperationContract]
    void AddFavourites(List<Book> books);
}
 
[DataContract]
public class Book
{
    [DataMember]
    public int Id { get; set; }
 
    [DataMember]
    public string Title {get;set;}
 
    [DataMember]
    public string Author { get; set; }
 
    [DataMember]
    public string Genre { get; set;} 
}

And the client proxy methods are wrapped up like this:



public static IObservable<Book> GetSpecialAsObservable(this IBooksOnlineService service)
{
    return Observable
        .Start(()=> Observable.FromAsyncPattern<Book>(service.BeginGetSpecial, service.EndGetSpecial)())
        .SelectMany(p=>p);
}

To demonstrate the good ol’ favourite way of chaining things, below is an example of the subscribe-in-the-callback method. There’s a number of problems with this arrangement:



  1. Two error handlers
  2. Cancelling the request is complicated because you don’t know which request is being processed.
  3. Adding another chained method or even a parallel chained method is not going to be pretty.
  4. When you start trying to unit test this, you are going to end up in a world of pain. (more about unit testing RX coming in a later post)


private static void ChainingTheClumsyWay()
{
    var client = new BooksOnlineServiceClient();
    var mysubscription = client
        .GetSpecialAsObservable()
        .Subscribe(special => //This is the callback of the first method.
                        {
                            var reviewSubscription = client
                                .GetBookReviewAsObservable(special.Id)
                                .Subscribe(review => //This is the callback of the second method
                                                {
                                                    var bookWithReview = new
                                                                            {
                                                                                Book = special,
                                                                                Review = review
                                                                            };
                                                    //Do something with the result.
                                                });
                        },
                    specialErrors => Console.WriteLine(specialErrors.Message));
}

Rewriting this as Linq makes it far easier to understand:



private static void ChainingWithLinq()
{
    var client = new BooksOnlineServiceClient();
    var query = from special in client.GetSpecialAsObservable()
                from review in client.GetBookReviewAsObservable(special.Id)
                select new
                            {
                                Book = special,
                                Review = review
                            };
    var mysubscription = query.Subscribe(
        result =>
        {
            //Do something with the result. 
        },
        error => Console.WriteLine(error.Message));
}

What that Linq syntax is going to do under the covers is a SelectMany(…) on the first result. The SelectMany allows you to create a new IObservable<string> for each item returned in the OnNext by the first IObservable<Book>, then flattens the result. In this case we are only dealing with sequences of one event for both observables, but it works equally well if the Observable<Book> event sequence was continuous and returned a new special every hour. As long as the subscription is active, it would chain a call to the GetBookReview operation every hour and produce a new result. We can rewrite the above as methods if you’re not a Linq fan.



private static void ChainingWithMethods()
{
    var client = new BooksOnlineServiceClient();
    var query = client.GetSpecialAsObservable()
        .SelectMany(special =>
            client.GetBookReviewAsObservable(special.Id),
            (special, review) =>
                new
                {
                    Book = special,
                    Review = review
                });
    var mysubscription = query.Subscribe(
        result =>
        {
            //Do something with the result. 
        },
        error => Console.WriteLine(error.Message));
}

The first two problems are addressed, we have a single error handler and the whole chain can be cancelled by disposing the subscription token, so what about chaining multiple calls off the first? I’m going to make calls to the GetBooksByAuthor and GetBooksByGenre methods to fill in the similar books and other books by the same author. Although they will be chained, I want them to be called in parallel.



private static void ChainingMultipleCalls()
{
    var client = new BooksOnlineServiceClient();
    var query = from special in client.GetSpecialAsObservable()
                from details in client.GetBookReviewAsObservable(special.Id)
                                .Zip(client.GetBooksByAuthorAsObservable(special.Author),
                                (lhs, sameAuthor) => new
                                    {
                                        Review = lhs,
                                        SameAuthor = sameAuthor
                                    })
                                .Zip(client.GetBooksByGenreAsObservable(special.Genre),
                                (lhs, sameGenre) => new
                                    {
                                        lhs.Review,
                                        lhs.SameAuthor,
                                        SimilarBooks = sameGenre
                                    })
                select new
                {
                    Book = special,
                    details.Review,
                    details.SameAuthor,
                    details.SimilarBooks
                };
    var mysubscription = query.Subscribe(
        result =>
        {
            //Do something with the result. 
        },
        error => Console.WriteLine(error.Message));
}

I’ve made use of the “Zip” operator for observables that will take two observable sequences and pair up each result using the lambda expression in its second argument. In this case I know that each of the services only has a single result, so the zip works nicely. With continuous observables you may want to use another operator like “CombineLatest”.


In Summary:


1. When you think of chaining in RX, think SelectMany.

Wednesday, 23 February 2011

Generate RX Methods For A WCF Client Using T4

This is a Text Template (T4) to generate reactive extensions (RX) for .Net extension methods for WCF service clients.

Reactive extensions for .Net is a library that simplifies coordination of asynchronous methods in .Net amongst other things. I’ll assume that if you’re reading this post you already know how to use RX, if not you could have a look at the beginners page.

I have my contrived WCF service that I have hosted in a console app and the service contract looks like this:

[ServiceContract]
[DataContractFormat]
public interface IDemoService
{
    [OperationContract]
    int MethodWithReturnValue(string arg1, int arg2);
 
    [OperationContract]
    void MethodWithNoReturnValue(string sdjsdkdskjksdj);
 
    [OperationContract]
    void NoArgsNoReturn();
 
    void NotAMethod();
}


I then turn to my client application and add a WCF service reference, and make sure I’ve checked the “Generate asynchronous operations” option.



image



To use the RX extensions, download the appropriate version and reference System.Reactive.dll in the client project. Creating the RX WCF extensions should be as simple as adding the T4 template to the client project. There is also a vb.net version in the zip file.



image



The T4 template generates an extension class for each service contract interface that it finds, with IObservable<T> methods for each asynchronous service operation.






public static IObservable<int> MethodWithReturnValueAsObservable(this ServiceReference2.IDemoService service, String arg1, Int32 arg2)
{
    return Observable
                .Start(()=> Observable.FromAsyncPattern<string, int, int>(service.BeginMethodWithReturnValue, service.EndMethodWithReturnValue)(arg1, arg2))
                .SelectMany(p=>p);
}


Observable.FromAsyncPattern is a method to turn a BeginXyz/EndXyz asynchronous pattern into a function that returns an observable collection. When you invoke the function and subscribe to the resulting Observable<T>, you can be notified when the result returns or combine it with other results. I have wrapped that method in a lambda expression so that it only gets invoked when the client subscribes to the Observable<T>.






()=> Observable.FromAsyncPattern<string, int, int>(service.BeginMethodWithReturnValue, service.EndMethodWithReturnValue)(arg1, arg2)


After that using the SelectMany(…) to flatten the IObservable<IObservable<T>> returned by the Start(…) method into an IObservable<T>.



Subscribing to the generated client looks something like this:






var client = new ServiceReference2.DemoServiceClient();
IObservable<int> observable = client.MethodWithReturnValueAsObservable("test", 12);
observable.Subscribe(
    next => MessageBox.Show(next.ToString()), 
    error => MessageBox.Show(error.Message), 
    () => MessageBox.Show("Complete"));


So in summary:





  1. Create your WCF service.

  2. Add a WCF service reference to the client project with asynchronous operations.

  3. Add a reference to Reactive eXtensions to the client project.

  4. Drop in the T4 template to generate the IObservable extension methods.

[UPDATE 1.0.0.1] Fixed templates for arrays and included correct T4 template for VB.net
[UPDATE 1.0.0.2] Added templates for RX 1.0.10621


Download T4 templates