docs-06-workflow.md 5.8 KB

中文版

06 - Integrating with the asynchronous Workflow framework

1. Server

You can follow the detailed example below:

  • Echo RPC sends an HTTP request to the upstream modules when it receives the request.
  • After the request to the upstream modules is completed, the server populates the body of HTTP response into the message of the response and send a reply to the client.
  • We don't want to block/occupy the handler thread, so the request to the upstream must be asynchronous.
  • First, we can use WFTaskFactory::create_http_task() of the factory of Workflow to create an asynchronous http_task.
  • Then, we use ctx->get_series() of the RPCContext to get the SeriesWork of the current ServerTask.
  • Finally, we use the push_back() interface of the SeriesWork to append the http_task to the SeriesWork.

    class ExampleServiceImpl : public Example::Service
    {
    public:
    void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
    {
        auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0,
            [request, response](WFHttpTask *task) {
                if (task->get_state() == WFT_STATE_SUCCESS)
                {
                    const void *data;
                    size_t len;
                    task->get_resp()->get_parsed_body(&data, &len);
                    response->mutable_message()->assign((const char *)data, len);
                }
                else
                    response->set_message("Error: " + std::to_string(task->get_error()));
    
                printf("Server Echo()\nget_req:\n%s\nset_resp:\n%s\n",
                                            request->DebugString().c_str(),
                                            response->DebugString().c_str());
            });
    
        ctx->get_series()->push_back(http_task);
    }
    };
    

2. Client

You can follow the detailed example below:

  • We send two requests in parallel. One is an RPC request and the other is an HTTP request.
  • After both requests are finished, we initiate a calculation task again to calculate the sum of the squares of the two numbers.
  • First, use create_Echo_task() of the RPC Client to create an rpc_task, which is an asynchronous RPC network request.
  • Then, use WFTaskFactory::create_http_task and WFTaskFactory::create_go_task in the the factory of Workflow to create an asynchronous network task http_task and an asynchronous computing task calc_task respectively.
  • Finally, use the serial-parallel graph to organize three asynchronous tasks, in which the multiplication sign indicates parallel tasks and the greater than sign indicates serial tasks and then execute start().

    void calc(int x, int y)
    {
    int z = x * x + y * y;
    
    printf("calc result: %d\n", z);
    }
    
    int main()
    {
    Example::SRPCClient client("127.0.0.1", 1412);
    
    auto *rpc_task = client.create_Echo_task([](EchoResponse *response, RPCContext *ctx) {
        if (ctx->success())
            printf("%s\n", response->DebugString().c_str());
        else
            printf("status[%d] error[%d] errmsg:%s\n",
                    ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
    });
    
    auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0, [](WFHttpTask *task) {
        if (task->get_state() == WFT_STATE_SUCCESS)
        {
            std::string body;
            const void *data;
            size_t len;
    
            task->get_resp()->get_parsed_body(&data, &len);
            body.assign((const char *)data, len);
            printf("%s\n\n", body.c_str());
        }
        else
            printf("Http request fail\n\n");
    });
    
    auto *calc_task = WFTaskFactory::create_go_task(calc, 3, 4);
    
    EchoRequest req;
    req.set_message("Hello!");
    req.set_name("1412");
    rpc_task->serialize_input(&req);
    
    WFFacilities::WaitGroup wait_group(1);
    
    SeriesWork *series = Workflow::create_series_work(http_task, [&wait_group](const SeriesWork *) {
        wait_group.done();
    });
    series->push_back(rpc_task);
    series->push_back(calc_task);
    series->start();
    
    wait_group.wait();
    return 0;
    }
    

3. Upstream

SRPC can directly use any component of Workflow, the most commonly used is Upstream, any kind of client of SRPC can use Upstream.

You may use the example below to construct a client that can use Upstream through parameters:

#include "workflow/UpstreamManager.h"

int main()
{
    // 1. create upstream and add server instances
    UpstreamManager::upstream_create_weighted_random("echo_server", true);
    UpstreamManager::upstream_add_server("echo_server", "127.0.0.1:1412");
    UpstreamManager::upstream_add_server("echo_server", "192.168.10.10");
    UpstreamManager::upstream_add_server("echo_server", "internal.host.com");

    // 2. create params and fill upstream name
    RPCClientParams client_params = RPC_CLIENT_PARAMS_DEFAULT;
    client_params.host = "echo_server";
    client_params.port = 1412; // this port only used when upstream URI parsing and will not affect the select of instances

    // 3. construct client by params, the rest of usage is similar as other tutorials
    Example::SRPCClient client(&client_params);

    ...

If we use the ConsistentHash or Manual upstream, we often need to distinguish different tasks for the selection algorithm. At this time, we may use the int set_uri_fragment(const std::string& fragment); interface on the client task to set request-level related information.

This field is the fragment in the URI. For the semantics, please refer to RFC3689 3.5-Fragment, any infomation that needs to use the fragment (such as other information included in some other selection policy), you may use this field as well.