Przeglądaj źródła

support grpc and add GrpcDemo

wangguanfeng 2 lat temu
rodzic
commit
fc8ccbaf05

+ 34 - 4
cmake/BuildTarsCpp.cmake

@@ -8,9 +8,10 @@ macro(build_tars_server MODULE DEPS)
     aux_source_directory(. DIR_SRCS)
 
     FILE(GLOB TARS_LIST "${CMAKE_CURRENT_SOURCE_DIR}/*.tars")
+    FILE(GLOB PB_LIST "${CMAKE_CURRENT_SOURCE_DIR}/*.proto")
 
     set(TARS_LIST_DEPENDS)
-
+    set(PB_LIST_DEPENDS)
     if (TARS_LIST)
         set(CLEAN_LIST)
 
@@ -40,10 +41,39 @@ macro(build_tars_server MODULE DEPS)
         add_executable(${MODULE} ${DIR_SRCS})
 
         add_dependencies(${MODULE} ${TARS_TARGET})
+        
+    elseif(PB_LIST)
+        set(CLEAN_LIST)
+        set(_PROTOBUF_PROTOC ${CMAKE_BINARY_DIR}/src/protobuf/bin/protoc)
+
+        foreach (PB_SRC ${PB_LIST})
+            get_filename_component(NAME_WE ${PB_SRC} NAME_WE)
+
+            set(PB_H ${NAME_WE}.pb.h)
+            set(PB_CC ${NAME_WE}.pb.cc)
 
-    else(TARS_LIST)
+            set(CUR_PB_GEN ${CMAKE_CURRENT_SOURCE_DIR}/${PB_H} ${CMAKE_CURRENT_SOURCE_DIR}/${PB_CC})
+            LIST(APPEND PB_LIST_DEPENDS ${CUR_PB_GEN})
+
+            add_custom_command(OUTPUT ${CUR_PB_GEN}
+                    WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
+                    DEPENDS ${PROTO2TARS} ${_PROTOBUF_PROTOC}
+                    COMMAND ${_PROTOBUF_PROTOC} -I "${CMAKE_CURRENT_SOURCE_DIR}" 
+                                "${PB_SRC}" --cpp_out "${CMAKE_CURRENT_SOURCE_DIR}"        
+                    COMMENT "${_PROTOBUF_PROTOC} ${PB_SRC} ${CMAKE_CURRENT_SOURCE_DIR} ${CUR_PB_GEN}")
+
+            list(APPEND CLEAN_LIST ${CMAKE_CURRENT_SOURCE_DIR}/${PB_H} ${CMAKE_CURRENT_SOURCE_DIR}/${PB_CC})
+        endforeach ()
+
+        set_directory_properties(PROPERTIES ADDITIONAL_MAKE_CLEAN_FILES "${CLEAN_LIST}")
+
+        set(TARS_TARGET "TARS_${MODULE}")  
+        add_custom_target(${TARS_TARGET} ALL DEPENDS ${PB_LIST_DEPENDS})
+        add_executable(${MODULE} ${CLEAN_LIST} ${DIR_SRCS})
+        add_dependencies(${MODULE} ${TARS_TARGET})
+    else()
         add_executable(${MODULE} ${DIR_SRCS})
-    endif(TARS_LIST)
+    endif()
 
     if("${DEPS}" STREQUAL "")
         add_dependencies(${MODULE} tarsservant tarsutil)
@@ -63,7 +93,7 @@ macro(build_tars_server MODULE DEPS)
     endif()
 
     if(TARS_HTTP2)
-        target_link_libraries(${MODULE} ${LIB_HTTP2})
+        target_link_libraries(${MODULE} ${LIB_HTTP2} ${LIB_PROTOBUF})
     endif()
 
     if(TARS_GPERF)

+ 1 - 0
examples/CMakeLists.txt

@@ -2,6 +2,7 @@
 add_subdirectory(UtilDemo)
 add_subdirectory(CoroutineDemo)
 add_subdirectory(HttpDemo)
+add_subdirectory(GrpcDemo)
 add_subdirectory(CustomDemo)
 add_subdirectory(AuthDemo)
 

+ 2 - 0
examples/GrpcDemo/CMakeLists.txt

@@ -0,0 +1,2 @@
+add_subdirectory(GrpcClient)
+add_subdirectory(GrpcServer)

+ 1 - 0
examples/GrpcDemo/GrpcClient/CMakeLists.txt

@@ -0,0 +1 @@
+build_tars_server("GrpcClient" "GrpcServer")

+ 38 - 0
examples/GrpcDemo/GrpcClient/helloworld.proto

@@ -0,0 +1,38 @@
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "io.grpc.examples.helloworld";
+option java_outer_classname = "HelloWorldProto";
+option objc_class_prefix = "HLW";
+
+package helloworld;
+
+// The greeting service definition.
+service Greeter {
+  // Sends a greeting
+  rpc SayHello (HelloRequest) returns (HelloReply) {}
+}
+
+// The request message containing the user's name.
+message HelloRequest {
+  string name = 1;
+}
+
+// The response message containing the greetings
+message HelloReply {
+  string message = 1;
+}

+ 271 - 0
examples/GrpcDemo/GrpcClient/main.cpp

@@ -0,0 +1,271 @@
+/**
+ * Tencent is pleased to support the open source community by making Tars available.
+ *
+ * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except 
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed 
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the 
+ * specific language governing permissions and limitations under the License.
+ */
+
+#include <iostream>
+#include "util/tc_http.h"
+#include "util/tc_option.h"
+#include "util/tc_common.h"
+#include "util/tc_clientsocket.h"
+#include "util/tc_thread_pool.h"
+#include "util/tc_timeprovider.h"
+#include "servant/Application.h"
+#include "helloworld.pb.h"
+#include "util/tc_grpc.h"
+
+using namespace std;
+using namespace tars;
+
+Communicator* _comm;
+
+string grpcObj = "TestApp.GrpcServer.GrpcObj@";
+
+struct Param
+{
+	int count;
+	string call;
+	int thread;
+	string domain;
+
+	ServantPrx servant2Prx;
+};
+
+Param param;
+std::atomic<int> callback_count(0);
+
+void syncRpc2(int c)
+{
+	int64_t t = TC_Common::now2us();
+
+    //发起远程调用
+    for (int i = 0; i < c; ++i)
+    {
+	    shared_ptr<TC_HttpResponse> rsp;
+	    shared_ptr<TC_HttpRequest> req = std::make_shared<TC_HttpRequest>();
+		helloworld::HelloRequest request;
+		helloworld::HelloReply reply;
+		request.set_name(string("world-") + TC_Common::tostr(i));
+		string message = request.SerializeAsString();
+		addGrpcPrefix(message, false);
+		std::string url = "http://" + param.domain;
+		req->setPostRequest(url, message, true);
+		req->setHeader("content-type", "application/grpc");
+		req->setHeader("te", "trailers");
+		req->setHeader(":path", "/helloworld.Greeter/SayHello");
+		std::string content;
+		bool compressed;
+
+	    try
+	    {
+		    param.servant2Prx->http_call("SayHello", req, rsp);
+			content =  rsp->getContent();
+			
+			RemoveGrpcPrefix(content, &compressed);
+			reply.ParseFromString(content);
+			cout << "rsp content: " << reply.message() << endl;
+	    }
+	    catch (exception & e)
+	    {
+		    cout << "exception:" << e.what() << endl;
+	    }
+
+		std::string rspMessage = "Hello " + request.name();
+	    assert(rspMessage == reply.message());
+	    assert(req.use_count() == 1);
+	    assert(rsp.use_count() == 1);
+
+        ++callback_count;
+    }
+
+    int64_t cost = TC_Common::now2us() - t;
+    cout << "syncRpc2 total:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
+}
+
+struct TestHttpCallback : public HttpCallback
+{
+	TestHttpCallback(const string &buff) : _buff(buff)
+	{
+
+	}
+
+	virtual int onHttpResponse(const shared_ptr<TC_HttpResponse> &rsp)
+	{
+		callback_count++;
+		helloworld::HelloRequest request;
+		helloworld::HelloReply reply;
+		std::string content;
+		bool compressed;
+		content =  rsp->getContent();
+
+		RemoveGrpcPrefix(content, &compressed);
+		RemoveGrpcPrefix(_buff, &compressed);
+		request.ParseFromString(_buff);
+		reply.ParseFromString(content);
+
+		std::string rspMessage = "Hello " + request.name();
+	    assert(rspMessage == reply.message());
+
+		return 0;
+	}
+	virtual int onHttpResponseException(int expCode)
+	{
+		cout << "onHttpResponseException expCode:" << expCode  << endl;
+
+		callback_count++;
+
+		return 0;
+	}
+
+	string _buff;
+};
+
+void asyncRpc2(int c)
+{
+	int64_t t = TC_Common::now2us();
+
+	//发起远程调用
+	for (int i = 0; i < c; ++i)
+	{
+		shared_ptr<TC_HttpResponse> rsp;
+	    shared_ptr<TC_HttpRequest> req = std::make_shared<TC_HttpRequest>();
+		helloworld::HelloRequest request;
+		helloworld::HelloReply reply;
+		request.set_name(string("world-") + TC_Common::tostr(i));
+		string message = request.SerializeAsString();
+		addGrpcPrefix(message, false);
+		std::string url = "http://" + param.domain;
+		req->setPostRequest(url, message, true);
+		req->setHeader("content-type", "application/grpc");
+		req->setHeader(":path", "/helloworld.Greeter/SayHello");
+		req->setHeader("te", "trailers");
+
+
+		HttpCallbackPtr p = new TestHttpCallback(message);
+
+		try
+		{
+			param.servant2Prx->http_call_async("hello", req, p);
+		}
+		catch(exception& e)
+		{
+			cout << "exception:" << e.what() << endl;
+		}
+
+        if(i % 500 == 0)
+        {
+            TC_Common::msleep(100);
+        }
+	}
+
+	int64_t cost = TC_Common::now2us() - t;
+	cout << "asyncRpc2 send:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
+}
+
+
+int main(int argc, char *argv[])
+{
+    try
+    {
+        if (argc < 4)
+        {
+	        cout << "Usage:" << argv[0] << " --domain=127.0.0.1:50051 --count=1000 --call=[sync|async] --thread=1" << endl;
+
+	        return 0;
+        }
+
+	    TC_Option option;
+        option.decode(argc, argv);
+
+		param.domain = option.getValue("domain");
+		if(param.domain.empty()) param.domain = "127.0.0.1:50051";
+		param.count = TC_Common::strto<int>(option.getValue("count"));
+	    if(param.count <= 0) param.count = 1000;
+	    param.call = option.getValue("call");
+	    if(param.call.empty()) param.call = "sync";
+	    param.thread = TC_Common::strto<int>(option.getValue("thread"));
+	    if(param.thread <= 0) param.thread = 1;
+
+        _comm = new Communicator();
+
+//         TarsRollLogger::getInstance()->logger()->setLogLevel(6);
+
+        _comm->setProperty("sendqueuelimit", "1000000");
+        _comm->setProperty("asyncqueuecap", "1000000");
+
+		string ip = param.domain.substr(0, param.domain.find(":"));
+		string port = param.domain.substr(param.domain.find(":") + 1);
+		grpcObj += "tcp -h " + ip + " -p " + port;
+		std::cout << "grpcObj: " << grpcObj << std::endl;
+        param.servant2Prx = _comm->stringToProxy<ServantPrx>(grpcObj);
+
+	    param.servant2Prx->tars_connect_timeout(5000);
+        param.servant2Prx->tars_async_timeout(60*1000);
+
+        param.servant2Prx->tars_set_protocol(ServantProxy::PROTOCOL_GRPC);
+
+        int64_t start = TC_Common::now2us();
+
+        std::function<void(int)> func;
+
+        if (param.call == "sync")
+        {
+            func = syncRpc2;
+        }
+        else if(param.call == "async")
+        {
+        	func = asyncRpc2;
+        }
+        else
+        {
+        	cout << "no func, exits" << endl;
+        	exit(0);
+        }
+
+	    vector<std::thread*> vt;
+        for(int i = 0 ; i< param.thread; i++)
+        {
+            vt.push_back(new std::thread(func, param.count));
+        }
+
+        std::thread print([&]{while(callback_count != param.count * param.thread) {
+	        cout << "Grpc:" << param.call << ": ----------finish count:" << callback_count << endl;
+	        std::this_thread::sleep_for(std::chrono::seconds(1));
+        };});
+
+        for(size_t i = 0 ; i< vt.size(); i++)
+        {
+            vt[i]->join();
+            delete vt[i];
+        }
+
+        cout << "(pid:" << std::this_thread::get_id() << ")"
+             << "(count:" << param.count << ")"
+             << "(use ms:" << (TC_Common::now2us() - start)/1000 << ")"
+             << endl;
+
+	    while(callback_count != param.count * param.thread) {
+		    std::this_thread::sleep_for(std::chrono::seconds(1));
+	    }
+	    print.join();
+	    cout << "----------finish count:" << callback_count << endl;
+    }
+    catch(exception &ex)
+    {
+        cout << ex.what() << endl;
+    }
+    cout << "main return." << endl;
+
+    return 0;
+}

+ 1 - 0
examples/GrpcDemo/GrpcServer/CMakeLists.txt

@@ -0,0 +1 @@
+build_tars_server("GrpcServer" "")

+ 80 - 0
examples/GrpcDemo/GrpcServer/GrpcImp.cpp

@@ -0,0 +1,80 @@
+/**
+ * Tencent is pleased to support the open source community by making Tars available.
+ *
+ * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except 
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed 
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the 
+ * specific language governing permissions and limitations under the License.
+ */
+
+#include "GrpcImp.h"
+#include "servant/Application.h"
+#include "helloworld.pb.h"
+#include "GrpcServer.h"
+
+using namespace std;
+
+//////////////////////////////////////////////////////
+void GrpcImp::initialize()
+{
+    //initialize servant here:
+    //...
+}
+
+//////////////////////////////////////////////////////
+void GrpcImp::destroy()
+{
+    //destroy servant here:
+    //...
+}
+
+int GrpcImp::doRequest(TarsCurrentPtr current, vector<char> &buffer)
+{
+    shared_ptr<TC_GrpcServer> session = TC_GrpcServer::getHttp2(current->getUId());
+
+	vector<shared_ptr<TC_GrpcServer::Http2Context>> contexts = session->decodeRequest();
+
+	for(size_t i = 0; i< contexts.size(); ++i)
+	{
+		shared_ptr<TC_GrpcServer::Http2Context> context = contexts[i];
+
+		std::string content =  context->request.getContent();
+		bool compressed;
+		RemoveGrpcPrefix(content, &compressed);
+		string path = context->request.getHeader(":path");
+		if (path == "/helloworld.Greeter/SayHello") {
+			vector<char> data;
+			helloworld::HelloReply reply;
+			helloworld::HelloRequest request;
+			request.ParseFromString(content);
+			TLOGDEBUG("doRequest request: " << request.name() << endl);
+			reply.set_message("Hello " + request.name());
+			string message = reply.SerializeAsString();
+			addGrpcPrefix(message, false);
+			session->packGrpcResponse(context, 200, message);
+			int ret = session->encodeResponse(context, "0", data);
+			if(ret != 0)
+			{
+				cout << "encodeResponse error:" << session->getErrMsg() << endl;
+			}
+
+			buffer.insert(buffer.end(), data.begin(), data.end());
+		}
+	}
+
+    return 0;
+}
+
+
+int GrpcImp::doClose(TarsCurrentPtr current)
+{
+    TC_GrpcServer::delHttp2(current->getUId());
+    return 0;
+}

+ 60 - 0
examples/GrpcDemo/GrpcServer/GrpcImp.h

@@ -0,0 +1,60 @@
+/**
+ * Tencent is pleased to support the open source community by making Tars available.
+ *
+ * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except 
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed 
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the 
+ * specific language governing permissions and limitations under the License.
+ */
+
+#ifndef _GrpcImp_H_
+#define _GrpcImp_H_
+
+
+#include "servant/Application.h"
+
+using namespace tars;
+using namespace std;
+
+/**
+ *
+ *
+ */
+class GrpcImp : public Servant
+{
+public:
+    /**
+     *
+     */
+    virtual ~GrpcImp() {}
+
+    /**
+     *
+     */
+    virtual void initialize();
+
+    /**
+     *
+     */
+    virtual void destroy();
+
+    /**
+     *
+     */
+    int doRequest(TarsCurrentPtr current, vector<char> &buffer);
+
+    /**
+     * close connection
+     */    
+    int doClose(TarsCurrentPtr current);
+
+};
+/////////////////////////////////////////////////////
+#endif

+ 73 - 0
examples/GrpcDemo/GrpcServer/GrpcServer.cpp

@@ -0,0 +1,73 @@
+/**
+ * Tencent is pleased to support the open source community by making Tars available.
+ *
+ * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except 
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed 
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the 
+ * specific language governing permissions and limitations under the License.
+ */
+
+#include "GrpcServer.h"
+#include "GrpcImp.h"
+
+using namespace std;
+
+GrpcServer g_app;
+
+void
+GrpcServer::initialize()
+{
+    //initialize application here:
+    //...
+
+    std::string servant = ServerConfig::Application + "." + ServerConfig::ServerName + ".GrpcObj";
+
+    addServant<GrpcImp>(servant);
+    addServantProtocol(servant, &TC_GrpcServer::parseGrpc);
+
+    /*
+    string adapterName = _servantHelper->getServantAdapter(servant);
+
+    if (adapterName == "")
+    {
+        throw runtime_error("addServantProtocol fail, no found adapter for servant:" + servant);
+    }
+
+    getEpollServer()->getBindAdapter(adapterName)->setHandle<HttpHandle>(5);
+    */
+    
+}
+/////////////////////////////////////////////////////////////////
+void
+GrpcServer::destroyApp()
+{
+    //destroy application here:
+    //...
+}
+/////////////////////////////////////////////////////////////////
+int
+main(int argc, char* argv[])
+{
+    try
+    {
+        g_app.main(argc, argv);
+        g_app.waitForShutdown();
+    }
+    catch (std::exception& e)
+    {
+        cerr << "std::exception:" << e.what() << std::endl;
+    }
+    catch (...)
+    {
+        cerr << "unknown exception." << std::endl;
+    }
+    return -1;
+}
+/////////////////////////////////////////////////////////////////

+ 51 - 0
examples/GrpcDemo/GrpcServer/GrpcServer.h

@@ -0,0 +1,51 @@
+/**
+ * Tencent is pleased to support the open source community by making Tars available.
+ *
+ * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except 
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed 
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the 
+ * specific language governing permissions and limitations under the License.
+ */
+
+#ifndef _GrpcServer_H_
+#define _GrpcServer_H_
+
+#include <iostream>
+#include "servant/Application.h"
+#include "util/tc_grpc.h"
+
+using namespace tars;
+
+/**
+ *
+ **/
+class GrpcServer : public Application
+{
+public:
+    /**
+     *
+     **/
+    virtual ~GrpcServer() {};
+
+    /**
+     *
+     **/
+    virtual void initialize();
+
+    /**
+     *
+     **/
+    virtual void destroyApp();
+};
+
+extern GrpcServer g_app;
+
+////////////////////////////////////////////
+#endif

+ 63 - 0
examples/GrpcDemo/GrpcServer/config.conf

@@ -0,0 +1,63 @@
+<tars>
+  <application>
+    #proxy需要的配置
+    <client>
+        #地址
+        locator                     = tars.tarsregistry.QueryObj@tcp -h 127.0.0.1 -p 17890
+        #最大超时时间(毫秒)
+        sync-invoke-timeout          = 5000
+        #刷新端口时间间隔(毫秒)
+        refresh-endpoint-interval   = 10000
+        #模块间调用[可选]
+        stat                        = tars.tarsstat.StatObj
+        #发送队列长度
+        sendqueuelimit              = 100000
+        #异步回调队列个数限制
+        asyncqueuecap               = 100000
+        #网络异步回调线程个数
+        asyncthread                 = 3
+        #网络线程个数
+        netthread                   = 3
+        #合并回调线程和网络线程(以网络线程个数为准)
+        mergenetasync               = 0
+        #模块名称
+        modulename                  = TestApp.GrpcServer
+    </client>
+
+    #定义所有绑定的IP
+    <server>
+        closecout = 0
+        #应用名称
+        app      = TestApp
+        #服务名称
+        server   = GrpcServer
+        #服务的数据目录,可执行文件,配置文件等
+        basepath = ./
+        datapath = ./
+        #日志路径
+        logpath  = ./
+        mergenetimp = 0
+        #本地管理套接字[可选]
+        local   = tcp -h 127.0.0.1 -p 15001 -t 10000
+
+        #本地node的ip:port:timeout[可选]
+#        node    = ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000
+        #配置中心的地址[可选]
+#        config  = tars.tarsconfig.ConfigObj
+        #配置中心的地址[可选]
+#	notify  = tars.tarsconfig.NotifyObj
+        #远程LogServer[可选]
+#       log     = tars.tarslog.LogObj
+
+        <GrpcAdapter>
+            endpoint = tcp -h 0.0.0.0 -p 8082 -t 10000
+            allow	 =
+            maxconns = 4096
+            threads	 = 5
+            servant = TestApp.GrpcServer.GrpcObj
+            queuecap = 1000000
+            protocol = not-tars
+        </GrpcAdapter>
+    </server>
+  </application>
+</tars>

+ 38 - 0
examples/GrpcDemo/GrpcServer/helloworld.proto

@@ -0,0 +1,38 @@
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "io.grpc.examples.helloworld";
+option java_outer_classname = "HelloWorldProto";
+option objc_class_prefix = "HLW";
+
+package helloworld;
+
+// The greeting service definition.
+service Greeter {
+  // Sends a greeting
+  rpc SayHello (HelloRequest) returns (HelloReply) {}
+}
+
+// The request message containing the user's name.
+message HelloRequest {
+  string name = 1;
+}
+
+// The response message containing the greetings
+message HelloReply {
+  string message = 1;
+}

+ 66 - 0
servant/libservant/AppProtocol.cpp

@@ -16,6 +16,7 @@
 
 #include "util/tc_epoll_server.h"
 #include "util/tc_http.h"
+#include "util/tc_grpc.h"
 #include "servant/AppProtocol.h"
 #include "servant/Transceiver.h"
 #include "servant/AdapterProxy.h"
@@ -166,6 +167,71 @@ TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::http2Response(TC_NetWorkBuffer &in,
 	return flag;
 }
 
+
+// ENCODE function, called by network thread
+vector<char> ProxyProtocol::grpcRequest(RequestPacket& request, Transceiver *trans)
+{
+    TC_GrpcClient* session = (TC_GrpcClient*)trans->getSendBuffer()->getContextData();
+	if(session == NULL)
+	{
+		session = new TC_GrpcClient();
+
+		trans->getSendBuffer()->setContextData(session, [=](TC_NetWorkBuffer*nb){ delete session; });
+
+		session->settings(3000);
+	}
+
+    if (session->buffer().size() != 0) {
+        //直接发送裸得应答数据,业务层一般不直接使用,仅仅tcp支持
+		trans->getSendBuffer()->addBuffer(session->buffer());
+		auto data = trans->getSendBuffer()->getBufferPointer();
+		int iRet = trans->send(data.first, (uint32_t) data.second, 0);
+		trans->getSendBuffer()->moveHeader(iRet);
+        session->buffer().clear();
+    }
+
+	shared_ptr<TC_HttpRequest> *data = (shared_ptr<TC_HttpRequest>*)request.sBuffer.data();
+
+	request.iRequestId = session->submit(*(*data).get());
+
+	//这里把智能指针释放一次
+	(*data).reset();
+
+	if (request.iRequestId < 0)
+	{
+		TLOGERROR("http2Request::Fatal submit error: " << session->getErrMsg() << endl);
+		return vector<char>();
+	}
+
+//	cout << "http2Request id:" << request.iRequestId << endl;
+
+	vector<char> out;
+	session->swap(out);
+
+	return out;
+}
+
+TC_NetWorkBuffer::PACKET_TYPE ProxyProtocol::grpcResponse(TC_NetWorkBuffer &in, ResponsePacket& rsp)
+{
+	TC_GrpcClient* session = (TC_GrpcClient*)((Transceiver*)(in.getConnection()))->getSendBuffer()->getContextData();
+
+	pair<int, shared_ptr<TC_HttpResponse>> out;
+	TC_NetWorkBuffer::PACKET_TYPE flag = session->parseResponse(in, out);
+
+	if(flag == TC_NetWorkBuffer::PACKET_FULL)
+	{
+		rsp.iRequestId  = out.first;
+
+		rsp.sBuffer.resize(sizeof(shared_ptr<TC_HttpResponse>));
+
+		//这里智能指针有一次+1, 后面要自己reset掉
+		*(shared_ptr<TC_HttpResponse>*)rsp.sBuffer.data() = out.second;
+	}
+
+	return flag;
+}
+
+
 #endif
 }
 

+ 5 - 0
servant/libservant/ServantProxy.cpp

@@ -438,6 +438,11 @@ void ServantProxy::tars_set_protocol(SERVANT_PROTOCOL protocol, int connectionSe
 			proto.responseFunc  = ProxyProtocol::http2Response;
             connectionSerial    = 0;
 			break;
+        case PROTOCOL_GRPC:
+			proto.requestFunc   = ProxyProtocol::grpcRequest;
+			proto.responseFunc  = ProxyProtocol::grpcResponse;
+            connectionSerial    = 0;
+			break;
 #endif
 		case PROTOCOL_TARS:
 		default:

+ 6 - 0
servant/servant/AppProtocol.h

@@ -152,6 +152,12 @@ public:
 
     // DECODE function, called by network thread
     static TC_NetWorkBuffer::PACKET_TYPE http2Response(TC_NetWorkBuffer &in, ResponsePacket& done);
+
+        // ENCODE function, called by network thread
+    static vector<char> grpcRequest(tars::RequestPacket& request, Transceiver *);
+
+    // DECODE function, called by network thread
+    static TC_NetWorkBuffer::PACKET_TYPE grpcResponse(TC_NetWorkBuffer &in, ResponsePacket& done);
 #endif
 
     /**

+ 1 - 0
servant/servant/ServantProxy.h

@@ -452,6 +452,7 @@ public:
     	PROTOCOL_TARS,              //默认tars服务的协议
     	PROTOCOL_HTTP1,             //http协议
 	    PROTOCOL_HTTP2,             //http2协议
+        PROTOCOL_GRPC,              //grpc协议
     };
 
     /**

+ 10 - 0
util/include/util/tc_epoll_server.h

@@ -1396,6 +1396,16 @@ public:
          */
         int sendBufferDirect(const std::string& buff);
 
+        /**
+         * 直接发送裸得应答数据,业务层一般不直接使用,仅仅tcp支持
+         * send naked response data
+         * @param buffer
+         * @return int, -1:发送出错, 0:无数据, 1:发送完毕, 2:还有数据
+         * @return int, -1: sending error, 0: no data, 1: send completely, 2: data retains
+         * @return
+         */
+        int sendBufferDirect(const std::vector<char>& buff);
+
 	    /**
 		 * 关闭连接
 		 * Close the connection

+ 114 - 0
util/include/util/tc_grpc.h

@@ -0,0 +1,114 @@
+#ifndef __TC_GRPC_PROTOCOL_H__
+#define __TC_GRPC_PROTOCOL_H__
+
+#include <unordered_map>
+#include "util/tc_network_buffer.h"
+#include "util/tc_spin_lock.h"
+#include "util/tc_http2.h"
+#include "nghttp2/nghttp2.h"
+#include "util/tc_epoll_server.h"
+
+namespace tars
+{
+void addGrpcPrefix(string& body, bool compressed);
+bool RemoveGrpcPrefix(string& body, bool* compressed);
+
+class TC_GrpcServer : public TC_Http2Server
+{
+public:
+
+	/**
+	 * constructor
+	 */
+    TC_GrpcServer();
+
+    /**
+     * deconstructor
+     */
+    ~TC_GrpcServer();
+
+    
+	/**
+	 *
+	 * @param context
+	 * @param out
+	 * @return
+	 */
+	int encodeResponse(const shared_ptr<Http2Context> &context, std::string gStatus, vector<char> &out);
+
+	/**
+	 *
+	 * @param context
+	 * @param status
+	 * @param body
+	 * @return
+	 */
+    void packGrpcResponse(shared_ptr<TC_GrpcServer::Http2Context> &context, const int status, const string &body);
+
+    static shared_ptr<TC_GrpcServer> getHttp2(uint32_t uid);
+    static void addHttp2(uint32_t uid, const shared_ptr<TC_GrpcServer> &ptr);
+    static void delHttp2(uint32_t uid);
+    static TC_NetWorkBuffer::PACKET_TYPE parseGrpc(TC_NetWorkBuffer&in, vector<char> &out);
+
+protected:
+
+    static TC_SpinLock _mutex;
+    static unordered_map<int32_t, shared_ptr<TC_GrpcServer>> _http2;
+};
+
+/////////////////////////////////////////////////////////////////////////////////
+
+class TC_GrpcClient : public TC_Http2Client
+{
+public:
+
+	/**
+	 * constructor
+	 */
+    TC_GrpcClient();
+
+    /**
+     * deconstructor
+     */
+    ~TC_GrpcClient();
+
+
+	/**
+     * parse response
+     * @param in
+     */
+    TC_NetWorkBuffer::PACKET_TYPE parseResponse(TC_NetWorkBuffer &in, pair<int, shared_ptr<TC_HttpResponse>> &out);
+
+	//    int submit(const string &method, const string &path, const map<string, string> &header, const vector<char> &buff);
+	int submit(const TC_HttpRequest &request);
+		/**
+	 * @brief response
+	 */
+    std::unordered_map<int, shared_ptr<TC_HttpResponse>> &responses() { return _responses; }
+
+    /** 
+     * @brief response finished
+     */
+    std::unordered_map<int, shared_ptr<TC_HttpResponse>> &doneResponses() { return _doneResponses; }
+
+private:
+
+    /**
+     * 收到的响应
+	 * Responses received
+     */
+    std::unordered_map<int, shared_ptr<TC_HttpResponse>> _responses;
+
+    /**
+     * 收到的完整响应
+	 * Complete response received
+     */
+    std::unordered_map<int, shared_ptr<TC_HttpResponse>> _doneResponses;
+
+};
+
+  
+}
+
+
+#endif  //__TC_GRPC_PROTOCOL_H__

+ 8 - 0
util/include/util/tc_network_buffer.h

@@ -742,6 +742,14 @@ public:
      */
     static TC_NetWorkBuffer::PACKET_TYPE parseEcho(TC_NetWorkBuffer&in, vector<char> &out);
 
+    /**
+    * echo
+    * @param in
+    * @param out
+    * @return
+    */
+    static TC_NetWorkBuffer::PACKET_TYPE parseJson(TC_NetWorkBuffer&in, vector<char> &out);
+
 protected:
 
 	size_t getBuffers(char *buffer, size_t length) const;

+ 34 - 0
util/src/tc_epoll_server.cpp

@@ -1204,6 +1204,40 @@ int TC_EpollServer::Connection::sendBufferDirect(const std::string& buff)
     }
 }
 
+int TC_EpollServer::Connection::sendBufferDirect(const std::vector<char>& buff)
+{
+    _pBindAdapter->increaseSendBufferSize();
+    
+    if(getBindAdapter()->getEndpoint().isTcp())
+    {
+#if TAF_SSL
+        if (getBindAdapter()->getEndpoint().isSSL())
+        {
+            //assert(_openssl->isHandshaked());
+            
+            int ret = _openssl->write(&buff[0], buff.size(), _sendBuffer);
+            if (ret != 0)
+            {
+                _pBindAdapter->getEpollServer()->error("[TC_EpollServer::Connection] send direct error! " + TC_Common::tostr(ret));
+                return -1; // should not happen
+            }
+    
+        }
+        else
+#endif
+        {
+            _sendBuffer.addBuffer(buff);
+        }
+
+        return sendBuffer();
+    }
+    else
+    {
+        _pBindAdapter->getEpollServer()->error("[TC_EpollServer::Connection] send direct not support udp! ");
+        return -2;
+    }
+}
+
 int TC_EpollServer::Connection::send(const shared_ptr<SendContext> &sc)
 {
 	assert(sc);

+ 598 - 0
util/src/tc_grpc.cpp

@@ -0,0 +1,598 @@
+/**
+ * Tencent is pleased to support the open source community by making Tars available.
+ *
+ * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
+ *
+ * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except 
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * https://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed 
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the 
+ * specific language governing permissions and limitations under the License.
+ */
+#include <string>
+#include <iostream>
+#include "util/tc_grpc.h"
+
+namespace tars
+{
+
+TC_SpinLock TC_GrpcServer::_mutex;
+
+unordered_map<int32_t, shared_ptr<TC_GrpcServer>> TC_GrpcServer::_http2;
+
+void addGrpcPrefix(string& body, bool compressed)
+{
+    char buf[5];
+    buf[0] = (compressed ? 1: 0);
+    *(uint32_t*)(buf+1) = htonl(body.size());
+    string message;
+    message.append(buf, sizeof(buf));
+    message.append(body);
+    body.swap(message);
+}
+
+bool RemoveGrpcPrefix(string& body, bool* compressed)
+{
+    if (body.empty()) {
+        *compressed = false;
+        return true;
+    }
+
+    const size_t sz = body.size();
+    if (sz <(size_t)5) {
+        return false;
+    }
+
+    char buf[5];
+    body.copy(buf, 5, 0);
+    *compressed = buf[0];
+    const size_t message_len = ntohl(*(uint32_t*)(buf+1));
+    body.erase(0, 5);
+
+    return (message_len + 5 == sz);
+}
+
+namespace server
+{
+
+static ssize_t str_read_callback(nghttp2_session *session, int32_t stream_id,
+                                  uint8_t *buf, size_t length,
+                                  uint32_t *data_flags,
+                                  nghttp2_data_source *source,
+                                  void *user_data) 
+{
+    TC_Http2::DataPack *dataPack = (TC_Http2::DataPack*)(source->ptr);
+	if(dataPack->_readPos == dataPack->_length)
+	{
+		*data_flags |= NGHTTP2_DATA_FLAG_EOF;
+		return 0;
+	}
+    size_t size = std::min(dataPack->_length - dataPack->_readPos, length);
+
+    memcpy(buf, dataPack->_dataBuf + dataPack->_readPos, size);
+
+    dataPack->_readPos += size;
+	
+    if(dataPack->_readPos == dataPack->_length)
+    {
+        *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+    }
+  
+    return size;
+}
+
+static ssize_t send_callback(nghttp2_session *session, const uint8_t *data,
+                             size_t length, int flags, void *user_data) 
+{
+    TC_GrpcServer *ptr = (TC_GrpcServer*)user_data;
+	ptr->insertBuff((const char*)data, length);
+
+    return (ssize_t)length;
+}
+
+static int on_header_callback(nghttp2_session *session,
+                              const nghttp2_frame *frame, const uint8_t *name,
+                              size_t namelen, const uint8_t *value,
+                              size_t valuelen, uint8_t flags, void *user_data)
+{
+    TC_GrpcServer *ptr = (TC_GrpcServer*)user_data;
+    ptr->onHeaderCallback(frame->hd.stream_id, string((char*)name, namelen), string((char*)value, valuelen));
+    return 0;
+}
+
+static int on_begin_headers_callback(nghttp2_session *session,
+                                     const nghttp2_frame *frame,
+                                     void *user_data)
+{
+    TC_GrpcServer *ptr = (TC_GrpcServer*)user_data;
+
+    if (frame->hd.type != NGHTTP2_HEADERS || frame->headers.cat != NGHTTP2_HCAT_REQUEST) {
+        return 0;
+    }
+
+    ptr->onHeaderCallback(frame->hd.stream_id);
+    return 0;
+}
+
+static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
+{
+    TC_GrpcServer *ptr = (TC_GrpcServer*)user_data;
+
+    switch (frame->hd.type)
+    {
+    case NGHTTP2_DATA:
+    case NGHTTP2_HEADERS:
+        /* Check that the client request has finished */
+        if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
+        {
+            ptr->onFrameRecvCallback(frame->hd.stream_id);
+            return 0;
+        }
+        break;
+    case NGHTTP2_WINDOW_UPDATE:
+        nghttp2_session_send(session);
+        break;
+    case NGHTTP2_PING:
+        nghttp2_session_send(session);
+        break;
+    default:
+        break;
+    }
+    return 0;
+}
+
+static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
+                                       int32_t stream_id, const uint8_t *data,
+                                       size_t len, void *user_data)
+{
+    TC_GrpcServer *ptr = (TC_GrpcServer*)user_data;
+
+    ptr->onDataChunkRecvCallback(stream_id, (const char*)data, len);
+
+    return 0;
+}
+
+static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data)
+{
+    TC_GrpcServer *ptr = (TC_GrpcServer*)user_data;
+    ptr->onStreamCloseCallback(stream_id);
+
+    return 0;
+}
+}   //namespace server
+
+
+TC_GrpcServer::TC_GrpcServer()
+{
+    nghttp2_session_callbacks *callbacks;
+
+    nghttp2_session_callbacks_new(&callbacks);
+
+    nghttp2_session_callbacks_set_send_callback(callbacks, server::send_callback);
+
+    nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, server::on_frame_recv_callback);
+
+    nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, server::on_data_chunk_recv_callback);
+
+    nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, server::on_stream_close_callback);
+
+    nghttp2_session_callbacks_set_on_header_callback(callbacks, server::on_header_callback);
+
+    nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, server::on_begin_headers_callback);
+
+    nghttp2_session_server_new(&_session, callbacks, ((void*)this));
+
+    nghttp2_session_callbacks_del(callbacks);
+}
+
+TC_GrpcServer::~TC_GrpcServer()
+{
+}
+
+
+int TC_GrpcServer::encodeResponse(const shared_ptr<TC_GrpcServer::Http2Context> &context, std::string gStatus, vector<char> &out)
+{
+	string sstatus = TC_Common::tostr(context->response.getStatus());
+
+	const char* strstatus = ":status";
+
+	nghttp2_nv *hdrs    = new nghttp2_nv[context->response.getHeaders().size() + 1];
+	hdrs[0].flags       = NGHTTP2_NV_FLAG_NONE;
+	hdrs[0].name        = (uint8_t*)strstatus;
+	hdrs[0].namelen     = 7;
+	hdrs[0].value       = (uint8_t*)sstatus.c_str();
+	hdrs[0].valuelen    = sstatus.size();
+
+	TC_Http::http_header_type::const_iterator it = context->response.getHeaders().begin();
+	for (int n = 1; it != context->response.getHeaders().end(); n++, it++)
+	{
+		hdrs[n].flags       = NGHTTP2_NV_FLAG_NONE;
+		hdrs[n].name        = (uint8_t*)it->first.c_str();
+		hdrs[n].namelen     = it->first.size();
+		hdrs[n].value       = (uint8_t*)it->second.c_str();
+		hdrs[n].valuelen    = it->second.size();
+	}
+
+	const string &data = context->response.getContent();
+	DataPack dataPack(data.c_str(), data.size());
+
+	nghttp2_data_provider data_prd;
+	data_prd.source.ptr     = (void*)&dataPack;
+	data_prd.read_callback  = server::str_read_callback;
+	{
+		TC_LockT<TC_ThreadMutex> lock(_nghttpLock);
+
+		_err = nghttp2_submit_headers(_session, NGHTTP2_FLAG_NONE, context->reqId, nullptr,
+										hdrs, context->response.getHeaders().size()+1, nullptr);
+		if (_err != 0 ) {
+			delete [] hdrs;
+			return _err;
+		}
+
+
+		_err = nghttp2_submit_data(_session, NGHTTP2_FLAG_NONE, context->reqId, &data_prd);
+		if (_err != 0 ) {
+			delete [] hdrs;
+			return _err;
+		}
+
+		nghttp2_session_send(_session);
+
+		const char* grpcStatus = "grpc-status";
+		const char* gstatus = gStatus.c_str();
+
+		nghttp2_nv hdr[1];
+		hdr[0].flags       = NGHTTP2_NV_FLAG_NONE;
+		hdr[0].name        = (uint8_t*)grpcStatus;
+		hdr[0].namelen     = 11;
+		hdr[0].value       = (uint8_t*)gstatus;
+		hdr[0].valuelen    = 1;
+
+		_err = nghttp2_submit_headers(_session, NGHTTP2_FLAG_END_STREAM, context->reqId, nullptr,
+										hdr, 1, nullptr);
+		if (_err != 0 ) {
+			delete [] hdrs;
+			return _err;
+		}
+		
+		
+		while (nghttp2_session_want_write(_session)) {
+			_err = nghttp2_session_send(_session);
+			if (_err != 0) {
+				delete [] hdrs;
+
+				return _err;
+			}
+		}
+
+		this->swap(out);
+	}
+
+	delete [] hdrs;
+
+	return 0;
+}
+
+void TC_GrpcServer::packGrpcResponse(shared_ptr<TC_GrpcServer::Http2Context> &context, const int status, const string &body)
+{
+    context->response.setHeader("content-type", "application/grpc");
+    context->response.setResponse(status, "OK", body);
+}
+
+shared_ptr<TC_GrpcServer> TC_GrpcServer::getHttp2(uint32_t uid)
+{
+    TC_LockT<TC_SpinLock> lock(_mutex);
+
+    auto it = _http2.find(uid);
+
+    if(it != _http2.end())
+    {
+        return it->second;
+    }
+    return NULL;
+}
+
+void TC_GrpcServer::addHttp2(uint32_t uid, const shared_ptr<TC_GrpcServer> &ptr)
+{
+    TC_LockT<TC_SpinLock> lock(_mutex);
+
+    _http2[uid] = ptr;
+}
+
+void TC_GrpcServer::delHttp2(uint32_t uid)
+{
+    TC_LockT<TC_SpinLock> lock(_mutex);
+
+    auto it = _http2.find(uid);
+
+    if(it != _http2.end())
+    {
+        _http2.erase(it);
+    }
+}
+
+TC_NetWorkBuffer::PACKET_TYPE TC_GrpcServer::parseGrpc(TC_NetWorkBuffer&in, vector<char> &out)
+{
+    TC_GrpcServer*sessionPtr = (TC_GrpcServer*)(in.getContextData());
+    TC_EpollServer::Connection *connection = (TC_EpollServer::Connection *)in.getConnection();
+
+    if(sessionPtr == NULL)
+    {
+    	shared_ptr<TC_GrpcServer> session(new TC_GrpcServer());
+	    in.setContextData(session.get());
+	    session->settings(3000);
+        addHttp2(connection->getId(), session);
+
+	    sessionPtr = session.get();
+    }
+
+    if (sessionPtr->buffer().size() != 0) {
+        //直接发送裸得应答数据,业务层一般不直接使用,仅仅tcp支持
+        connection->sendBufferDirect(sessionPtr->buffer());
+        sessionPtr->buffer().clear();
+    }
+
+    std::string inStr = in.getBuffersString();
+
+    auto ret = sessionPtr->parse(in, out);
+    
+    if (sessionPtr->buffer().size() != 0) {
+        //直接发送裸得应答数据,业务层一般不直接使用,仅仅tcp支持
+        connection->sendBufferDirect(sessionPtr->buffer());
+        sessionPtr->buffer().clear();
+    }
+
+	return ret;
+}
+
+///////////////////////////////////////////////////////////////////////////////////////
+
+namespace client
+{
+static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data)
+{
+    TC_GrpcClient* nghttp2 = (TC_GrpcClient* )user_data;
+    nghttp2->insertBuff((const char*)data, length);
+
+    return length;
+}
+
+static int on_begin_headers_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data)
+{
+    TC_GrpcClient* nghttp2 = (TC_GrpcClient* )user_data;
+
+    if (frame->hd.type == NGHTTP2_HEADERS)
+    {
+        if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
+        {
+            nghttp2->responses()[frame->hd.stream_id] = std::make_shared<TC_HttpResponse>();
+        }
+    }
+
+    return 0;
+}
+
+static int on_header_callback(nghttp2_session* session, const nghttp2_frame* frame,
+                         const uint8_t* name, size_t namelen,
+                         const uint8_t* value, size_t valuelen,
+                         uint8_t flags, void* user_data)
+{
+    TC_GrpcClient* nghttp2 = (TC_GrpcClient* )user_data;
+
+    int streamId = frame->hd.stream_id;
+    auto it = nghttp2->responses().find(streamId);
+    if (it == nghttp2->responses().end())
+    {
+        return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+
+    std::string n((const char*)name, namelen);
+    std::string v((const char*)value, valuelen);
+
+    it->second->setHeader(n, v);
+
+    return 0;
+}
+
+static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data)
+{
+    TC_GrpcClient* nghttp2 = (TC_GrpcClient* )user_data;
+
+    int streamId = frame->hd.stream_id;
+    if (streamId == 0)
+        return 0;
+
+    auto it = nghttp2->responses().find(streamId);
+    if (it == nghttp2->responses().end())
+    {
+        return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+
+   
+    switch (frame->hd.type)
+    {
+        case NGHTTP2_HEADERS:
+            if (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS)
+            {
+            	;
+            }
+            return 0;
+        case NGHTTP2_WINDOW_UPDATE:
+            nghttp2_session_send(session);
+            break;
+        case NGHTTP2_PING:
+            nghttp2_session_send(session);
+            break;
+        default:
+            break;
+    }
+
+    return 0;
+}
+
+static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data)
+{
+    TC_GrpcClient* nghttp2 = (TC_GrpcClient* )user_data;
+
+    auto it = nghttp2->responses().find(stream_id);
+    if (it == nghttp2->responses().end())
+    {
+        return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+
+    it->second->appendContent((const char* )data, len);
+    return 0;
+}
+
+static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data)
+{
+    TC_GrpcClient* nghttp2 = (TC_GrpcClient* )user_data;
+
+    auto it = nghttp2->responses().find(stream_id);
+    if (it == nghttp2->responses().end())
+    {
+        return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+
+	nghttp2->doneResponses()[stream_id] = it->second;
+	nghttp2->responses().erase(it);
+
+    return 0;
+}
+
+}   //namespace client
+
+#define MAKE_STRING_NV(NAME, VALUE) {(uint8_t*)(NAME.c_str()), (uint8_t*)(VALUE.c_str()), NAME.size(), VALUE.size(), NGHTTP2_NV_FLAG_NONE};
+
+TC_GrpcClient::TC_GrpcClient()
+{
+    nghttp2_session_callbacks* callbacks;
+    nghttp2_session_callbacks_new(&callbacks);
+    nghttp2_session_callbacks_set_send_callback(callbacks, client::send_callback);
+    nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, client::on_begin_headers_callback);
+    nghttp2_session_callbacks_set_on_header_callback(callbacks, client::on_header_callback);
+    nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, client::on_frame_recv_callback);
+    nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, client::on_data_chunk_recv_callback);
+    nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, client::on_stream_close_callback);
+
+    nghttp2_session_client_new(&_session, callbacks, this);
+
+    nghttp2_session_callbacks_del(callbacks);
+}
+
+TC_GrpcClient::~TC_GrpcClient()
+{
+}
+
+
+int TC_GrpcClient::submit(const TC_HttpRequest &request)
+{
+	std::vector<nghttp2_nv> nva;
+
+	//注意MAKE_STRING_NV不能用临时变量, 这里出过几次问题了!
+	const std::string smethod(":method");
+	const std::string vmethod(request.getMethod());
+	nghttp2_nv nv1 = MAKE_STRING_NV(smethod, vmethod);
+	nva.push_back(nv1);
+
+	// const std::string spath(":path");
+	// const std::string vpath(request.getRequest());
+	// nghttp2_nv nv2 = MAKE_STRING_NV(spath, vpath);
+	// nva.push_back(nv2);
+
+	const std::string sauthority(":authority");
+    string sPort = request.getURL().isDefaultPort() ? "" : ":" + request.getURL().getPort();
+	const std::string vauthority(request.getURL().getDomain() + sPort);
+	nghttp2_nv nv3 = MAKE_STRING_NV(sauthority, vauthority);
+	nva.push_back(nv3);
+
+	const std::string sscheme(":scheme");
+	const std::string vscheme(request.getURL().getScheme());
+	nghttp2_nv nv4 = MAKE_STRING_NV(sscheme, vscheme);
+	nva.push_back(nv4);
+
+	const TC_Http::http_header_type &header = request.getHeaders();
+	for (auto it = header.begin(); it != header.end(); ++ it)
+	{
+		if(TC_Port::strcasecmp(it->first.c_str(), "Content-Length") == 0 || it->second.empty())
+			continue;
+
+		nghttp2_nv nv = MAKE_STRING_NV(it->first, it->second);
+		nva.push_back(nv);
+	}
+
+	nghttp2_data_provider* pData = NULL;
+	nghttp2_data_provider data;
+
+	DataPack dataPack(request.getContent().c_str(), request.getContent().size());
+
+	if (!request.getContent().empty())
+	{
+		pData = &data;
+		data.source.ptr = (void*)&dataPack;
+		data.read_callback = server::str_read_callback;
+	}
+
+	_err = nghttp2_submit_request(_session,
+	                              NULL,
+	                              nva.data(),
+	                              nva.size(),
+	                              pData,
+	                              NULL);
+	if (_err < 0)
+	{
+		return _err;
+	}
+
+	int sid = _err;
+
+	_err = nghttp2_session_send(_session);
+	if (_err != 0) {
+		return _err;
+	}
+
+	return sid;
+}
+
+
+TC_NetWorkBuffer::PACKET_TYPE TC_GrpcClient::parseResponse(TC_NetWorkBuffer &in, pair<int, shared_ptr<TC_HttpResponse>> &out)
+{
+	if(_doneResponses.empty() && in.empty())
+	{
+		return TC_NetWorkBuffer::PACKET_LESS;
+	}
+
+	if(!in.empty())
+	{
+		//merge to one buffer
+		in.mergeBuffers();
+
+		pair<const char *, size_t> buffer = in.getBufferPointer();
+
+		int readlen = nghttp2_session_mem_recv(_session, (const uint8_t *) buffer.first, buffer.second);
+		if (readlen < 0) {
+			return TC_NetWorkBuffer::PACKET_ERR;
+		}
+
+		in.moveHeader(readlen);
+	}
+
+	if(_doneResponses.empty())
+	{
+		return TC_NetWorkBuffer::PACKET_LESS;
+	}
+
+	auto it = _doneResponses.begin();
+	out.first = it->first;
+	out.second = it->second;
+
+	_doneResponses.erase(it);
+	return TC_NetWorkBuffer::PACKET_FULL;
+
+}
+
+}   //namespace tars

+ 14 - 0
util/src/tc_network_buffer.cpp

@@ -303,4 +303,18 @@ TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseEcho(TC_NetWorkBuffer&in, v
     return TC_NetWorkBuffer::PACKET_LESS;             //表示收到的包不完全
 }
 
+TC_NetWorkBuffer::PACKET_TYPE TC_NetWorkBuffer::parseJson(TC_NetWorkBuffer&in, vector<char> &out)
+{
+    auto jsonEnd = in.find("}", 1);
+
+    if (jsonEnd != in.end())
+    {
+        out = in.getBuffers();
+        in.clearBuffers();
+        return TC_NetWorkBuffer::PACKET_FULL;   //返回1表示收到的包已经完全
+    }
+
+    return TC_NetWorkBuffer::PACKET_ERR;        //返回-1表示收到包协议错误,框架会自动关闭当前连接
+}
+
 }