SMACC2
Loading...
Searching...
No Matches
cp_micro_ros_agent.cpp
Go to the documentation of this file.
1// Copyright 2025 Robosoft Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
16
17#include <errno.h>
18#include <fcntl.h>
19#include <signal.h>
20#include <stdio.h>
21#include <sys/types.h>
22#include <sys/wait.h>
23#include <unistd.h>
24#include <cstring>
25#include <sstream>
26#include <thread>
27#include <vector>
28
29namespace cl_px4_mr
30{
31
32CpMicroRosAgent::CpMicroRosAgent(std::string command, std::string nodeName)
33: command_(std::move(command)), nodeName_(std::move(nodeName))
34{
35}
36
38
40{
41 std::lock_guard<std::mutex> lock(mutex_);
42
43 if (launched_)
44 {
45 RCLCPP_INFO(getLogger(), "CpMicroRosAgent: already launched, skipping");
46 return;
47 }
48
49 RCLCPP_INFO(getLogger(), "CpMicroRosAgent: launching '%s'", command_.c_str());
50
51 int pipefd[2];
52 if (pipe(pipefd) == -1)
53 {
54 RCLCPP_ERROR(getLogger(), "CpMicroRosAgent: pipe() failed: %s", strerror(errno));
55 return;
56 }
57
58 pid_t pid = fork();
59
60 if (pid == 0)
61 {
62 // Child process
63 close(pipefd[0]);
64 dup2(pipefd[1], STDOUT_FILENO);
65 dup2(pipefd[1], STDERR_FILENO);
66 close(pipefd[1]);
67
68 execl("/bin/sh", "/bin/sh", "-c", command_.c_str(), nullptr);
69
70 // If execl returns, it failed
71 _exit(1);
72 }
73 else if (pid > 0)
74 {
75 // Parent process
76 close(pipefd[1]);
77 agentPid_ = pid;
78 pipeFd_ = pipefd[0];
79
80 // Set pipe to non-blocking
81 int flags = fcntl(pipeFd_, F_GETFL, 0);
82 fcntl(pipeFd_, F_SETFL, flags | O_NONBLOCK);
83
84 // Spawn detached reader thread to drain pipe (prevents buffer fill)
85 shutdownRequested_ = false;
86 std::thread readerThread(
87 [this]()
88 {
89 char buf[256];
90 while (!shutdownRequested_)
91 {
92 ssize_t n = read(pipeFd_, buf, sizeof(buf) - 1);
93 if (n > 0)
94 {
95 buf[n] = '\0';
96 RCLCPP_DEBUG(getLogger(), "[micro_ros_agent] %s", buf);
97 }
98 else
99 {
100 std::this_thread::sleep_for(std::chrono::milliseconds(100));
101 }
102 }
103 });
104 readerThread.detach();
105
106 launched_ = true;
107 RCLCPP_INFO(getLogger(), "CpMicroRosAgent: launched with PID %d", agentPid_);
108 }
109 else
110 {
111 RCLCPP_ERROR(getLogger(), "CpMicroRosAgent: fork() failed: %s", strerror(errno));
112 close(pipefd[0]);
113 close(pipefd[1]);
114 }
115}
116
118{
119 std::lock_guard<std::mutex> lock(mutex_);
120
121 if (agentPid_ <= 0)
122 {
123 return;
124 }
125
126 RCLCPP_INFO(getLogger(), "CpMicroRosAgent: shutting down agent (PID %d)", agentPid_);
127
128 shutdownRequested_ = true;
129
130 // Kill process tree recursively
132
133 // Reap zombie with brief retry loop
134 for (int i = 0; i < 10; i++)
135 {
136 int status;
137 pid_t result = waitpid(agentPid_, &status, WNOHANG);
138 if (result == agentPid_ || result == -1)
139 {
140 break;
141 }
142 std::this_thread::sleep_for(std::chrono::milliseconds(100));
143 }
144
145 // Close pipe
146 if (pipeFd_ >= 0)
147 {
148 close(pipeFd_);
149 pipeFd_ = -1;
150 }
151
152 agentPid_ = -1;
153 launched_ = false;
154
155 RCLCPP_INFO(getLogger(), "CpMicroRosAgent: shutdown complete");
156}
157
159
161{
162 std::lock_guard<std::mutex> lock(mutex_);
163 return agentPid_;
164}
165
166std::string CpMicroRosAgent::getNodeName() const { return nodeName_; }
167
169{
170 std::string command = "pgrep -P " + std::to_string(pid);
171 FILE * pipe = popen(command.c_str(), "r");
172
173 if (!pipe)
174 {
175 return;
176 }
177
178 char buffer[128];
179 std::string result;
180
181 while (fgets(buffer, sizeof(buffer), pipe) != NULL)
182 {
183 result += buffer;
184 }
185
186 pclose(pipe);
187
188 std::istringstream iss(result);
189 pid_t childPid;
190 std::vector<pid_t> childPids;
191
192 while (iss >> childPid)
193 {
194 childPids.push_back(childPid);
195 }
196
197 // Kill child processes before killing the parent process
198 for (const pid_t & child : childPids)
199 {
201 }
202
203 // After killing all children, kill the parent process
204 int res = kill(pid, SIGTERM);
205 if (res == 0)
206 {
207 RCLCPP_FATAL(rclcpp::get_logger("CpMicroRosAgent"), "Killed process %d", pid);
208 }
209}
210
211} // namespace cl_px4_mr
CpMicroRosAgent(std::string command="ros2 run micro_ros_agent micro_ros_agent udp4 -p 8888 2>&1 | tee /tmp/xrce_agent.log", std::string nodeName="/px4_micro_xrce_dds")
std::atomic< bool > shutdownRequested_
static void killProcessesRecursive(pid_t pid)
rclcpp::Logger getLogger() const