Skip to content

Commit faa73b6

Browse files
authored
Fix sigabrt in module integration tests (#421)
1 parent 25e0233 commit faa73b6

File tree

5 files changed

+97
-66
lines changed

5 files changed

+97
-66
lines changed

src/viam/sdk/robot/client.cpp

+20-25
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,15 @@ RobotClient::~RobotClient() {
129129

130130
void RobotClient::close() {
131131
should_refresh_.store(false);
132-
for (const std::shared_ptr<std::thread>& t : threads_) {
133-
t->~thread();
132+
133+
for (auto& thread : threads_) {
134+
thread.join();
134135
}
136+
threads_.clear();
137+
135138
stop_all();
136-
viam_channel_->close();
139+
140+
viam_channel_.close();
137141
}
138142

139143
bool is_error_response(const grpc::Status& response) {
@@ -211,7 +215,7 @@ void RobotClient::refresh() {
211215
if (rs) {
212216
try {
213217
const std::shared_ptr<Resource> rpc_client =
214-
rs->create_rpc_client(name.name(), channel_);
218+
rs->create_rpc_client(name.name(), viam_channel_.channel());
215219
const Name name_({name.namespace_(), name.type(), name.subtype()}, "", name.name());
216220
new_resources.emplace(name_, rpc_client);
217221
} catch (const std::exception& exc) {
@@ -250,11 +254,10 @@ void RobotClient::refresh_every() {
250254
}
251255
};
252256

253-
RobotClient::RobotClient(std::shared_ptr<ViamChannel> channel)
254-
: channel_(channel->channel()),
255-
viam_channel_(std::move(channel)),
257+
RobotClient::RobotClient(ViamChannel channel)
258+
: viam_channel_(std::move(channel)),
256259
should_close_channel_(false),
257-
impl_(std::make_unique<impl>(RobotService::NewStub(channel_))) {}
260+
impl_(std::make_unique<impl>(RobotService::NewStub(viam_channel_.channel()))) {}
258261

259262
std::vector<Name> RobotClient::resource_names() const {
260263
const std::lock_guard<std::mutex> lock(lock_);
@@ -286,20 +289,14 @@ void RobotClient::log(const std::string& name,
286289
}
287290
}
288291

289-
std::shared_ptr<RobotClient> RobotClient::with_channel(std::shared_ptr<ViamChannel> channel,
292+
std::shared_ptr<RobotClient> RobotClient::with_channel(ViamChannel channel,
290293
const Options& options) {
291-
std::shared_ptr<RobotClient> robot = std::make_shared<RobotClient>(std::move(channel));
294+
auto robot = std::make_shared<RobotClient>(std::move(channel));
292295
robot->refresh_interval_ = options.refresh_interval();
293296
robot->should_refresh_ = (robot->refresh_interval_ > 0);
294297
if (robot->should_refresh_) {
295-
const std::shared_ptr<std::thread> t =
296-
std::make_shared<std::thread>(&RobotClient::refresh_every, robot);
297-
// TODO(RSDK-1743): this was leaking, confirm that adding thread catching in
298-
// close/destructor lets us shutdown gracefully. See also address sanitizer,
299-
// UB sanitizer
300-
t->detach();
301-
robot->threads_.push_back(t);
302-
};
298+
robot->threads_.emplace_back(&RobotClient::refresh_every, robot);
299+
}
303300

304301
robot->refresh();
305302
return robot;
@@ -308,8 +305,8 @@ std::shared_ptr<RobotClient> RobotClient::with_channel(std::shared_ptr<ViamChann
308305
std::shared_ptr<RobotClient> RobotClient::at_address(const std::string& address,
309306
const Options& options) {
310307
const char* uri = address.c_str();
311-
auto channel = ViamChannel::dial_initial(uri, options.dial_options());
312-
std::shared_ptr<RobotClient> robot = RobotClient::with_channel(channel, options);
308+
auto robot =
309+
RobotClient::with_channel(ViamChannel::dial_initial(uri, options.dial_options()), options);
313310
robot->should_close_channel_ = true;
314311

315312
return robot;
@@ -318,11 +315,9 @@ std::shared_ptr<RobotClient> RobotClient::at_address(const std::string& address,
318315
std::shared_ptr<RobotClient> RobotClient::at_local_socket(const std::string& address,
319316
const Options& options) {
320317
const std::string addr = "unix://" + address;
321-
const char* uri = addr.c_str();
322-
const std::shared_ptr<grpc::Channel> channel =
323-
sdk::impl::create_viam_channel(uri, grpc::InsecureChannelCredentials());
324-
auto viam_channel = std::make_shared<ViamChannel>(channel, address.c_str(), nullptr);
325-
std::shared_ptr<RobotClient> robot = RobotClient::with_channel(viam_channel, options);
318+
auto robot = RobotClient::with_channel(
319+
ViamChannel(sdk::impl::create_viam_channel(addr, grpc::InsecureChannelCredentials())),
320+
options);
326321
robot->should_close_channel_ = true;
327322

328323
return robot;

src/viam/sdk/robot/client.hpp

+6-7
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ class RobotClient {
6464
friend bool operator==(const operation& lhs, const operation& rhs);
6565
};
6666

67+
explicit RobotClient(ViamChannel channel);
68+
6769
~RobotClient();
70+
6871
void refresh();
6972
void close();
7073

@@ -87,10 +90,7 @@ class RobotClient {
8790
/// @param options Options for connecting and refreshing.
8891
/// Connects directly to a pre-existing channel. A robot created this way must be
8992
/// `close()`d manually.
90-
static std::shared_ptr<RobotClient> with_channel(std::shared_ptr<ViamChannel> channel,
91-
const Options& options);
92-
93-
RobotClient(std::shared_ptr<ViamChannel> channel);
93+
static std::shared_ptr<RobotClient> with_channel(ViamChannel channel, const Options& options);
9494

9595
std::vector<Name> resource_names() const;
9696

@@ -165,13 +165,12 @@ class RobotClient {
165165

166166
void refresh_every();
167167

168-
std::vector<std::shared_ptr<std::thread>> threads_;
168+
std::vector<std::thread> threads_;
169169

170170
std::atomic<bool> should_refresh_;
171171
unsigned int refresh_interval_;
172172

173-
std::shared_ptr<GrpcChannel> channel_;
174-
std::shared_ptr<ViamChannel> viam_channel_;
173+
ViamChannel viam_channel_;
175174
bool should_close_channel_;
176175

177176
struct impl;

src/viam/sdk/rpc/dial.cpp

+53-23
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,46 @@
1818
namespace viam {
1919
namespace sdk {
2020

21-
const std::shared_ptr<grpc::Channel>& ViamChannel::channel() const {
22-
return channel_;
23-
}
21+
struct ViamChannel::impl {
22+
impl(const char* path, void* runtime) : path(path), rust_runtime(runtime) {}
2423

25-
void ViamChannel::close() {
26-
if (closed_) {
27-
return;
24+
impl(const impl&) = delete;
25+
26+
impl(impl&& other) noexcept
27+
: path(std::exchange(other.path, nullptr)),
28+
rust_runtime(std::exchange(other.rust_runtime, nullptr)) {}
29+
30+
impl& operator=(const impl&) = delete;
31+
32+
impl& operator=(impl&& other) noexcept {
33+
path = std::exchange(other.path, nullptr);
34+
rust_runtime = std::exchange(other.rust_runtime, nullptr);
35+
36+
return *this;
2837
}
29-
closed_ = true;
30-
free_string(path_);
31-
free_rust_runtime(rust_runtime_);
38+
39+
~impl() {
40+
free_string(path);
41+
free_rust_runtime(rust_runtime);
42+
}
43+
44+
const char* path;
45+
void* rust_runtime;
3246
};
3347

48+
ViamChannel::ViamChannel(std::shared_ptr<grpc::Channel> channel, const char* path, void* runtime)
49+
: channel_(std::move(channel)), pimpl_(std::make_unique<ViamChannel::impl>(path, runtime)) {}
50+
51+
ViamChannel::ViamChannel(std::shared_ptr<grpc::Channel> channel) : channel_(std::move(channel)) {}
52+
53+
ViamChannel::ViamChannel(ViamChannel&&) noexcept = default;
54+
55+
ViamChannel& ViamChannel::operator=(ViamChannel&&) noexcept = default;
56+
57+
ViamChannel::~ViamChannel() {
58+
close();
59+
}
60+
3461
const std::string& Credentials::type() const {
3562
return type_;
3663
}
@@ -39,9 +66,6 @@ const std::string& Credentials::payload() const {
3966
return payload_;
4067
}
4168

42-
ViamChannel::ViamChannel(std::shared_ptr<grpc::Channel> channel, const char* path, void* runtime)
43-
: channel_(std::move(channel)), path_(path), closed_(false), rust_runtime_(runtime) {}
44-
4569
DialOptions::DialOptions() = default;
4670

4771
DialOptions& DialOptions::set_credentials(boost::optional<Credentials> creds) {
@@ -105,8 +129,8 @@ bool DialOptions::allows_insecure_downgrade() const {
105129
return allow_insecure_downgrade_;
106130
}
107131

108-
std::shared_ptr<ViamChannel> ViamChannel::dial_initial(
109-
const char* uri, const boost::optional<DialOptions>& options) {
132+
ViamChannel ViamChannel::dial_initial(const char* uri,
133+
const boost::optional<DialOptions>& options) {
110134
DialOptions opts = options.get_value_or(DialOptions());
111135
auto timeout = opts.timeout();
112136
auto attempts_remaining = opts.initial_connection_attempts();
@@ -129,11 +153,10 @@ std::shared_ptr<ViamChannel> ViamChannel::dial_initial(
129153
}
130154
// the while loop will run until we either return or throw an error, so we can never reach this
131155
// point
132-
BOOST_UNREACHABLE_RETURN(nullptr)
156+
BOOST_UNREACHABLE_RETURN(ViamChannel(nullptr))
133157
}
134158

135-
std::shared_ptr<ViamChannel> ViamChannel::dial(const char* uri,
136-
const boost::optional<DialOptions>& options) {
159+
ViamChannel ViamChannel::dial(const char* uri, const boost::optional<DialOptions>& options) {
137160
void* ptr = init_rust_runtime();
138161
const DialOptions opts = options.get_value_or(DialOptions());
139162
const std::chrono::duration<float> float_timeout = opts.timeout();
@@ -157,12 +180,19 @@ std::shared_ptr<ViamChannel> ViamChannel::dial(const char* uri,
157180

158181
std::string address("unix://");
159182
address += socket_path;
160-
const std::shared_ptr<grpc::Channel> channel =
161-
impl::create_viam_channel(address, grpc::InsecureChannelCredentials());
162-
const std::unique_ptr<viam::robot::v1::RobotService::Stub> st =
163-
viam::robot::v1::RobotService::NewStub(channel);
164-
return std::make_shared<ViamChannel>(channel, socket_path, ptr);
165-
};
183+
184+
return ViamChannel(sdk::impl::create_viam_channel(address, grpc::InsecureChannelCredentials()),
185+
socket_path,
186+
ptr);
187+
}
188+
189+
const std::shared_ptr<grpc::Channel>& ViamChannel::channel() const {
190+
return channel_;
191+
}
192+
193+
void ViamChannel::close() {
194+
pimpl_.reset();
195+
}
166196

167197
unsigned int Options::refresh_interval() const {
168198
return refresh_interval_;

src/viam/sdk/rpc/dial.hpp

+17-9
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,24 @@ namespace sdk {
1313

1414
class DialOptions;
1515
class ViamChannel {
16-
public:
17-
void close();
1816
ViamChannel(std::shared_ptr<GrpcChannel> channel, const char* path, void* runtime);
1917

18+
public:
19+
explicit ViamChannel(std::shared_ptr<GrpcChannel> channel);
20+
21+
ViamChannel(ViamChannel&&) noexcept;
22+
23+
ViamChannel& operator=(ViamChannel&&) noexcept;
24+
25+
~ViamChannel();
26+
2027
/// @brief Connects to a robot at the given URI address, using the provided dial options (or
2128
/// default options is none are provided). Ignores initial connection options specifying
2229
/// how many times to attempt to connect and with what timeout.
2330
/// In general, use of this method is discouraged. `RobotClient::at_address(...)` is the
2431
/// preferred method to connect to a robot, and creates the channel itself.
2532
/// @throws Exception if it is unable to establish a connection to the provided URI
26-
static std::shared_ptr<ViamChannel> dial(const char* uri,
27-
const boost::optional<DialOptions>& options);
33+
static ViamChannel dial(const char* uri, const boost::optional<DialOptions>& options);
2834

2935
// @brief Dials to a robot at the given URI address, using the provided dial options (or default
3036
// options is none are provided). Additionally specifies that this dial is an initial connection
@@ -33,16 +39,18 @@ class ViamChannel {
3339
/// preferred method to connect to a robot, and creates the channel itself.
3440
/// @throws Exception if it is unable to establish a connection to the provided URI within
3541
/// the given number of initial connection attempts
36-
static std::shared_ptr<ViamChannel> dial_initial(const char* uri,
37-
const boost::optional<DialOptions>& options);
42+
static ViamChannel dial_initial(const char* uri, const boost::optional<DialOptions>& options);
3843

3944
const std::shared_ptr<GrpcChannel>& channel() const;
4045

46+
void close();
47+
4148
private:
49+
struct impl;
50+
4251
std::shared_ptr<GrpcChannel> channel_;
43-
const char* path_;
44-
bool closed_;
45-
void* rust_runtime_;
52+
53+
std::unique_ptr<impl> pimpl_;
4654
};
4755

4856
class Credentials {

src/viam/sdk/tests/test_robot.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ void robot_client_to_mocks_pipeline(F&& test_case) {
5050
// in-process gRPC channel.
5151
auto test_server = TestServer(server);
5252
auto grpc_channel = test_server.grpc_in_process_channel();
53-
auto viam_channel = std::make_shared<ViamChannel>(grpc_channel, "", nullptr);
54-
auto client = RobotClient::with_channel(viam_channel, Options(0, boost::none));
53+
auto client = RobotClient::with_channel(ViamChannel(grpc_channel), Options(0, boost::none));
5554

5655
// Run the passed-in test case on the created stack and give access to the
5756
// created RobotClient and MockRobotService.

0 commit comments

Comments
 (0)