Much of RX is focused on asynchronous operations or events happening in parallel, and the coordination of those results across multiple threads. Very little attention has been given to chaining – quite often you cannot make the next web service call until you know the result of the previous one or which button the user clicked.
The way we’ve done it in the past was typically in the callback of the first async operation, we invoke the second operation which has yet another callback and so on. The first chained call is not too hard but it quickly gets messy as you add more async operations.
Below is my (once again contrived) web service, and I’ve used the WCF RX extension template from my previous post to wrap up the WCF calls in IObservable<T> extension methods.
This is my service contract:
[ServiceContract]
public interface IBooksOnlineService
{[OperationContract]
Book GetSpecial();
[OperationContract]
string GetBookReview(int bookId);
[OperationContract]
List<Book> GetBooksByAuthor(string author);[OperationContract]
List<Book> GetBooksByGenre(string genre);[OperationContract]
void AddFavourites(List<Book> books);}
[DataContract]
public class Book
{[DataMember]
public int Id { get; set; }
[DataMember]
public string Title {get;set;}
[DataMember]
public string Author { get; set; }
[DataMember]
public string Genre { get; set;}
}
And the client proxy methods are wrapped up like this:
public static IObservable<Book> GetSpecialAsObservable(this IBooksOnlineService service)
{ return Observable.Start(()=> Observable.FromAsyncPattern<Book>(service.BeginGetSpecial, service.EndGetSpecial)())
.SelectMany(p=>p);
}
To demonstrate the good ol’ favourite way of chaining things, below is an example of the subscribe-in-the-callback method. There’s a number of problems with this arrangement:
- Two error handlers
- Cancelling the request is complicated because you don’t know which request is being processed.
- Adding another chained method or even a parallel chained method is not going to be pretty.
- When you start trying to unit test this, you are going to end up in a world of pain. (more about unit testing RX coming in a later post)
private static void ChainingTheClumsyWay()
{ var client = new BooksOnlineServiceClient();var mysubscription = client
.GetSpecialAsObservable()
.Subscribe(special => //This is the callback of the first method. {var reviewSubscription = client
.GetBookReviewAsObservable(special.Id)
.Subscribe(review => //This is the callback of the second method { var bookWithReview = new {Book = special,
Review = review
};
//Do something with the result.});
},
specialErrors => Console.WriteLine(specialErrors.Message));
}
Rewriting this as Linq makes it far easier to understand:
private static void ChainingWithLinq()
{ var client = new BooksOnlineServiceClient(); var query = from special in client.GetSpecialAsObservable() from review in client.GetBookReviewAsObservable(special.Id) select new {Book = special,
Review = review
};
var mysubscription = query.Subscribe(
result =>
{ //Do something with the result. },
error => Console.WriteLine(error.Message));
}
What that Linq syntax is going to do under the covers is a SelectMany(…) on the first result. The SelectMany allows you to create a new IObservable<string> for each item returned in the OnNext by the first IObservable<Book>, then flattens the result. In this case we are only dealing with sequences of one event for both observables, but it works equally well if the Observable<Book> event sequence was continuous and returned a new special every hour. As long as the subscription is active, it would chain a call to the GetBookReview operation every hour and produce a new result. We can rewrite the above as methods if you’re not a Linq fan.
private static void ChainingWithMethods()
{ var client = new BooksOnlineServiceClient();var query = client.GetSpecialAsObservable()
.SelectMany(special =>
client.GetBookReviewAsObservable(special.Id),
(special, review) =>
new {Book = special,
Review = review
});
var mysubscription = query.Subscribe(
result =>
{ //Do something with the result. },
error => Console.WriteLine(error.Message));
}
The first two problems are addressed, we have a single error handler and the whole chain can be cancelled by disposing the subscription token, so what about chaining multiple calls off the first? I’m going to make calls to the GetBooksByAuthor and GetBooksByGenre methods to fill in the similar books and other books by the same author. Although they will be chained, I want them to be called in parallel.
private static void ChainingMultipleCalls()
{ var client = new BooksOnlineServiceClient(); var query = from special in client.GetSpecialAsObservable() from details in client.GetBookReviewAsObservable(special.Id).Zip(client.GetBooksByAuthorAsObservable(special.Author),
(lhs, sameAuthor) => new {Review = lhs,
SameAuthor = sameAuthor
})
.Zip(client.GetBooksByGenreAsObservable(special.Genre),
(lhs, sameGenre) => new {lhs.Review,
lhs.SameAuthor,
SimilarBooks = sameGenre
})
select new {Book = special,
details.Review,
details.SameAuthor,
details.SimilarBooks
};
var mysubscription = query.Subscribe(
result =>
{ //Do something with the result. },
error => Console.WriteLine(error.Message));
}
I’ve made use of the “Zip” operator for observables that will take two observable sequences and pair up each result using the lambda expression in its second argument. In this case I know that each of the services only has a single result, so the zip works nicely. With continuous observables you may want to use another operator like “CombineLatest”.
In Summary:
1. When you think of chaining in RX, think SelectMany.