net_server.cc 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. #include <iostream>
  2. #include <string>
  3. #include <sstream>
  4. #include "net_server.h"
  5. #include "log.h"
  6. #include "poll_thread.h"
  7. #include "task_request.h"
  8. #include "global.h"
  9. #include "transaction_task.h"
  10. #include "task_request.h"
  11. #include "transaction_group.h"
  12. extern CTransactionGroup* FullDBGroup;
  13. extern CTransactionGroup* HotDBGroup;
  14. CNetServerProcess::CNetServerProcess(PollerBase * o) :
  15. CTaskDispatcher<CTaskRequest>(o),
  16. ownerThread(o),
  17. output(o)
  18. {
  19. }
  20. CNetServerProcess::~CNetServerProcess()
  21. {
  22. }
  23. void CNetServerProcess::TaskNotify(CTaskRequest * cur)
  24. {
  25. log4cplus_debug("async-conn: packet receiving.");
  26. //there is a race condition here:
  27. //curr may be deleted during process (in task->ReplyNotify())
  28. CTransactionGroup* group = NULL;
  29. CTaskRequest * request = cur;
  30. int level = 0;
  31. if(request == NULL)
  32. return;
  33. //TODO: Parsing input, adapting thread groups.
  34. level = request->get_db_layer_level();
  35. log4cplus_debug("async-conn: packet db layer:%d.", level);
  36. if(level == 3)
  37. group = FullDBGroup;
  38. else if(level == 2)
  39. group = HotDBGroup;
  40. else
  41. {
  42. char err[260] = {0};
  43. sprintf(err, "layer level error:%d.", level);
  44. request->setResult(err);
  45. request->ReplyNotify();
  46. }
  47. if(group->Push(request) != 0)
  48. {
  49. request->setResult("transaction insert queue failed.");
  50. request->ReplyNotify();
  51. }
  52. return ;
  53. }
  54. void CNetServerProcess::ReplyNotify(CTaskRequest* task)
  55. {
  56. m_mutex.lock();
  57. task->ReplyNotify();
  58. m_mutex.unlock();
  59. }