|
@@ -20,6 +20,12 @@
|
|
|
#include "brpc/errno.pb.h"
|
|
|
#include "brpc/policy/auto_concurrency_limiter.h"
|
|
|
|
|
|
+namespace bthread {
|
|
|
+
|
|
|
+DECLARE_int32(bthread_concurrency);
|
|
|
+
|
|
|
+} // namespace bthread
|
|
|
+
|
|
|
namespace brpc {
|
|
|
namespace policy {
|
|
|
|
|
@@ -183,7 +189,7 @@ bool AutoConcurrencyLimiter::AddSample(int error_code,
|
|
|
UpdateMaxConcurrency(sampling_time_us);
|
|
|
} else {
|
|
|
// All request failed
|
|
|
- _max_concurrency /= 2;
|
|
|
+ AdjustMaxConcurrency(_max_concurrency / 2);
|
|
|
}
|
|
|
ResetSampleWindow(sampling_time_us);
|
|
|
return true;
|
|
@@ -216,6 +222,13 @@ void AutoConcurrencyLimiter::UpdateQps(double qps) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+void AutoConcurrencyLimiter::AdjustMaxConcurrency(int next_max_concurrency) {
|
|
|
+ next_max_concurrency = std::max(bthread::FLAGS_bthread_concurrency, next_max_concurrency);
|
|
|
+ if (next_max_concurrency != _max_concurrency) {
|
|
|
+ _max_concurrency = next_max_concurrency;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
|
|
|
int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
|
|
|
double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
|
|
@@ -247,9 +260,7 @@ void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
|
|
|
_min_latency_us * _ema_max_qps / 1000000 * (1 + _explore_ratio);
|
|
|
}
|
|
|
|
|
|
- if (next_max_concurrency != _max_concurrency) {
|
|
|
- _max_concurrency = next_max_concurrency;
|
|
|
- }
|
|
|
+ AdjustMaxConcurrency(next_max_concurrency);
|
|
|
}
|
|
|
|
|
|
} // namespace policy
|