Skip to content

Commit

Permalink
add ConcurrentSampler (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
oathdruid authored Jun 27, 2024
1 parent 2131dc0 commit 48919c0
Show file tree
Hide file tree
Showing 17 changed files with 1,558 additions and 8 deletions.
6 changes: 3 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.14)

project(babylon VERSION 1.2.0)
project(babylon VERSION 1.2.1)

include(CTest) # for BUILD_TESTING option
include(CMakePackageConfigHelpers) # for write_basic_package_version_file
Expand All @@ -16,8 +16,8 @@ if(BUILD_DEPS)
)
FetchContent_Declare(
protobuf
URL "https://github.com/protocolbuffers/protobuf/archive/refs/tags/v25.3.tar.gz"
URL_HASH SHA256=d19643d265b978383352b3143f04c0641eea75a75235c111cc01a1350173180e
URL "https://github.com/protocolbuffers/protobuf/archive/refs/tags/v27.2.tar.gz"
URL_HASH SHA256=e4ff2aeb767da6f4f52485c2e72468960ddfe5262483879ef6ad552e52757a77
)
FetchContent_Declare(
boost
Expand Down
4 changes: 2 additions & 2 deletions MODULE.bazel
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module(
name = 'babylon',
version = '1.2.0',
version = '1.2.1',
compatibility_level = 1,
)

Expand Down Expand Up @@ -29,7 +29,7 @@ bazel_dep(name = 'googletest', version = '1.14.0', repo_name = 'com_google_googl

# --registry=file://%workspace%/registry
# protobuf 25.3 is not officially supported in BCR
single_version_override(module_name = 'protobuf', version = '27.1')
single_version_override(module_name = 'protobuf', version = '27.2')
# The latest rules_cuda release, 0.2.1, is too old and does not have the auto-detect feature
bazel_dep(name = 'rules_cuda', version = '0.2.2-dev', dev_dependency = True)

Expand Down
2 changes: 1 addition & 1 deletion example/use-arena-with-brpc/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ int main(int argc, char* argv[]) {
stub.Echo(controller, &request, response,
::google::protobuf::NewCallback(finish, response, controller));
auto use_us = ::butil::gettimeofday_us() - begin_us;

if (use_us < expected_us) {
usleep(expected_us - use_us);
}
Expand Down
6 changes: 4 additions & 2 deletions example/use-arena-with-brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ class EchoServiceImpl : public EchoService {
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
response->mutable_payload()->CopyFrom(request->payload());
LOG_EVERY_SECOND(INFO) << "Request SpaceUsedLong = " << request->SpaceUsedLong()
<< " Response SpaceUsedLong = " << response->SpaceUsedLong();
LOG_EVERY_SECOND(INFO) << "Request SpaceUsedLong = "
<< request->SpaceUsedLong()
<< " Response SpaceUsedLong = "
<< response->SpaceUsedLong();
}
};
} // namespace example
Expand Down
5 changes: 5 additions & 0 deletions example/use-counter-with-bvar/.bazelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Module registries passed to every Bazel command via --registry.
common --registry=https://bcr.bazel.build
common --registry=https://baidu.github.io/babylon/registry
common --registry=https://raw.githubusercontent.com/bazelboost/registry/main

# Build with the 'opt' compilation mode and compile C++ sources as C++17.
build --compilation_mode opt --cxxopt=-std=c++17
1 change: 1 addition & 0 deletions example/use-counter-with-bvar/.bazelversion
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
7.1.2
20 changes: 20 additions & 0 deletions example/use-counter-with-bvar/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Library target 'recorder': compiled from recorder.cpp and recorder.trick.cpp,
# with recorder.h as its header. Depends on brpc's bvar target and babylon's
# concurrent_counter target.
cc_library(
name = 'recorder',
srcs = ['recorder.cpp', 'recorder.trick.cpp'],
hdrs = ['recorder.h'],
# -fno-access-control disables C++ access checking for these sources
# (presumably so the implementation can reach non-public bvar members;
# TODO confirm against recorder.trick.cpp).
copts = ['-fno-access-control'],
deps = [
'@brpc//:bvar',
'@babylon//:concurrent_counter',
],
)

# Binary target 'example': built from example.cpp and linked against the local
# ':recorder' library, brpc and tcmalloc.
cc_binary(
name = 'example',
srcs = ['example.cpp'],
deps = [
':recorder',
'@brpc',
'@tcmalloc//tcmalloc',
],
)
4 changes: 4 additions & 0 deletions example/use-counter-with-bvar/MODULE.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Direct module dependencies of this example.
bazel_dep(name = 'babylon', version = '1.2.0')
bazel_dep(name = 'brpc', version = '1.9.0')
bazel_dep(name = 'tcmalloc', version = '0.0.0-20240411-5ed309d')
# Pin protobuf to a single version (25.3) across the whole dependency graph.
single_version_override(module_name = 'protobuf', version = '25.3')
71 changes: 71 additions & 0 deletions example/use-counter-with-bvar/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Use arena for brpc

brpc在调用用户的service前,需要在内部先完成Request和Response的实例构建以及前后对应的正反序列化。对应的代码实现在相应的Protocol中,默认方式下实例采用动态堆内存分配模式创建,对于比较复杂的结构,内存分配释放和Message结构的构建和析构可能也会带来可见的开销。

利用babylon::SwissMemoryResource可以将堆内存分配改用Arena机制分配到内存池上,降低内存分配的成本且提升局部性。进一步使用babylon::ReusableManager可以在保持内存池聚集分配的局部性的同时,进一步通过尽可能复用Message结构降低构建和析构的开销。

下面实现了一个集成了对应功能的brpc::Protocol,演示了相应功能的使用方式,并配套对应的例子来演示性能对比。相应的Protocol实际也在baidu内部广泛使用,预期可以支持在生产环境直接使用。

## 示例构成

- `:reusable_rpc_protocol`: 独立的brpc::Protocol实现,集成了内存复用和实例复用的功能
- `reusable_rpc_protocol.h`&`reusable_rpc_protocol.cpp`: 独立逻辑,和对应brpc版本无关
- `reusable_rpc_protocol.trick.cpp`: 拷贝自`src/brpc/policy/baidu_rpc_protocol.cpp`并进行简要修改
- `:client`&`:server`: 模拟比较复杂的Message演示性能对比

## 使用手册

```
#include "reusable_rpc_protocol.h"
// 向brpc注册新的protocol,默认使用
// protocol type = 72
// protocol name = "baidu_std_reuse"
if (0 != ::babylon::ReusableRPCProtocol::register_protocol()) {
// 注册protocol失败
}
// 返回失败很可能因为type冲突,可以更换type和name
if (0 != ::babylon::ReusableRPCProtocol::register_protocol(type, name)) {
// 注册protocol失败
}
// ReusableRPCProtocol协议和baidu_std相同,注册后,默认依然会走baidu_std
// 需要通过显式在option中指定来启用
::baidu::rpc::ServerOptions options;
options.enabled_protocols = "baidu_std_reuse";
// 下面正常注册服务,启动服务器即可
class SomeServiceImpl : public SomeService {
public:
virtual void some_method(::google::protobuf::RpcController* controller,
const SomeRequest* request,
SomeResponse* response,
::google::protobuf::Closure* done) {
... // 正常进行业务处理,对应的request和response已经改用内存池或者实例复用托管了
}
};
// 影响运行时的flag
// --babylon_rpc_full_reuse,是否启用实例重用,默认false
// --babylon_rpc_closure_cache_num,内存池和ReusableManager实例本身也会通过对象池复用,设置对象池大小
// --babylon_rpc_page_size,内存池单页大小,超过单页大小的申请会直接改为动态申请
// --babylon_rpc_page_cache_num,内存池页本身通过对象池复用,设置对象池大小
```

## 性能演示

CPU: AMD EPYC 7W83 64-Core Processor, taskset 0-3 core
QPS: 750

- 原始模式
- latency_percentiles: "[1923,2222,2944,3447]"
- process_cpu_usage : 1.489
- process_memory_resident : 59244544
- `--use_arena`模式
- latency_percentiles: "[1378,1607,2263,2716]"
- process_cpu_usage : 0.695
- process_memory_resident : 54255616
- `--use_arena`&`--babylon_rpc_full_reuse`模式
- latency_percentiles: "[1096,1256,1612,1938]"
- process_cpu_usage : 0.612
- process_memory_resident : 101576704
4 changes: 4 additions & 0 deletions example/use-counter-with-bvar/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
# Builds the 'example' target of this directory with Bazel.
# -e: abort on the first failing command; -x: echo each command before running it.
set -ex

bazel build example
145 changes: 145 additions & 0 deletions example/use-counter-with-bvar/example.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#include "brpc/server.h"
#include "gflags/gflags.h"
#include "recorder.h"

#include <atomic>
#include <chrono>
#include <cstdio>
#include <random>
#include <thread>

// Command-line flags (gflags) that configure the benchmark.

// Port handed to ::brpc::StartDummyServerAt in main().
DEFINE_int32(dummy_port, 8000, "TCP Port of this dummy server");
// Number of worker threads spawned by run_loop().
DEFINE_uint64(concurrency, 4, "Concurrent counting thread num");
// Number of variable instances created and exposed by run_loop().
DEFINE_uint64(vars, 10, "Counting bvar num");
// When true, main() benchmarks the ::babylon::Bvar* types instead of ::bvar::*.
DEFINE_bool(use_counter, false, "use babylon counter implemented bvar");
// Kind of variable to benchmark; main() dispatches on this string.
DEFINE_string(mode, "latency_recorder",
"adder/maxer/int_recorder/latency_recorder");

/// Feeds one sample into `var` via its `operator<<`.
///
/// Marked noinline so the call is not inlined into the benchmark loop
/// (presumably to keep the measured operation a real function call).
///
/// @param var   target that accepts `var << value`
/// @param value sample to record
template <typename T>
[[gnu::noinline]] void work(T& var, uint32_t value) {
  var << value;
}

/// Benchmarks recording into FLAGS_vars instances of S from FLAGS_concurrency
/// threads for 1000 seconds.
///
/// S must be default-constructible and provide `expose(::std::string)` plus a
/// member `var` usable as `var << value`. Each instance is exposed as
/// "test-<prefix>-<index>". Every worker repeatedly draws one sample from a
/// normal distribution (mean 600, stddev 100), records it 1000 times into each
/// instance, and reports the elapsed time divided by 1000 and by the instance
/// count into a ::bvar::LatencyRecorder named "test-<prefix>".
///
/// Returns immediately when FLAGS_vars is 0, since there is nothing to measure.
///
/// @param prefix name component used for all exposed variables
template <typename S>
void run_loop(::std::string prefix) {
  ::std::vector<::std::unique_ptr<S>> vec;
  vec.reserve(FLAGS_vars);
  for (size_t i = 0; i < FLAGS_vars; ++i) {
    auto s = ::std::make_unique<S>();
    s->expose("test-" + prefix + "-" + ::std::to_string(i));
    vec.emplace_back(::std::move(s));
  }
  if (vec.empty()) {
    // Nothing to benchmark; this also keeps the per-operation division
    // below from dividing by zero.
    return;
  }

  ::bvar::LatencyRecorder latency("test-" + prefix);
  // Tells the workers to leave their loop so they can be joined before the
  // locals they capture by reference go out of scope.
  ::std::atomic<bool> stop {false};
  ::std::vector<::std::thread> threads;
  threads.reserve(FLAGS_concurrency);
  for (size_t i = 0; i < FLAGS_concurrency; ++i) {
    threads.emplace_back([&] {
      ::std::mt19937_64 gen {::std::random_device {}()};
      ::std::normal_distribution<> dis(600, 100);
      while (!stop.load(::std::memory_order_relaxed)) {
        auto begin = ::butil::cpuwide_time_ns();
        // Clamp at zero: converting a negative double to an unsigned
        // integer is undefined behaviour.
        const double sample = dis(gen);
        const uint32_t value =
            sample > 0.0 ? static_cast<uint32_t>(sample) : 0;
        for (size_t round = 0; round < 1000; ++round) {
          for (auto& s : vec) {
            work(s->var, value);
          }
        }
        auto use = (::butil::cpuwide_time_ns() - begin) / 1000 / vec.size();
        latency << use;
      }
    });
  }

  // Same duration as the previous usleep(1000000000), without relying on
  // usleep accepting arguments of one second or more.
  ::std::this_thread::sleep_for(::std::chrono::seconds(1000));

  // Destroying a joinable std::thread calls std::terminate, so stop and join
  // every worker before returning.
  stop.store(true, ::std::memory_order_relaxed);
  for (auto& thread : threads) {
    thread.join();
  }
}

/// Benchmarks variable type V through run_loop().
///
/// Each benchmarked element holds one V together with a
/// ::bvar::Window<V, ::bvar::SERIES_IN_SECOND> constructed from `&var` and -1;
/// exposing the element exposes the window, not the variable itself.
///
/// @param prefix name component forwarded to run_loop()
template <typename V>
void run(::std::string prefix) {
  struct Windowed {
    // `var` must be declared before `win`: the window is initialised with
    // the address of `var`.
    V var;
    ::bvar::Window<V, ::bvar::SERIES_IN_SECOND> win {&var, -1};

    void expose(::std::string name) {
      win.expose(name);
    }
  };
  run_loop<Windowed>(::std::move(prefix));
}

/// Specialization for ::bvar::LatencyRecorder: the recorder is exposed
/// directly, without a ::bvar::Window around it.
///
/// @param prefix name component forwarded to run_loop()
template <>
void run<::bvar::LatencyRecorder>(::std::string prefix) {
  struct Direct {
    ::bvar::LatencyRecorder var;

    void expose(::std::string name) {
      var.expose(name);
    }
  };
  run_loop<Direct>(::std::move(prefix));
}

/// Specialization for ::babylon::BvarLatencyRecorder: the recorder is exposed
/// directly, without a ::bvar::Window around it.
///
/// @param prefix name component forwarded to run_loop()
template <>
void run<::babylon::BvarLatencyRecorder>(::std::string prefix) {
  struct Direct {
    ::babylon::BvarLatencyRecorder var;

    void expose(::std::string name) {
      var.expose(name);
    }
  };
  run_loop<Direct>(::std::move(prefix));
}

/// Entry point: parses the gflags, starts the brpc dummy server on
/// FLAGS_dummy_port and runs the benchmark selected by FLAGS_mode, using the
/// ::babylon::Bvar* types when FLAGS_use_counter is set and the ::bvar::*
/// types otherwise.
///
/// @return 0 after the selected benchmark finishes, 1 when FLAGS_mode is not
///         one of adder/maxer/int_recorder/latency_recorder
int main(int argc, char* argv[]) {
  ::gflags::ParseCommandLineFlags(&argc, &argv, true);

  ::brpc::StartDummyServerAt(FLAGS_dummy_port);

  if (FLAGS_mode == "adder") {
    if (FLAGS_use_counter) {
      run<::babylon::BvarAdder>("babylon");
    } else {
      run<::bvar::Adder<ssize_t>>("bvar");
    }
  } else if (FLAGS_mode == "maxer") {
    if (FLAGS_use_counter) {
      run<::babylon::BvarMaxer>("babylon");
    } else {
      run<::bvar::Maxer<ssize_t>>("bvar");
    }
  } else if (FLAGS_mode == "int_recorder") {
    if (FLAGS_use_counter) {
      run<::babylon::BvarIntRecorder>("babylon");
    } else {
      run<::bvar::IntRecorder>("bvar");
    }
  } else if (FLAGS_mode == "latency_recorder") {
    if (FLAGS_use_counter) {
      run<::babylon::BvarLatencyRecorder>("babylon");
    } else {
      run<::bvar::LatencyRecorder>("bvar");
    }
  } else {
    // An unrecognized mode used to fall through and exit successfully
    // without running anything; report it instead.
    ::std::fprintf(stderr,
                   "unknown --mode=%s, expected "
                   "adder/maxer/int_recorder/latency_recorder\n",
                   FLAGS_mode.c_str());
    return 1;
  }

  // Disabled sample kept for reference: exposing each babylon variable type
  // manually, together with a window over it.
  /*
  ::babylon::BvarAdder adder;
  adder.expose("xxxx_adder");
  ::bvar::Window<::babylon::BvarAdder, ::bvar::SERIES_IN_SECOND> adder_window {
  &adder, -1};
  adder_window.expose("xxxx_adder_win");
  ::babylon::BvarMaxer maxer;
  maxer.expose("xxxx_maxer");
  ::bvar::Window<::babylon::BvarMaxer, ::bvar::SERIES_IN_SECOND> maxer_window {
  &maxer, -1};
  maxer_window.expose("xxxx_maxer_win");
  ::babylon::BvarIntRecorder int_recorder;
  int_recorder.expose("xxxx_int_recorder");
  ::bvar::Window<::babylon::BvarIntRecorder, ::bvar::SERIES_IN_SECOND>
  int_recorder_window {&int_recorder, -1};
  int_recorder_window.expose("xxxx_int_recorder_win");
  ::babylon::BvarPercentile percentile;
  ::bvar::Window<::babylon::BvarPercentile, ::bvar::SERIES_IN_SECOND>
  percentile_window {&percentile, -1};
  percentile_window.expose("xxxx_percentile_win");
  */

  return 0;
}
Loading

0 comments on commit 48919c0

Please sign in to comment.