Ecriture d’un serveur d’action et d’un client (Python)

Objectif : Implémenter un serveur d’action et un client en Python.

Niveau du didacticiel : Intermédiaire

Durée : 15 minutes

Arrière-plan

Les actions sont une forme de communication asynchrone dans ROS 2. Les clients d’action envoient des demandes d’objectif aux serveurs d’action. Les serveurs d’action envoient des commentaires sur les objectifs et les résultats aux clients d’action.

Conditions préalables

Vous aurez besoin du package action_tutorials_interfaces et de l’interface Fibonacci.action définie dans le tutoriel précédent, Créer une action.

Tâches

1 Ecrire un serveur d’action

Concentrons-nous sur l’écriture d’un serveur d’action qui calcule la séquence de Fibonacci en utilisant l’action que nous avons créée dans le tutoriel Créer une action.

Jusqu’à présent, vous avez créé des packages et utilisé ros2 run pour exécuter vos nœuds. Cependant, pour simplifier les choses dans ce didacticiel, nous limiterons le serveur d’action à un seul fichier. Si vous souhaitez voir à quoi ressemble un package complet pour les didacticiels d’actions, consultez action_tutorials.

Ouvrez un nouveau fichier dans votre répertoire personnel, appelons-le fibonacci_action_server.py, et ajoutez le code suivant :

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

La ligne 8 définit une classe FibonacciActionServer qui est une sous-classe de Node. La classe est initialisée en appelant le constructeur Node, en nommant notre nœud fibonacci_action_server :

        super().__init__('fibonacci_action_server')

Dans le constructeur, nous instancions également un nouveau serveur d’action :

        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

Un serveur d’action nécessite quatre arguments :

  1. Un nœud ROS 2 auquel ajouter le client d’action : self.

  2. Le type de l’action : Fibonacci (importé à la ligne 5).

  3. Le nom de l’action : 'fibonacci'.

  4. Une fonction de rappel pour exécuter les objectifs acceptés : self.execute_callback. Ce rappel doit renvoyer un message de résultat pour le type d’action.

Nous définissons également une méthode execute_callback dans notre classe :

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result

C’est la méthode qui sera appelée pour exécuter un objectif une fois qu’il est accepté.

Essayons d’exécuter notre serveur d’action :

python3 fibonacci_action_server.py

Dans un autre terminal, nous pouvons utiliser l’interface de ligne de commande pour envoyer un objectif :

ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

Dans le terminal qui exécute le serveur d’action, vous devriez voir un message enregistré « Exécution de l’objectif… » suivi d’un avertissement indiquant que l’état de l’objectif n’a pas été défini. Par défaut, si l’état de la poignée d’objectif n’est pas défini dans le rappel d’exécution, il prend l’état abandonné.

Nous pouvons utiliser la méthode succeed() sur le descripteur d’objectif pour indiquer que l’objectif a été atteint :

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        goal_handle.succeed()
        result = Fibonacci.Result()
        return result

Maintenant, si vous redémarrez le serveur d’action et envoyez un autre objectif, vous devriez voir l’objectif terminé avec le statut SUCCEEDED.

Faisons maintenant en sorte que notre objectif d’exécution calcule et renvoie la séquence de Fibonacci demandée :

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            sequence.append(sequence[i] + sequence[i-1])

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = sequence
        return result

Après avoir calculé la séquence, nous l’attribuons au champ de message de résultat avant de revenir.

Encore une fois, redémarrez le serveur d’action et envoyez un autre objectif. Vous devriez voir le but se terminer avec la bonne séquence de résultats.

1.2 Commentaires sur la publication

L’un des avantages des actions est la possibilité de fournir des commentaires à un client d’action lors de l’exécution de l’objectif. Nous pouvons faire en sorte que notre serveur d’action publie des commentaires pour les clients d’action en appelant la fonction publish_feedback() du descripteur d’objectif méthode.

Nous allons remplacer la variable sequence et utiliser un message de retour pour stocker la séquence à la place. Après chaque mise à jour du message de retour dans la boucle for, nous publions le message de retour et dormons pour un effet spectaculaire :

import time

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.partial_sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            feedback_msg.partial_sequence.append(
                feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
            self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(1)

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = feedback_msg.partial_sequence
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

Après avoir redémarré le serveur d’action, nous pouvons confirmer que les commentaires sont maintenant publiés en utilisant l’outil de ligne de commande avec l’option --feedback :

ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

2 Écrire un client d’action

Nous allons également limiter le client d’action à un seul fichier. Ouvrez un nouveau fichier, appelons-le fibonacci_action_client.py, et ajoutez le code passe-partout suivant :

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    future = action_client.send_goal(10)

    rclpy.spin_until_future_complete(action_client, future)


if __name__ == '__main__':
    main()

Nous avons défini une classe FibonacciActionClient qui est une sous-classe de Node. La classe est initialisée en appelant le constructeur Node, en nommant notre nœud fibonacci_action_client :

        super().__init__('fibonacci_action_client')

Toujours dans le constructeur de classe, nous créons un client d’action en utilisant la définition d’action personnalisée du tutoriel précédent sur Créer une action :

        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

Nous créons un ActionClient en lui passant trois arguments :

  1. Un nœud ROS 2 auquel ajouter le client d’action : self

  2. Le type de l’action : Fibonacci

  3. Le nom de l’action : 'fibonacci'

Notre client d’action pourra communiquer avec des serveurs d’action du même nom et du même type d’action.

Nous définissons également une méthode send_goal dans la classe FibonacciActionClient :

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)

Cette méthode attend que le serveur d’action soit disponible, puis envoie un objectif au serveur. Il renvoie un avenir que nous pourrons attendre plus tard.

Après la définition de la classe, nous définissons une fonction main() qui initialise ROS 2 et crée une instance de notre nœud FibonacciActionClient. Il envoie ensuite un objectif et attend que cet objectif soit atteint.

Enfin, nous appelons main() au point d’entrée de notre programme Python.

Testons notre client d’action en exécutant d’abord le serveur d’action créé précédemment :

python3 fibonacci_action_server.py

Dans un autre terminal, lancez l’action client :

python3 fibonacci_action_client.py

Vous devriez voir des messages imprimés par le serveur d’action lorsqu’il exécute avec succès l’objectif :

[INFO] [fibonacci_action_server]: Executing goal...
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
# etc.

Le client d’action doit démarrer, puis se terminer rapidement. À ce stade, nous avons un client d’action fonctionnel, mais nous ne voyons aucun résultat ni n’obtenons de commentaires.

2.1 Obtenir un résultat

On peut donc envoyer un objectif, mais comment savoir quand il est atteint ? Nous pouvons obtenir les informations sur les résultats en quelques étapes. Tout d’abord, nous devons obtenir une poignée de but pour le but que nous avons envoyé. Ensuite, nous pouvons utiliser la poignée d’objectif pour demander le résultat.

Voici le code complet de cet exemple :

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

La méthode ActionClient.send_goal_async() renvoie un futur à un descripteur d’objectif . Nous enregistrons d’abord un rappel lorsque le futur est terminé :

        self._send_goal_future.add_done_callback(self.goal_response_callback)

Notez que le futur est terminé lorsqu’un serveur d’action accepte ou rejette la demande d’objectif. Regardons le goal_response_callback plus en détail. Nous pouvons vérifier si l’objectif a été rejeté et revenir plus tôt car nous savons qu’il n’y aura pas de résultat :

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

Maintenant que nous avons un descripteur d’objectif, nous pouvons l’utiliser pour demander le résultat avec la méthode get_result_async(). Semblable à l’envoi de l’objectif, nous aurons un avenir qui se terminera lorsque le résultat sera prêt. Enregistrons un rappel comme nous l’avons fait pour la réponse à l’objectif :

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

Dans le rappel, nous enregistrons la séquence de résultats et arrêtons ROS 2 pour une sortie propre :

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

Avec un serveur d’action exécuté dans un terminal séparé, essayez d’exécuter notre client d’action Fibonacci !

python3 fibonacci_action_client.py

Vous devriez voir des messages enregistrés pour l’objectif accepté et le résultat final.

2.2 Obtenir des commentaires

Notre client d’action peut envoyer des objectifs. Bon! Mais ce serait formidable si nous pouvions obtenir des commentaires sur les objectifs que nous envoyons depuis le serveur d’action.

Voici le code complet de cet exemple :

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

Voici la fonction de rappel pour les messages de feedback :

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))

Dans le rappel, nous obtenons la partie retour du message et affichons le champ partial_sequence à l’écran.

Nous devons enregistrer le rappel auprès du client d’action. Ceci est réalisé en transmettant en plus le rappel au client d’action lorsque nous envoyons un objectif :

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

Nous sommes prêts. Si nous exécutons notre client d’action, vous devriez voir les commentaires imprimés à l’écran.

Résumé

Dans ce didacticiel, vous avez assemblé ligne par ligne un serveur d’action Python et un client d’action, et les avez configurés pour échanger des objectifs, des commentaires et des résultats.