semaphore_timewait.cpp 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. #include <pthread.h>
  2. #include <errno.h>
  3. #include <unistd.h>
  4. #include <list>
  5. #include <semaphore.h>
  6. #include <iostream>
  7. class Task
  8. {
  9. public:
  10. Task(int taskID)
  11. {
  12. this->taskID = taskID;
  13. }
  14. void doTask()
  15. {
  16. std::cout << "handle a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;
  17. }
  18. private:
  19. int taskID;
  20. };
  21. pthread_mutex_t mymutex;
  22. std::list<Task*> tasks;
  23. sem_t mysemaphore;
  24. void* consumer_thread(void* param)
  25. {
  26. Task* pTask = NULL;
  27. while (true)
  28. {
  29. struct timespec ts;
  30. ts.tv_sec = 3;
  31. ts.tv_nsec = 0;
  32. if (sem_timewait(&mysemaphore, &ts) != 0)
  33. {
  34. if (errno == ETIMEDOUT)
  35. {
  36. std::cout << "ETIMEOUT" << std::endl;
  37. }
  38. continue;
  39. }
  40. if (tasks.empty())
  41. continue;
  42. pthread_mutex_lock(&mymutex);
  43. pTask = tasks.front();
  44. tasks.pop_front();
  45. pthread_mutex_unlock(&mymutex);
  46. pTask->doTask();
  47. delete pTask;
  48. }
  49. return NULL;
  50. }
  51. void* producer_thread(void* param)
  52. {
  53. int taskID = 0;
  54. Task* pTask = NULL;
  55. while (true)
  56. {
  57. //pTask = new Task(taskID);
  58. pthread_mutex_lock(&mymutex);
  59. //tasks.push_back(pTask);
  60. //std::cout << "produce a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;
  61. pthread_mutex_unlock(&mymutex);
  62. //释放信号量,通知消费者线程
  63. sem_post(&mysemaphore);
  64. taskID ++;
  65. //休眠1秒
  66. sleep(1);
  67. }
  68. return NULL;
  69. }
  70. int main()
  71. {
  72. pthread_mutex_init(&mymutex, NULL);
  73. //初始信号量资源计数为0
  74. sem_init(&mysemaphore, 0, 0);
  75. //创建5个消费者线程
  76. pthread_t consumerThreadID[5];
  77. for (int i = 0; i < 5; ++i)
  78. {
  79. pthread_create(&consumerThreadID[i], NULL, consumer_thread, NULL);
  80. }
  81. //创建一个生产者线程
  82. //pthread_t producerThreadID;
  83. //pthread_create(&producerThreadID, NULL, producer_thread, NULL);
  84. //pthread_join(producerThreadID, NULL);
  85. for (int i = 0; i < 5; ++i)
  86. {
  87. pthread_join(consumerThreadID[i], NULL);
  88. }
  89. sem_destroy(&mysemaphore);
  90. pthread_mutex_destroy(&mymutex);
  91. return 0;
  92. }