Enregistrer un sac à partir d’un nœud (Python)

Objectif : Enregistrez les données de votre propre nœud Python dans un sac.

Niveau du didacticiel : Avancé

Durée : 20 minutes

Arrière-plan

rosbag2 ne fournit pas seulement l’outil de ligne de commande ros2 bag. Il fournit également une API Python pour lire et écrire dans un sac à partir de votre propre code source. Cela vous permet de vous abonner à un sujet et d’enregistrer les données reçues dans un sac en même temps que d’effectuer tout autre traitement de votre choix sur ces données. Vous pouvez le faire, par exemple, pour enregistrer les données d’un sujet et le résultat du traitement de ces données sans avoir besoin d’envoyer les données traitées sur un sujet juste pour les enregistrer. Étant donné que toutes les données peuvent être enregistrées dans un sac, il est également possible d’enregistrer des données générées par une autre source qu’un sujet, telles que des données synthétiques pour des ensembles d’apprentissage. Ceci est utile, par exemple, pour générer rapidement un sac contenant un grand nombre d’échantillons répartis sur une longue durée de lecture.

Conditions préalables

Vous devriez avoir les packages rosbag2 installés dans le cadre de votre configuration habituelle de ROS 2.

Si vous avez installé des packages Debian sur Linux, il peut être installé par défaut. Si ce n’est pas le cas, vous pouvez l’installer à l’aide de cette commande.

sudo apt install ros-rolling-rosbag2

Ce tutoriel traite de l’utilisation des sacs ROS 2, y compris depuis le terminal. Vous devriez déjà avoir terminé le tutoriel de base sur le sac ROS 2.

Tâches

1 Créer un package

Ouvrez un nouveau terminal et sourcez votre installation ROS 2 pour que les commandes ros2 fonctionnent.

Suivez ces instructions pour créer un nouvel espace de travail nommé ros2_ws.

Naviguez dans le répertoire ros2_ws/src et créez un nouveau package :

ros2 pkg create --build-type ament_python bag_recorder_nodes_py --dependencies rclpy rosbag2_py example_interfaces std_msgs

Votre terminal renverra un message vérifiant la création de votre package bag_recorder_nodes_py et tous ses fichiers et dossiers nécessaires. L’argument --dependencies ajoutera automatiquement les lignes de dépendance nécessaires à package.xml.txt``. Dans ce cas, le package utilisera le package rosbag2_py ainsi que le package rclpy. Une dépendance sur le package example_interfaces est également requise pour les définitions de message.

1.1 Mettre à jour package.xml et setup.py

Comme vous avez utilisé l’option --dependencies lors de la création du package, vous n’avez pas besoin d’ajouter manuellement des dépendances à package.xml.txt``. Comme toujours, assurez-vous d’ajouter la description, l’adresse e-mail et le nom du responsable, ainsi que les informations de licence à package.xml.

<description>Python bag writing tutorial</description>
<maintainer email="you@email.com">Your Name</maintainer>
<license>Apache License 2.0</license>

Assurez-vous également d’ajouter ces informations au fichier setup.py également.

maintainer='Your Name',
maintainer_email='you@email.com',
description='Python bag writing tutorial',
license='Apache License 2.0',

2 Écrivez le nœud Python

Dans le répertoire ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py, créez un nouveau fichier appelé simple_bag_recorder.py et collez-y le code suivant.

import rclpy
from rclpy.node import Node
from rclpy.serialization import serialize_message
from std_msgs.msg import String

import rosbag2_py

class SimpleBagRecorder(Node):
    def __init__(self):
        super().__init__('simple_bag_recorder')
        self.writer = rosbag2_py.SequentialWriter()

        storage_options = rosbag2_py._storage.StorageOptions(
            uri='my_bag',
            storage_id='sqlite3')
        converter_options = rosbag2_py._storage.ConverterOptions('', '')
        self.writer.open(storage_options, converter_options)

        topic_info = rosbag2_py._storage.TopicMetadata(
            name='chatter',
            type='std_msgs/msg/String',
            serialization_format='cdr')
        self.writer.create_topic(topic_info)

        self.subscription = self.create_subscription(
            String,
            'chatter',
            self.topic_callback,
            10)
        self.subscription

    def topic_callback(self, msg):
        self.writer.write(
            'chatter',
            serialize_message(msg),
            self.get_clock().now().nanoseconds)


def main(args=None):
    rclpy.init(args=args)
    sbr = SimpleBagRecorder()
    rclpy.spin(sbr)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

2.1 Examiner le code

Les instructions import en haut sont les dépendances du paquet. Notez l’importation du package rosbag2_py pour les fonctions et les structures nécessaires pour travailler avec les fichiers bag.

Dans le constructeur de classe, nous commençons par créer l’objet écrivain que nous utiliserons pour écrire dans le sac. Nous créons un SequentialWriter, qui écrit les messages dans le sac dans l’ordre reçu. D’autres rédacteurs avec des comportements différents peuvent être disponibles dans le rosbag2.

self.writer = rosbag2_py.SequentialWriter()

Maintenant que nous avons un objet écrivain, nous pouvons ouvrir le sac en l’utilisant. Nous spécifions l’URI du sac à créer et le format (sqlite3), laissant les autres options à leurs valeurs par défaut. Les options de conversion par défaut sont utilisées, qui n’effectueront aucune conversion et stockeront les messages dans le format de sérialisation dans lequel ils sont reçus.

storage_options = rosbag2_py._storage.StorageOptions(
    uri='my_bag',
    storage_id='sqlite3')
converter_options = rosbag2_py._storage.ConverterOptions('', '')
self.writer.open(storage_options, converter_options)

Ensuite, nous devons informer l’auteur des sujets que nous souhaitons stocker. Cela se fait en créant un objet TopicMetadata et en l’enregistrant avec le rédacteur. Cet objet spécifie le nom de rubrique, le type de données de rubrique et le format de sérialisation utilisé.

topic_info = rosbag2_py._storage.TopicMetadata(
    name='chatter',
    type='std_msgs/msg/String',
    serialization_format='cdr')
self.writer.create_topic(topic_info)

Le rédacteur étant désormais configuré pour enregistrer les données que nous lui transmettons, nous créons un abonnement et lui spécifions un rappel. Nous écrirons des données dans le sac dans le rappel.

self.subscription = self.create_subscription(
    String,
    'chatter',
    self.topic_callback,
    10)
self.subscription

Le rappel reçoit le message sous forme non sérialisée (comme c’est le cas pour l’API rclpy) et transmet le message à l’auteur, en spécifiant le sujet auquel les données sont destinées et l’horodatage à enregistrer avec le message. Cependant, l’écrivain a besoin d’un message sérialisé pour le stocker dans le sac. Cela signifie que nous devons sérialiser les données avant de les transmettre au rédacteur. Pour cette raison, nous appelons serialize_message() et transmettons le résultat à l’auteur, plutôt que de transmettre directement le message.

def topic_callback(self, msg):
    self.writer.write(
        'chatter',
        serialize_message(msg),
        self.get_clock().now().nanoseconds)

Le fichier se termine par la fonction main utilisée pour créer une instance du nœud et démarrer le traitement par ROS.

def main(args=None):
    rclpy.init(args=args)
    sbr = SimpleBagRecorder()
    rclpy.spin(sbr)
    rclpy.shutdown()

2.2 Ajouter un point d’entrée

Ouvrez le fichier setup.py dans le package bag_recorder_nodes_py et ajoutez un point d’entrée pour votre nœud.

entry_points={
    'console_scripts': [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main',
    ],
},

3 Construire et exécuter

Revenez à la racine de votre espace de travail, ros2_ws, et créez votre nouveau package.

colcon build --packages-select bag_recorder_nodes_py

Ouvrez un nouveau terminal, accédez à ros2_ws et sourcez les fichiers d’installation.

source install/setup.bash

Exécutez maintenant le nœud :

ros2 run bag_recorder_nodes_py simple_bag_recorder

Ouvrez un deuxième terminal et exécutez le nœud d’exemple talker.

ros2 run demo_nodes_cpp talker

Cela commencera à publier des données sur le sujet chatter. Au fur et à mesure que le nœud d’écriture du sac reçoit ces données, il les écrira dans le sac my_bag. Si le répertoire my_bag existe déjà, vous devez d’abord le supprimer avant d’exécuter le nœud simple_bag_recorder. C’est parce que rosbag2 n’écrasera pas les sacs existants par défaut, et donc le répertoire de destination ne peut pas exister.

Terminez les deux nœuds. Ensuite, dans un terminal, démarrez le nœud d’exemple listener.

ros2 run demo_nodes_cpp listener

Dans l’autre terminal, utilisez ros2 bag pour lire le sac enregistré par votre nœud.

ros2 bag play my_bag

Vous verrez les messages du sac reçus par le nœud listener.

Si vous souhaitez exécuter à nouveau le nœud d’écriture de sac, vous devrez d’abord supprimer le répertoire my_bag.

4 Enregistrer les données synthétiques d’un nœud

Toutes les données peuvent être enregistrées dans un sac, pas seulement les données reçues sur un sujet. Un cas d’utilisation courant pour écrire dans un sac à partir de votre propre nœud consiste à générer et à stocker des données synthétiques. Dans cette section, vous apprendrez à écrire un nœud qui génère des données et les stocke dans un sac. Nous allons démontrer deux approches pour ce faire. Le premier utilise un nœud avec une minuterie ; c’est l’approche que vous utiliseriez si votre génération de données est externe au nœud, comme la lecture de données directement à partir du matériel (par exemple, une caméra). La deuxième approche n’utilise pas de nœud ; c’est l’approche que vous pouvez utiliser lorsque vous n’avez pas besoin d’utiliser les fonctionnalités de l’infrastructure ROS.

4.1 Écrire un nœud Python

Dans le répertoire ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py, créez un nouveau fichier appelé data_generator_node.py et collez-y le code suivant.

import rclpy
from rclpy.node import Node
from rclpy.serialization import serialize_message
from example_interfaces.msg import Int32

import rosbag2_py

class DataGeneratorNode(Node):
    def __init__(self):
        super().__init__('data_generator_node')
        self.data = Int32()
        self.data.data = 0
        self.writer = rosbag2_py.SequentialWriter()

        storage_options = rosbag2_py._storage.StorageOptions(
            uri='timed_synthetic_bag',
            storage_id='sqlite3')
        converter_options = rosbag2_py._storage.ConverterOptions('', '')
        self.writer.open(storage_options, converter_options)

        topic_info = rosbag2_py._storage.TopicMetadata(
            name='synthetic',
            type='example_interfaces/msg/Int32',
            serialization_format='cdr')
        self.writer.create_topic(topic_info)

        self.timer = self.create_timer(1, self.timer_callback)

    def timer_callback(self):
        self.writer.write(
            'synthetic',
            serialize_message(self.data),
            self.get_clock().now().nanoseconds)
        self.data.data += 1


def main(args=None):
    rclpy.init(args=args)
    dgn = DataGeneratorNode()
    rclpy.spin(dgn)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

4.2 Examiner le code

Une grande partie de ce code est identique au premier exemple. Les différences importantes sont décrites ici.

Tout d’abord, le nom du sac est changé.

storage_options = rosbag2_py._storage.StorageOptions(
    uri='timed_synthetic_bag',
    storage_id='sqlite3')

Le nom du sujet est également modifié, tout comme le type de données stocké.

topic_info = rosbag2_py._storage.TopicMetadata(
    name='synthetic',
    type='example_interfaces/msg/Int32',
    serialization_format='cdr')
self.writer.create_topic(topic_info)

Plutôt qu’un abonnement à un sujet, ce nœud a une minuterie. Le minuteur se déclenche avec une période d’une seconde et appelle la fonction membre donnée lorsqu’il le fait.

self.timer = self.create_timer(1, self.timer_callback)

Dans le rappel de la minuterie, nous générons (ou obtenons autrement, par exemple en lisant à partir d’un port série connecté à un matériel) les données que nous souhaitons stocker dans le sac. Comme dans l’exemple précédent, les données ne sont pas encore sérialisées, nous devons donc les sérialiser avant de les transmettre au rédacteur.

self.writer.write(
    'synthetic',
    serialize_message(self.data),
    self.get_clock().now().nanoseconds)

4.3 Ajouter un exécutable

Ouvrez le fichier setup.py dans le package bag_recorder_nodes_py et ajoutez un point d’entrée pour votre nœud.

entry_points={
    'console_scripts': [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main',
        'data_generator_node = bag_recorder_nodes_py.data_generator_node:main',
    ],
},

4.4 Construire et exécuter

Revenez à la racine de votre espace de travail, ros2_ws, et créez votre package.

colcon build --packages-select bag_recorder_nodes_py

Ouvrez un nouveau terminal, accédez à ros2_ws et sourcez les fichiers d’installation.

source install/setup.bash

Si le répertoire timed_synthetic_bag existe déjà, vous devez d’abord le supprimer avant d’exécuter le nœud.

Exécutez maintenant le nœud :

ros2 run bag_recorder_nodes_py data_generator_node

Attendez environ 30 secondes, puis terminez le nœud avec ctrl-c. Ensuite, lisez le sac créé.

ros2 bag play timed_synthetic_bag

Ouvrez un second terminal et renvoyez le sujet /synthetic.

ros2 topic echo /synthetic

Vous verrez les données générées et stockées dans le sac imprimées sur la console au rythme d’un message par seconde.

5 Enregistrer des données synthétiques à partir d’un exécutable

Maintenant que vous pouvez créer un sac qui stocke des données à partir d’une source autre qu’un sujet, vous allez apprendre à générer et à enregistrer des données synthétiques à partir d’un exécutable non nœud. L’avantage de cette approche est un code plus simple et la création rapide d’une grande quantité de données.

5.1 Écrire un exécutable Python

Dans le répertoire ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py, créez un nouveau fichier appelé data_generator_executable.py et collez-y le code suivant.

from rclpy.clock import Clock
from rclpy.duration import Duration
from rclpy.serialization import serialize_message
from example_interfaces.msg import Int32

import rosbag2_py


def main(args=None):
    writer = rosbag2_py.SequentialWriter()

    storage_options = rosbag2_py._storage.StorageOptions(
        uri='big_synthetic_bag',
        storage_id='sqlite3')
    converter_options = rosbag2_py._storage.ConverterOptions('', '')
    writer.open(storage_options, converter_options)

    topic_info = rosbag2_py._storage.TopicMetadata(
        name='synthetic',
        type='example_interfaces/msg/Int32',
        serialization_format='cdr')
    writer.create_topic(topic_info)

    time_stamp = Clock().now()
    for ii in range(0, 100):
        data = Int32()
        data.data = ii
        writer.write(
            'synthetic',
            serialize_message(data),
            time_stamp.nanoseconds)
        time_stamp += Duration(seconds=1)

if __name__ == '__main__':
    main()

5.2 Examiner le code

Une comparaison de cet échantillon et de l’échantillon précédent révélera qu’ils ne sont pas si différents. La seule différence significative est l’utilisation d’une boucle for pour piloter la génération de données plutôt qu’un temporisateur.

Notez que nous générons également des horodatages pour les données plutôt que de nous fier à l’heure système actuelle pour chaque échantillon. L’horodatage peut être n’importe quelle valeur dont vous avez besoin. Les données seront lues à la vitesse donnée par ces horodatages, c’est donc un moyen utile de contrôler la vitesse de lecture par défaut des échantillons. Notez également que même si l’écart entre chaque échantillon est d’une seconde complète, cet exécutable n’a pas besoin d’attendre une seconde entre chaque échantillon. Cela nous permet de générer beaucoup de données couvrant une large période de temps en beaucoup moins de temps que la lecture ne prendra.

time_stamp = Clock().now()
for ii in range(0, 100):
    data = Int32()
    data.data = ii
    writer.write(
        'synthetic',
        serialize_message(data),
        time_stamp.nanoseconds)
    time_stamp += Duration(seconds=1)

5.3 Ajouter un exécutable

Ouvrez le fichier setup.py dans le package bag_recorder_nodes_py et ajoutez un point d’entrée pour votre nœud.

entry_points={
    'console_scripts': [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main',
        'data_generator_node = bag_recorder_nodes_py.data_generator_node:main',
        'data_generator_executable = bag_recorder_nodes_py.data_generator_executable:main',
    ],
},

5.4 Construire et exécuter

Revenez à la racine de votre espace de travail, ros2_ws, et créez votre package.

colcon build --packages-select bag_recorder_nodes_py

Ouvrez un terminal, accédez à ros2_ws et sourcez les fichiers d’installation.

source install/setup.bash

Si le répertoire big_synthetic_bag existe déjà, vous devez d’abord le supprimer avant de lancer l’exécutable.

Lancez maintenant l’exécutable :

ros2 run bag_recorder_nodes_py data_generator_executable

Notez que l’exécutable s’exécute et se termine très rapidement.

Rejouez maintenant le sac créé.

ros2 bag play big_synthetic_bag

Ouvrez un second terminal et renvoyez le sujet /synthetic.

ros2 topic echo /synthetic

Vous verrez les données générées et stockées dans le sac imprimées sur la console au rythme d’un message par seconde. Même si le sac a été généré rapidement, il est toujours lu au rythme indiqué par les horodatages.

Résumé

Vous avez créé un nœud qui enregistre les données qu’il reçoit sur un sujet dans un sac. Vous avez testé l’enregistrement d’un sac à l’aide du nœud et vérifié que les données ont été enregistrées en lisant le sac. Cette approche peut être utilisée pour enregistrer un sac avec des données supplémentaires à celles qu’il a reçues sur un sujet, par exemple avec des résultats obtenus à partir du traitement des données reçues. Vous avez ensuite créé un nœud et un exécutable pour générer des données synthétiques et les stocker dans un sac. Ces dernières approches sont utiles notamment pour générer des données synthétiques qui peuvent être utilisées, par exemple, comme ensembles d’apprentissage.