Ecriture d’un serveur d’action et d’un client (Python)
Objectif : Implémenter un serveur d’action et un client en Python.
Niveau du didacticiel : Intermédiaire
Durée : 15 minutes
Contenu
Arrière-plan
Les actions sont une forme de communication asynchrone dans ROS 2. Les clients d’action envoient des demandes d’objectif aux serveurs d’action. Les serveurs d’action envoient des commentaires sur les objectifs et les résultats aux clients d’action.
Conditions préalables
Vous aurez besoin du package action_tutorials_interfaces
et de l’interface Fibonacci.action
définie dans le tutoriel précédent, Créer une action.
Tâches
1 Ecrire un serveur d’action
Concentrons-nous sur l’écriture d’un serveur d’action qui calcule la séquence de Fibonacci en utilisant l’action que nous avons créée dans le tutoriel Créer une action.
Jusqu’à présent, vous avez créé des packages et utilisé ros2 run
pour exécuter vos nœuds. Cependant, pour simplifier les choses dans ce didacticiel, nous limiterons le serveur d’action à un seul fichier. Si vous souhaitez voir à quoi ressemble un package complet pour les didacticiels d’actions, consultez action_tutorials.
Ouvrez un nouveau fichier dans votre répertoire personnel, appelons-le fibonacci_action_server.py
, et ajoutez le code suivant :
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
result = Fibonacci.Result()
return result
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
if __name__ == '__main__':
main()
La ligne 8 définit une classe FibonacciActionServer
qui est une sous-classe de Node
. La classe est initialisée en appelant le constructeur Node
, en nommant notre nœud fibonacci_action_server
:
super().__init__('fibonacci_action_server')
Dans le constructeur, nous instancions également un nouveau serveur d’action :
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
Un serveur d’action nécessite quatre arguments :
Un nœud ROS 2 auquel ajouter le client d’action :
self
.Le type de l’action :
Fibonacci
(importé à la ligne 5).Le nom de l’action :
'fibonacci'
.Une fonction de rappel pour exécuter les objectifs acceptés :
self.execute_callback
. Ce rappel doit renvoyer un message de résultat pour le type d’action.
Nous définissons également une méthode execute_callback
dans notre classe :
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
result = Fibonacci.Result()
return result
C’est la méthode qui sera appelée pour exécuter un objectif une fois qu’il est accepté.
Essayons d’exécuter notre serveur d’action :
python3 fibonacci_action_server.py
python3 fibonacci_action_server.py
python fibonacci_action_server.py
Dans un autre terminal, nous pouvons utiliser l’interface de ligne de commande pour envoyer un objectif :
ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"
Dans le terminal qui exécute le serveur d’action, vous devriez voir un message enregistré « Exécution de l’objectif… » suivi d’un avertissement indiquant que l’état de l’objectif n’a pas été défini. Par défaut, si l’état de la poignée d’objectif n’est pas défini dans le rappel d’exécution, il prend l’état abandonné.
Nous pouvons utiliser la méthode succeed() sur le descripteur d’objectif pour indiquer que l’objectif a été atteint :
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
goal_handle.succeed()
result = Fibonacci.Result()
return result
Maintenant, si vous redémarrez le serveur d’action et envoyez un autre objectif, vous devriez voir l’objectif terminé avec le statut SUCCEEDED
.
Faisons maintenant en sorte que notre objectif d’exécution calcule et renvoie la séquence de Fibonacci demandée :
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
sequence = [0, 1]
for i in range(1, goal_handle.request.order):
sequence.append(sequence[i] + sequence[i-1])
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = sequence
return result
Après avoir calculé la séquence, nous l’attribuons au champ de message de résultat avant de revenir.
Encore une fois, redémarrez le serveur d’action et envoyez un autre objectif. Vous devriez voir le but se terminer avec la bonne séquence de résultats.
1.2 Commentaires sur la publication
L’un des avantages des actions est la possibilité de fournir des commentaires à un client d’action lors de l’exécution de l’objectif. Nous pouvons faire en sorte que notre serveur d’action publie des commentaires pour les clients d’action en appelant la fonction publish_feedback() du descripteur d’objectif méthode.
Nous allons remplacer la variable sequence
et utiliser un message de retour pour stocker la séquence à la place. Après chaque mise à jour du message de retour dans la boucle for, nous publions le message de retour et dormons pour un effet spectaculaire :
import time
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]
for i in range(1, goal_handle.request.order):
feedback_msg.partial_sequence.append(
feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
goal_handle.publish_feedback(feedback_msg)
time.sleep(1)
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
return result
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
if __name__ == '__main__':
main()
Après avoir redémarré le serveur d’action, nous pouvons confirmer que les commentaires sont maintenant publiés en utilisant l’outil de ligne de commande avec l’option --feedback
:
ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"
2 Écrire un client d’action
Nous allons également limiter le client d’action à un seul fichier. Ouvrez un nouveau fichier, appelons-le fibonacci_action_client.py
, et ajoutez le code passe-partout suivant :
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
return self._action_client.send_goal_async(goal_msg)
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
future = action_client.send_goal(10)
rclpy.spin_until_future_complete(action_client, future)
if __name__ == '__main__':
main()
Nous avons défini une classe FibonacciActionClient
qui est une sous-classe de Node
. La classe est initialisée en appelant le constructeur Node
, en nommant notre nœud fibonacci_action_client
:
super().__init__('fibonacci_action_client')
Toujours dans le constructeur de classe, nous créons un client d’action en utilisant la définition d’action personnalisée du tutoriel précédent sur Créer une action :
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
Nous créons un ActionClient
en lui passant trois arguments :
Un nœud ROS 2 auquel ajouter le client d’action :
self
Le type de l’action :
Fibonacci
Le nom de l’action :
'fibonacci'
Notre client d’action pourra communiquer avec des serveurs d’action du même nom et du même type d’action.
Nous définissons également une méthode send_goal
dans la classe FibonacciActionClient
:
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
return self._action_client.send_goal_async(goal_msg)
Cette méthode attend que le serveur d’action soit disponible, puis envoie un objectif au serveur. Il renvoie un avenir que nous pourrons attendre plus tard.
Après la définition de la classe, nous définissons une fonction main()
qui initialise ROS 2 et crée une instance de notre nœud FibonacciActionClient
. Il envoie ensuite un objectif et attend que cet objectif soit atteint.
Enfin, nous appelons main()
au point d’entrée de notre programme Python.
Testons notre client d’action en exécutant d’abord le serveur d’action créé précédemment :
python3 fibonacci_action_server.py
python3 fibonacci_action_server.py
python fibonacci_action_server.py
Dans un autre terminal, lancez l’action client :
python3 fibonacci_action_client.py
python3 fibonacci_action_client.py
python fibonacci_action_client.py
Vous devriez voir des messages imprimés par le serveur d’action lorsqu’il exécute avec succès l’objectif :
[INFO] [fibonacci_action_server]: Executing goal...
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
# etc.
Le client d’action doit démarrer, puis se terminer rapidement. À ce stade, nous avons un client d’action fonctionnel, mais nous ne voyons aucun résultat ni n’obtenons de commentaires.
2.1 Obtenir un résultat
On peut donc envoyer un objectif, mais comment savoir quand il est atteint ? Nous pouvons obtenir les informations sur les résultats en quelques étapes. Tout d’abord, nous devons obtenir une poignée de but pour le but que nous avons envoyé. Ensuite, nous pouvons utiliser la poignée d’objectif pour demander le résultat.
Voici le code complet de cet exemple :
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10)
rclpy.spin(action_client)
if __name__ == '__main__':
main()
La méthode ActionClient.send_goal_async() renvoie un futur à un descripteur d’objectif . Nous enregistrons d’abord un rappel lorsque le futur est terminé :
self._send_goal_future.add_done_callback(self.goal_response_callback)
Notez que le futur est terminé lorsqu’un serveur d’action accepte ou rejette la demande d’objectif. Regardons le goal_response_callback
plus en détail. Nous pouvons vérifier si l’objectif a été rejeté et revenir plus tôt car nous savons qu’il n’y aura pas de résultat :
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
Maintenant que nous avons un descripteur d’objectif, nous pouvons l’utiliser pour demander le résultat avec la méthode get_result_async(). Semblable à l’envoi de l’objectif, nous aurons un avenir qui se terminera lorsque le résultat sera prêt. Enregistrons un rappel comme nous l’avons fait pour la réponse à l’objectif :
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
Dans le rappel, nous enregistrons la séquence de résultats et arrêtons ROS 2 pour une sortie propre :
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
Avec un serveur d’action exécuté dans un terminal séparé, essayez d’exécuter notre client d’action Fibonacci !
python3 fibonacci_action_client.py
python3 fibonacci_action_client.py
python fibonacci_action_client.py
Vous devriez voir des messages enregistrés pour l’objectif accepté et le résultat final.
2.2 Obtenir des commentaires
Notre client d’action peut envoyer des objectifs. Bon! Mais ce serait formidable si nous pouvions obtenir des commentaires sur les objectifs que nous envoyons depuis le serveur d’action.
Voici le code complet de cet exemple :
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10)
rclpy.spin(action_client)
if __name__ == '__main__':
main()
Voici la fonction de rappel pour les messages de feedback :
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))
Dans le rappel, nous obtenons la partie retour du message et affichons le champ partial_sequence
à l’écran.
Nous devons enregistrer le rappel auprès du client d’action. Ceci est réalisé en transmettant en plus le rappel au client d’action lorsque nous envoyons un objectif :
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
Nous sommes prêts. Si nous exécutons notre client d’action, vous devriez voir les commentaires imprimés à l’écran.