A Retrospective
It's hard not to get enthusiastic about Reactive Extensions after being inspired by The Reactive Manifesto. That enthusiasm has produced a stream of hello-world articles and code examples. While Reactive Extensions have taken over the C#, Java and JavaScript worlds, the C++ world seems slow to embrace RxCpp.
This article attempts to bring RxCpp closer to C++ developers who may not yet see how a reactive programming model could help them write better, more robust software. The latest ReactiveX Tutorial link list is a good place to start learning and grokking.
Testing concurrency with RxCpp
The C++ version is only a few steps away from the C# one. In a simple experiment, a sequence of integer values arriving at fixed intervals (a ticker) can be parameterised with a coordination (for why it is a coordination and not a scheduler, see the RxCpp Developer guide):
```cpp
auto seq = rxcpp::observable<>::interval(
    std::chrono::milliseconds(1),
    some_scheduler
);
```
A worker created on the test scheduler gives access to the deterministic test scheduler API:
```cpp
auto sc = rxcpp::schedulers::make_test();
auto worker = sc.create_worker();
auto test = rxcpp::identity_same_worker(worker);
```
The rest of the test reads as follows:
```cpp
int count = 0;

WHEN("one subscribes to an observable sequence on the scheduler")
{
    auto seq = rxcpp::observable<>::interval(
        std::chrono::milliseconds(1),
        test // on the test scheduler
    ).filter([](int i) { return i % 2; });

    seq.subscribe([&count](int){
        count++;
    });

    THEN("the sequence is not run at first")
    {
        worker.sleep(2 /* ms */);
        CHECK(count == 0);

        AND_WHEN("the test scheduler is advanced manually")
        {
            THEN("the sequence is run as expected")
            {
                worker.advance_by(8 /* ms */);
                CHECK(count == 5);
            }
        }
    }
}
```
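Why the counter stays at zero after sleeping and only moves once the clock is advanced is easier to see with a toy model. The sketch below is plain C++ with no RxCpp dependency. `ToyTestScheduler`, `schedule_tick` and `run_ticker_scenario` are hypothetical names made up for illustration, and the semantics (sleeping moves the clock without running anything, the first tick is due one period after subscription) are simplifying assumptions rather than a description of RxCpp internals.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <utility>

// Toy virtual-time scheduler (hypothetical; not RxCpp's implementation).
class ToyTestScheduler {
public:
    using Action = std::function<void()>;

    long now() const { return now_ms_; }

    // Queue an action to run once virtual time reaches `due_ms`.
    void schedule_at(long due_ms, Action action) {
        queue_.emplace(due_ms, std::move(action));
    }

    // Assumption: sleeping moves the clock but runs nothing.
    void sleep(long ms) {
        assert(ms >= 0);
        now_ms_ += ms;
    }

    // Move the clock forward and run every action due up to the new time,
    // in due-time order. Actions may schedule further actions.
    void advance_by(long ms) {
        assert(ms >= 0);
        const long target = now_ms_ + ms;
        while (!queue_.empty() && queue_.begin()->first <= target) {
            auto it = queue_.begin();
            Action action = std::move(it->second);
            // Overdue actions run at the current time, never in the past.
            if (it->first > now_ms_) now_ms_ = it->first;
            queue_.erase(it);
            action();
        }
        now_ms_ = target;
    }

private:
    long now_ms_ = 0;
    std::multimap<long, Action> queue_;
};

// A ticker: emits `value`, then reschedules itself one period later.
void schedule_tick(ToyTestScheduler& sc, long due_ms, long period_ms,
                   long value, std::function<void(long)> on_next) {
    sc.schedule_at(due_ms, [&sc, due_ms, period_ms, value, on_next]() {
        on_next(value);
        schedule_tick(sc, due_ms + period_ms, period_ms, value + 1, on_next);
    });
}

struct Counts {
    int after_sleep;
    int after_advance;
};

// Replays the scenario from the test above against the toy scheduler.
Counts run_ticker_scenario() {
    ToyTestScheduler sc;
    int count = 0;
    // 1 ms ticker, odd values only, first tick one period from now (assumption).
    schedule_tick(sc, sc.now() + 1, 1, 1, [&count](long i) {
        if (i % 2 != 0) ++count;
    });
    sc.sleep(2);                 // clock moves, nothing runs
    const int after_sleep = count;
    sc.advance_by(8);            // ticks 1..10 run, five of them odd
    return {after_sleep, count};
}
```

Calling `run_ticker_scenario()` gives `after_sleep == 0` and `after_advance == 5` in this toy model: nothing is delivered until the test drives the clock, which is what makes such a test deterministic. Real schedulers differ in details such as when the first tick is due, so the numbers should not be read as a specification of RxCpp.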