官方并没有提供 ROS2 版本的 smach 元功能包,我们可以使用 DeepX 移植的包:
将上述两个包 clone 到工作空间下,然后 rosdep + colcon build + source 即可
网上的示例教程都是 ROS1 的,我们自己来改成 ROS2 版本:
首先 cd 到 src 目录下创建一个新的功能包
ros2 pkg create --build-type ament_python --dependencies rclpy smach smach_ros
import rclpy
from rclpy.node import Node
import smach
import smach_ros
class SmachTestNode(Node):
def __init__(self, name):
super().__init__(name)
self.get_logger().info("启动 demo 节点")
# 定义状态 Foo
class Foo(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1','outcome2'])
self.counter = 0
def execute(self, userdata):
print('Executing state FOO')
if self.counter < 3:
self.counter += 1
return 'outcome1'
else:
return 'outcome2'
# 定义状态 Bar
class Bar(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome2'])
def execute(self, userdata):
print('Executing state BAR')
return 'outcome2'
# main
def main(args=None):
rclpy.init(args=args) # 初始化 ros
node = SmachTestNode("execute_smach_test")
# Create a SMACH state machine
sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])
# Open the container
with sm:
# Add states to the container
smach.StateMachine.add('FOO', Foo(),
transitions={'outcome1':'BAR', 'outcome2':'outcome4'})
smach.StateMachine.add('BAR', Bar(),
transitions={'outcome2':'FOO'})
# Create and start the introspection server
sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
sis.start()
# Execute SMACH plan
outcome = sm.execute()
# Wait for ctrl-c to stop the application
rclpy.spin(node)
sis.stop()
if __name__ == '__main__':
main()
启动该节点后显示

在另一个终端中输入ros2 run smach_viewer smach_viewer_gui.py即可可视化显示状态机

状态机有四大概念。
作为状态机,首先需要有状态,这个例程中有两个状态:FOO、BAR。这两个状态都是通过Python的函数进行定义的,而且结构相似,都包含初始化(init)和执行(execute)这两个函数。
初始化函数用来初始化该状态类,调用smach中状态的初始化函数,同时需要定义输出状态:outcome1、outcome2
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1','outcome2'])
self.counter = 0
这里的outcome代表状态结束时的输出值,使用字符串表示,由用户来定义取值的范围。例如我们可以定义状态执行是否成功:[‘succeeded’, ‘failed’, ‘awesome’]. 每个状态的输出值可以有多个,根据不同的输出值有可能跳转到不同的下一个状态。
注意:初始化函数中不能阻塞,如果需要实现同步等阻塞功能,可以使用多线程实现。
执行函数就是每个状态中的具体工作内容了,可以进行阻塞工作,当工作后需要返回定义的输出值,该状态结束。
def execute(self, userdata):
print('Executing state FOO')
if self.counter < 3:
self.counter += 1
return 'outcome1'
else:
return 'outcome2'
在main函数中,首先初始化ROS节点
然后使用StateMachine创建一个状态机,并且指定状态机执行结束后的最终输出值有两个:outcome4和outcome5.
# 创建一个 SMACH 状态机
sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])
SMACH状态机是一个容器,我们可以使用add方法添加需要的状态到状态机容器当中,同时需要设置状态之间的跳转关系。
# 打开容器
with sm:
# 将状态添加到容器中
smach.StateMachine.add('FOO', Foo(),
transitions={'outcome1':'BAR', 'outcome2':'outcome4'})
smach.StateMachine.add('BAR', Bar(),
transitions={'outcome2':'FOO'})
例如我们首先在状态机中添加一个名为“FOO”的状态,该状态的类就是我们之前定义的Foo,transitions代表状态的变换(即状态机的第四个概念),如果FOO状态执行输出outcome1时,则跳转到“BAR”状态,如果执行输出outcome2时,则结束这个状态机,并且输出outcome4。
还记得我们上边看到的可视化界面么,为了将状态机可视化显示,我们需要在代码中加入可视化服务器:
# Create and start the introspection server
sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
sis.start()
IntrospectionServer()方法用来创建内部可视化服务器,有三个参数:
然后就可以使用execute()方法开始执行状态机了:
# Execute SMACH plan
outcome = sm.execute()
执行结束后需要将内部可视化服务器停止:
sis.stop()
现在再来回顾整个状态机:

从图中我们可以看到,状态机开始工作后首先跳入我们添加的第一个状态“FOO”,然后在该状态中累加counter变量,counter小于3时,会输出outcome1,状态结束后就跳转到“BAR”状态。在“BAR”状态中什么都没做,输出outcome2回到“FOO”状态。就这样来回几次之后,counter等于3,“FOO”状态的输出值变成outcome2,继而跳转到outcome4,也就代表着有限状态机运行结束。outcome5全程并没有涉及到,所以在图上成为了一个孤立的节点。
可以将上边的状态机想象成一个简单的机器人应用:机器人去抓取桌子上的杯子,如果抓取到就结束任务,如果抓取不到就继续尝试,尝试3次还没抓到,就放弃抓取,结束任务。
后记:一个复杂的状态机:
