Examples in C++

Neutral native API for business logic

// The only header needed.
#include <futoin/iasyncsteps.hpp>

// Just for examples
#include <list>
#include <vector>

extern void do_some_non_blocking_stuff();
extern void this_code_is_not_executed();
extern void this_code_IS_executed();
extern futoin::ISync& get_some_synchronization_object();
extern int schedule_external_callback(std::function<void(bool)>);
extern void external_cancel(int);

// Just for shorter code
using futoin::ErrorCode;
using futoin::IAsyncSteps;

// Example of entry point
void example_business_logic(IAsyncSteps& asi)
{
    // 1. regular step example
    //
    // Prototypes:
    // - asi.add(func(asi));
    // - asi.add(func(asi), on_error(asi, code));
    // - asi.add(func(asi, [A [,B [,C [,D]]]]));
    // - asi.add(func(asi, [A [,B [,C [,D]]]]), on_error(asi, code));
    //
    asi.add([](IAsyncSteps& asi) { do_some_non_blocking_stuff(); });

    // 2. try {} catch {} block example
    asi.add(
            [](IAsyncSteps& asi) {
                // regular step example
                do_some_non_blocking_stuff();

                asi.error("MyError"); // throw error

                this_code_is_not_executed();
            },
            [](IAsyncSteps& asi, ErrorCode code) {
                if (code == "MyError") {
                    // Override error unwind
                    asi.success(); // or just asi()
                } else {
                    asi.error("OverrideErrorCode");
                }
            });

    // 3. Inner steps
    asi.add(
            [](IAsyncSteps& asi) {
                // regular step example
                do_some_non_blocking_stuff();

                asi.add([](IAsyncSteps& asi) {
                    asi.error("MyError"); // throw error
                });

                // NOTE: inner steps are executed AFTER outer step body
                this_code_IS_executed();
            },
            [](IAsyncSteps& asi, ErrorCode code) {
                if (code == "MyError") {
                    asi();
                }
            });

    // 4. Passing arbitrary parameters
    asi.add([](IAsyncSteps& asi) {
        asi.success(123, true, "SomeString", std::vector<int>({1, 2, 3}));
        // or just asi(...);
    });
    asi.add([](IAsyncSteps& asi,
               int a,
               bool b,
               futoin::string&& c,
               std::vector<int>&& d) {
        // NOTE: Maximum of 4 arguments is supported based on best practices
        assert(a == 123);
        assert(b);
        assert(c == "SomeString");
        assert(d[0] == 1);
    });
    asi.add([](IAsyncSteps& asi, int a, bool b) {
        // Only C++-specific - result variables can be re-used in following
        // steps. NOTE: it's undefined behavior in canonical specification.
        assert(a == 123);
        assert(b);
    });

    // 5. state() - Thread Local Storage emulation
    //
    // Some predefined properties are set directly on State object for
    // performance reasons.
    //
    // Business logic can use custom dynamic items as associative key-value map.
    // Key is string, but value is of futoin::any type.
    {
        // get reference to state object
        auto& state = asi.state();

        // Get set-default variable
        auto some_var = asi.state("SomeVar", 123);
        asi.state("SomeVar", 234);

        // Check the var is saved only on first attempt
        assert(some_var == futoin::any_cast<int>(state["SomeVar"]));
        // Same, but more clean way
        assert(some_var == asi.state<int>("SomeVar"));

        // Set something more complex
        using V = std::vector<int>;
        state["SomeVector"] = V({1, 2, 3});
        auto& v = asi.state<V>("SomeVector");

        asi.add([](IAsyncSteps& asi) {
            asi.state<V>("SomeVector");
            asi.state<int>("SomeVar");

            try {
                asi.state<V>("SomeVar");
            } catch (const std::bad_cast& e) {
                //...
            }
        });
    }

    // 6. Advanced error handling
    asi.state().unhandled_error = [](ErrorCode code) {
        // This handler would be called instead of default
        // std::terminate(), if unhandled error is thrown.
    };

    asi.add(
            [](IAsyncSteps& asi) {
                // NOTE: error codes are associative, but not regular integers
                // to be
                //       more network-friendly.
                asi.error("MyError", "Some arbitrary description of the error");
            },
            [](IAsyncSteps& asi, ErrorCode code) {
                // ErrorCode is wrapper around const char*
                assert(code == "MyError");
                // Error info is stored in state
                assert(asi.state().error_info
                       == "Some arbitrary description of the error");
                // Last exception thrown is also available in state
                std::exception_ptr e = asi.state().last_exception;
            });

    // 7. Synchronization
    //
    // Unlike hardware race conditions, AsyncSteps synchronization serves
    // logical purposes to limit concurrency of execution or rate of calls or
    // both.
    //
    // FTN12 concept defines Mutex, Throttle and Limiter primitives which
    // implement a single ISync interface.

    futoin::ISync& obj = get_some_synchronization_object();

    // The same interface as asi.add(), but with extra synchronization object
    // parameter.
    asi.sync(
            obj,
            [](IAsyncSteps& asi) {},
            [](IAsyncSteps& asi, ErrorCode code) {} // optional
    );

    // 8. Loops
    asi.loop([](IAsyncSteps& asi) {
        // infinite loop
    });
    asi.repeat(10, [](IAsyncSteps& asi, size_t i) {
        // range loop from i=0 till i=9 (inclusive)
    });
    asi.forEach(
            std::vector<int>{1, 2, 3}, [](IAsyncSteps& asi, size_t i, int v) {
                // Iteration of vector-like and list-like objects
            });
    asi.forEach(
            std::list<futoin::string>{"1", "2", "3"},
            [](IAsyncSteps& asi, size_t i, const futoin::string& v) {
                // Iteration of vector-like and list-like objects
            });
    asi.forEach(
            std::map<futoin::string, futoin::string>(),
            [](IAsyncSteps& asi,
               const futoin::string& key,
               const futoin::string& v) {
                // Iteration of map-like objects
            });

    std::map<futoin::string, futoin::string> non_const_map;
    asi.forEach(
            non_const_map,
            [](IAsyncSteps& asi, const futoin::string& key, futoin::string& v) {
                // Iteration of map-like objects, note the value reference type
            });

    // 9. Timeout support
    asi.add(
            [](IAsyncSteps& asi) {
                // Raises Timeout error after specified period
                asi.setTimeout(std::chrono::seconds{10});

                asi.loop([](IAsyncSteps& asi) {
                    // infinite loop
                });
            },
            [](IAsyncSteps& asi, ErrorCode code) {
                if (code == futoin::errors::Timeout) {
                    asi();
                }
            });

    // 10. External event integration
    asi.add([](IAsyncSteps& asi) {
        auto handle = schedule_external_callback([&](bool err) {
            if (err) {
                try {
                    asi.error("ExternalError");
                } catch (...) {
                    // pass
                }
            } else {
                asi.success();
            }
        });

        asi.setCancel([=](IAsyncSteps& asi) { external_cancel(handle); });
    });
    asi.add([](IAsyncSteps& asi) {
        auto handle = schedule_external_callback([&](bool err) {
            if (!asi) {
                // AsyncSteps object is invalidated due to external cancel.
                //
                // However, most likely this would lead to corrupted
                // memory read.
                //
                // Such approach makes sense only for technologies with
                // Garbage Collection without explicit heap management.
                //
                // Scheduled external callback must be manually canceled
                // before AsyncSteps flow is canceled.
            } else if (err) {
                try {
                    asi.error("ExternalError");
                } catch (...) {
                    // pass
                }
            } else {
                asi.success();
            }
        });

        // alias for setCancel() with noop handler
        asi.waitExternal();
    });

    // 11. Standard Promise/Await integration
    asi.add([](IAsyncSteps& asi) {
        // Proper way to create new AsyncSteps instances
        // without hard dependency on implementation.
        auto new_steps = asi.newInstance();
        new_steps->add([](IAsyncSteps& asi) {});

        // Can be called outside of AsyncSteps event loop
        // new_steps.promise().wait();
        //  or
        // new_steps.promise<int>().get();

        // Proper way to wait for standard std::future
        asi.await(new_steps->promise());

        // Ensure instance lifetime
        asi.state()["some_obj"] = std::move(new_steps);
    });

    // 12. Parallel execution
    //
    // It's designed for concurrent execution of sub-flows
    // with shared state().
    //
    // Unhandled error in sub-flows lead to abort of all non-executed
    // parallel steps.
    using OrderVector = std::vector<int>;
    asi.state("order", OrderVector{});

    auto& p = asi.parallel([](IAsyncSteps& asi, ErrorCode) {
        // Overall error handler
        asi.success();
    });
    p.add([](IAsyncSteps& asi) {
        // regular flow
        asi.state<OrderVector>("order").push_back(1);

        asi.add([](IAsyncSteps& asi) {
            asi.state<OrderVector>("order").push_back(4);
        });
    });
    p.add([](IAsyncSteps& asi) {
        asi.state<OrderVector>("order").push_back(2);

        asi.add([](IAsyncSteps& asi) {
            asi.state<OrderVector>("order").push_back(5);
            asi.error("SomeError");
        });
    });
    p.add([](IAsyncSteps& asi) {
        asi.state<OrderVector>("order").push_back(3);

        asi.add([](IAsyncSteps& asi) {
            asi.state<OrderVector>("order").push_back(6);
        });
    });

    asi.add([](IAsyncSteps& asi) {
        asi.state<OrderVector>("order"); // 1, 2, 3, 4, 5
    });

    // 13. Control of AsyncSteps flow
    {
        // std::unique_ptr<IAsyncSteps> with newly allocated instance
        auto new_steps = asi.newInstance();

        // Add steps
        new_steps->add([](IAsyncSteps& asi) {});
        new_steps->loop([](IAsyncSteps& asi) {});

        // Schedule execution of AsyncSteps flow
        new_steps->execute();

        // Cancel execution of AsyncSteps flow
        new_steps->cancel();
    }
}

Reference Implementation for execution

#include <futoin/ri/asyncsteps.hpp>
#include <futoin/ri/asynctool.hpp>

void inner_thread() {
    futoin::ri::AsyncTool at;
    
    futoin::ri::AsyncSteps asi;
    asi.state("requests", RequestManager());

    asi.loop([&at](futoin::IAsyncSteps &asi){
        // Some infinite loop logic
        auto request = ...;
        
        // Handle some new request
        auto steps = asi.newInstance().release();
        
        // That's just for example, real implementation must
        // manage request objects (their std::unique_ptr references).
        auto cleanup = [&at,steps]() {
            at.immediate([steps](){
                delete steps;
            });
        };
        
        steps->add([cleanup, request](futoin::IAsyncSteps &asi){
            asi.setCancel([cleanup](futoin::IAsyncSteps &asi){
                cleanup();
            });
            call_business_logic(asi, request);
        });
        steps->add([cleanup](futoin::IAsyncSteps &asi){
            cleanup();
        });
        steps->execute();
    });
    
    asi.promise.wait();
}

void external_event_loop() {
    futoin::ri::AsyncTool::Params prm;
    prm.mempool_mutex = false; // boost performance for single threaded

    futoin::ri::AsyncTool at([](){
        // Called when new jobs are scheduled through
        // immediate() or deferred() API from other threads.
        //
        // It's never called, if AsyncTool is not exposed to other threads.
        //
        // This callback disables spawn of internal thread.
    }, prm);

    for (;;) {
        // Real implementation should call it not earlier than delay
        // and only if there is some work to do.
        auto res = at.iterate();
        
        if (!res.have_work) {
            // wait for external events
        } else if (res.delay > 0) {
            // wait for external events with specified delay
        }
    }
}

Mutex

#include <futoin/ri/mutex.hpp>

using futoin::ri::Mutex;

// 1 concurrent flow with infinite wait queue
Mutex mtx_a;

// 10 concurrent flows with infinite wait queue
Mutex mtx_b{10};

// 1 concurrent flow with queue of 8 pending flows
Mutex mtx_c{1, 8};

asi.sync(mtx_a, [](IAsyncSteps& asi) {
    // synchronized section
    asi.add([](IAsyncSteps& asi) {
        // inner step in the section
        
        // This synchronization is NOOP for already
        // acquired Mutex.
        asi.sync(mtx_a, [](IAsyncSteps& asi) {
        });
    });
});

Throttle

#include <futoin/ri/throttle.hpp>

// Required to schedule period reset timer
futoin::ri::AsyncTool at;

using futoin::ri::Throttle;

// 2 flows-per-second with infinite queue
Throttle thr_a(at, 2);

// 4 flows-per-15-seconds with infinite queue
Throttle thr_b(at, 4, std::chrono::seconds(15));

// 4 flows-per-500-milliseconds with queue size of 12 flows
Throttle thr_c(at, 4, std::chrono::milliseconds(500), 12);

asi.sync(thr_a, [](IAsyncSteps& asi) {
    // synchronized section after rate barrier
    
    // This synchronization is accounted in rate!
    asi.sync(thr_a, [](IAsyncSteps& asi) {
    });
});

Limiter

#include <futoin/ri/limiter.hpp>

// Required to schedule period reset timer
futoin::ri::AsyncTool at;

using futoin::ri::Limiter;

// 1 concurrent flow with infinite wait queue
// 2 flows-per-second with infinite queue
Limiter::Params prm_a;
prm_a.rate = 2;

Limiter lmtr_a(at, prm_a);

// 10 concurrent flows with infinite wait queue
// 4 flows-per-15-seconds with infinite queue
Limiter::Params prm_b;
prm_b.concurrency = 10;
prm_b.rate = 4;
prm_b.period = std::chrono::seconds(15);

Limiter lmtr_b(at, prm_b);

// 1 concurrent flow with queue of 8 pending flows
// 4 flows-per-500-milliseconds with queue size of 12 flows
Limiter::Params prm_c;
prm_c.concurrency = 1;
prm_c.max_queue = 8;
prm_c.rate = 4;
prm_c.period = std::chrono::milliseconds(500);
prm_c.burst = 12;

Limiter lmtr_c(at, 4, std::chrono::milliseconds(500), 12);

asi.sync(lmtr_a, [](IAsyncSteps& asi) {
    // synchronized section after rate barrier
    
    // Not accounted for concurrency, but accounted for rate!
    asi.sync(lmtr_a, [](IAsyncSteps& asi) {
    });
});