test_tc_cas.cpp 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. #include "util/tc_common.h"
  2. #include "util/tc_cas_queue.h"
  3. #include "util/tc_thread.h"
  4. #include "util/tc_autoptr.h"
  5. #include "util/tc_spin_lock.h"
  6. #include "util/tc_thread_queue.h"
  7. #include "gtest/gtest.h"
  8. #include <mutex>
  9. #include <iostream>
  10. using namespace tars;
  11. class UtilCasTest : public testing::Test
  12. {
  13. public:
  14. //添加日志
  15. static void SetUpTestCase()
  16. {
  17. // cout<<"SetUpTestCase"<<endl;
  18. }
  19. static void TearDownTestCase()
  20. {
  21. // cout<<"TearDownTestCase"<<endl;
  22. }
  23. virtual void SetUp() //TEST跑之前会执行SetUp
  24. {
  25. // cout<<"SetUp"<<endl;
  26. }
  27. virtual void TearDown() //TEST跑完之后会执行TearDown
  28. {
  29. // cout<<"TearDown"<<endl;
  30. }
  31. };
  32. TEST_F(UtilCasTest, casLock)
  33. {
  34. size_t i=0;
  35. size_t count = 1000000;
  36. TC_SpinLock mutex;
  37. std::thread add([&]{
  38. int j = count;
  39. while(j--) {
  40. TC_LockT<TC_SpinLock> lock(mutex);
  41. ++i;
  42. }
  43. });
  44. std::thread del([&]{
  45. int j = count;
  46. while(j--) {
  47. TC_LockT<TC_SpinLock> lock(mutex);
  48. --i;
  49. }
  50. });
  51. add.join();
  52. del.join();
  53. ASSERT_TRUE(i == 0);
  54. }
  55. bool g_terminate = false;
  56. TC_CasQueue<int64_t> data_cas;
  57. TC_ThreadQueue<int64_t> data_queue;
  58. class WriteThread : public TC_Thread
  59. {
  60. protected:
  61. virtual void run()
  62. {
  63. for(int64_t i = 0; i < 100000; i++) {
  64. data_cas.push_back(i);
  65. }
  66. }
  67. };
  68. class ReadThread : public TC_Thread, public TC_HandleBase
  69. {
  70. public:
  71. ReadThread() {}
  72. virtual void run()
  73. {
  74. int64_t last = -1;
  75. while(!g_terminate) {
  76. int64_t i;
  77. if(data_cas.pop_front(i))
  78. {
  79. ASSERT_TRUE(i - last == 1);
  80. last = i;
  81. if(i == 100000-1)
  82. {
  83. g_terminate = true;
  84. }
  85. }
  86. }
  87. }
  88. };
  89. TEST_F(UtilCasTest, casQueue)
  90. {
  91. WriteThread w;
  92. ReadThread r;
  93. w.start();
  94. r.start();
  95. w.join();
  96. r.join();
  97. }
  98. template<typename Q>
  99. void start(int w, int r, int sleeps, Q &queue_data)
  100. {
  101. g_terminate = false;
  102. atomic<int64_t> writeIndex {0};
  103. vector<std::thread*> wthreads;
  104. for(int i = 0 ; i < w; i++) {
  105. wthreads.push_back(new std::thread([&]
  106. {
  107. while(!g_terminate)
  108. {
  109. queue_data.push_back(++writeIndex);
  110. }
  111. }));
  112. }
  113. int64_t readIndex = 0;
  114. vector<std::thread*> rthreads;
  115. for(int i = 0 ; i < r; i++)
  116. {
  117. rthreads.push_back(new std::thread([&]{
  118. do{
  119. int64_t j;
  120. if(queue_data.pop_front(j))
  121. {
  122. readIndex = j;
  123. }
  124. else
  125. {
  126. if(sleeps > 0)
  127. {
  128. TC_Common::msleep(sleeps);
  129. }
  130. }
  131. }while(!g_terminate);
  132. }));
  133. }
  134. std::thread print([&]{
  135. int64_t lastReadIndex = 0;
  136. int64_t lastWriteIndex = 0;
  137. while(!g_terminate) {
  138. cout << "size:" << data_queue.size() << ", write/read index:" << writeIndex/10000. << "/" << readIndex/10000. << ", " << (writeIndex-lastWriteIndex)/10000. << ", " << (readIndex - lastReadIndex)/10000. << endl;
  139. lastReadIndex = readIndex;
  140. lastWriteIndex = writeIndex;
  141. TC_Common::sleep(1);
  142. }
  143. });
  144. std::thread t([&]{
  145. TC_Common::sleep(10);
  146. g_terminate = true;
  147. });
  148. t.join();
  149. print.join();
  150. for(size_t i = 0; i < wthreads.size(); i++)
  151. {
  152. wthreads[i]->join();
  153. delete wthreads[i];
  154. }
  155. for(size_t i = 0; i < rthreads.size(); i++)
  156. {
  157. rthreads[i]->join();
  158. delete rthreads[i];
  159. }
  160. }
  161. int wThread = 1;
  162. int rThread = 4;
  163. int sleepms = 0;
  164. TEST_F(UtilCasTest, queueBatch)
  165. {
  166. cout << "threadQueueBatch-------------------------------------------" << endl;
  167. start<TC_ThreadQueue<int64_t>>(wThread, rThread, sleepms, data_queue);
  168. cout << "casQueueBatch-------------------------------------------" << endl;
  169. start<TC_CasQueue<int64_t>>(wThread, rThread, sleepms, data_cas);
  170. }