From ff591b7f725750f3066ed8c04ad4e503f9fc0877 Mon Sep 17 00:00:00 2001 From: Terence Tong Date: Mon, 21 Feb 2022 14:25:30 +0800 Subject: [PATCH] Mon Feb 21 02:25:30 PM HKT 2022 --- README.md | 2 +- lib/core/core.dart | 7 ++ lib/core/param.dart | 49 ++++++++++ lib/core/request.dart | 111 ++++++++++++++++++++++ lib/core/ros.dart | 202 +++++++++++++++++++++++++++++++++++++++++ lib/core/ros_html.dart | 6 ++ lib/core/ros_io.dart | 6 ++ lib/core/ros_stub.dart | 6 ++ lib/core/service.dart | 96 ++++++++++++++++++++ lib/core/topic.dart | 173 +++++++++++++++++++++++++++++++++++ lib/roslib.dart | 6 ++ 11 files changed, 663 insertions(+), 1 deletion(-) create mode 100644 lib/core/core.dart create mode 100644 lib/core/param.dart create mode 100644 lib/core/request.dart create mode 100644 lib/core/ros.dart create mode 100644 lib/core/ros_html.dart create mode 100644 lib/core/ros_io.dart create mode 100644 lib/core/ros_stub.dart create mode 100644 lib/core/service.dart create mode 100644 lib/core/topic.dart create mode 100644 lib/roslib.dart diff --git a/README.md b/README.md index 896285c..395a398 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # roslibdart -The is a fork of the original roslib by @artrmz, @TimWhiting, @artrmz. roslibdart is a library for communicating to a ROS node over websockets with rosbridge. It is heavily influenced by roslibjs and follows the same structure. This is an effort to update the library and mark it compatible to ros2 and dart2. +The is a fork of the original roslib by @artrmz, @TimWhiting, @artrmz. roslibdart is a library for communicating to a ROS node over websockets with rosbridge. It is heavily influenced by roslibjs and follows the same structure. This fork is an effort to update the library and make it compatible to ros2 and dart2. ## List of feature implementation statuses (essentially a list of features required to reach roslibjs's level) - [X] Core: diff --git a/lib/core/core.dart b/lib/core/core.dart new file mode 100644 index 0000000..2da5340 --- /dev/null +++ b/lib/core/core.dart @@ -0,0 +1,7 @@ +// Copyright (c) 2019 Conrad Heidebrecht. + +export 'ros.dart'; +export 'request.dart'; +export 'service.dart'; +export 'topic.dart'; +export 'param.dart'; diff --git a/lib/core/param.dart b/lib/core/param.dart new file mode 100644 index 0000000..fef3206 --- /dev/null +++ b/lib/core/param.dart @@ -0,0 +1,49 @@ +// Copyright (c) 2019 Conrad Heidebrecht. + +import 'dart:async'; +import 'ros.dart'; +import 'service.dart'; + +/// A wrapper for a ROS parameter. +class Param { + Param({required this.ros, required this.name}); + + /// The ROS connection. + Ros ros; + + /// Name of the parameter. + String name; + + /// Get the parameter from the ROS node using the /rosapi/get_param service. + Future get() { + final client = Service( + ros: ros, + name: '/rosapi/get_param', + type: 'rosapi/GetParam', + ); + return client.call({'name': name}); + } + + /// Set the [value] of the parameter. + Future set(dynamic value) { + final client = Service( + ros: ros, + name: '/rosapi/set_param', + type: 'rosapi/SetParam', + ); + return client.call({ + 'name': name, + 'value': value, + }); + } + + /// Delete the parameter. + Future delete() { + final client = Service( + ros: ros, + name: '/rosapi/delete_param', + type: 'rosapi/DeleteParam', + ); + return client.call({'name': name}); + } +} diff --git a/lib/core/request.dart b/lib/core/request.dart new file mode 100644 index 0000000..1edf316 --- /dev/null +++ b/lib/core/request.dart @@ -0,0 +1,111 @@ +// Copyright (c) 2019 Conrad Heidebrecht. + +import 'dart:convert'; + +/// Container for all possible ROS request parameters. +class Request { + Request({ + required this.op, + this.id, + this.type, + this.topic, + this.msg, + this.latch, + this.compression, + this.throttleRate, + this.queueLength, + this.queueSize, + this.service, + this.args, + this.values, + this.result, + }); + + /// Requested operation. + String op; + + /// ID to distinguish request or object operating on. + String? id; + + /// Message or service type. + String? type; + + /// Topic name operating on. + String? topic; + + /// Message object (generally JSON). + dynamic msg; + + /// Latch the topic when publishing. + bool? latch; + + /// The type of compression to use, like 'png' or 'cbor'. + String? compression; + + /// The rate (in ms between messages) at which to throttle the topic. + int? throttleRate; + + /// The queue length at the bridge side used when subscribing. + int? queueLength; + + /// The queue created at the bridge side for republishing topics. + int? queueSize; + + /// Service name operating on. + String? service; + + /// Arguments of the request (JSON). + Map? args; + + /// Values returned from a request. + dynamic values; + + /// Boolean value indicating the success of the operation. + bool? result; + + factory Request.fromJson(dynamic jsonData) { + return Request( + op: jsonData['op'], + id: jsonData['id'], + type: jsonData['type'], + topic: jsonData['topic'], + msg: jsonData['msg'], + latch: jsonData['latch'], + compression: jsonData['compression'], + throttleRate: jsonData['throttle_rate'], + queueLength: jsonData['queue_length'], + queueSize: jsonData['queue_size'], + service: jsonData['service'], + args: jsonData['args'], + values: jsonData['values'], + result: jsonData['result'], + ); + } + + factory Request.decode(String raw) { + return Request.fromJson(json.decode(raw)); + } + + Map toJson() { + return { + 'op': op, + if (id != null) 'id': id, + if (type != null) 'type': type, + if (topic != null) 'topic': topic, + if (msg != null) 'msg': msg, + if (latch != null) 'latch': latch, + if (compression != null) 'compression': compression, + if (throttleRate != null) 'throttle_rate': throttleRate, + if (queueLength != null) 'queue_length': queueLength, + if (queueSize != null) 'queue_size': queueSize, + if (service != null) 'service': service, + if (args != null) 'args': args, + if (values != null) 'values': values, + if (result != null) 'result': result, + }; + } + + String encode() { + return json.encode(toJson()); + } +} diff --git a/lib/core/ros.dart b/lib/core/ros.dart new file mode 100644 index 0000000..95875e4 --- /dev/null +++ b/lib/core/ros.dart @@ -0,0 +1,202 @@ +// Copyright (c) 2019 Conrad Heidebrecht. + +import 'dart:async'; +import 'dart:convert'; +import 'package:web_socket_channel/web_socket_channel.dart'; + +// ignore: uri_does_not_exist +// ignore: unused_import +import 'ros_stub.dart' + // ignore: uri_does_not_exist + if (dart.library.html) 'ros_html.dart' + // ignore: uri_does_not_exist + if (dart.library.io) 'ros_io.dart'; + +import 'request.dart'; + +/// Status enums. +enum Status { none, connecting, connected, closed, errored } +enum TopicStatus { + subscribed, + unsubscribed, + publisher, + advertised, + unadvertised, +} + +/// The class through which all data to and from a ROS node goes through. +/// Manages status and key information about the connection and node. +class Ros { + /// Initializes the [_statusController] as a broadcast. + /// The [url] of the ROS node can be optionally specified at this point. + Ros({this.url}) { + _statusController = StreamController.broadcast(); + } + + /// The url of ROS node running the rosbridge server. + dynamic url; + + /// Total subscribers to ever connect. + int subscribers = 0; + + /// Total number of advertisers to ever connect. + int advertisers = 0; + + /// Total number of publishers to ever connect. + int publishers = 0; + + /// Total number of callers to ever call a service. + int serviceCallers = 0; + + /// The sum to generate IDs with. + int get ids => subscribers + advertisers + publishers + serviceCallers; + + /// The websocket connection to communicate with the ROS node. + late WebSocketChannel _channel; + + /// Subscription to the websocket stream. + late StreamSubscription _channelListener; + + /// JSON broadcast websocket stream. + late Stream> stream; + + /// The controller to update subscribers on the state of the connection. + late StreamController _statusController; + + /// Subscribable stream to listen for connection status changes. + Stream get statusStream => _statusController.stream; + + /// Status variable that can be used when not interested in getting live updates. + Status status = Status.none; + + /// Connect to the ROS node, the [url] can override what was provided in the constructor. + void connect({dynamic url}) { + this.url = url ?? this.url; + url ??= this.url; + try { + // Initialize the connection to the ROS node with a Websocket channel. + _channel = initializeWebSocketChannel(url); + stream = + _channel.stream.asBroadcastStream().map((raw) => json.decode(raw)); + // Update the connection status. + status = Status.connected; + _statusController.add(status); + // Listen for messages on the connection to update the status. + _channelListener = stream.listen((data) { + //print('INCOMING: $data'); + if (status != Status.connected) { + status = Status.connected; + _statusController.add(status); + } + }, onError: (error) { + status = Status.errored; + _statusController.add(status); + }, onDone: () { + status = Status.closed; + _statusController.add(status); + }); + } on WebSocketChannelException { + status = Status.errored; + _statusController.add(status); + } + } + + /// Close the connection to the ROS node, an exit [code] and [reason] can + /// be optionally specified. + Future close([int? code, String? reason]) async { + /// Close listener and websocket. + await _channelListener.cancel(); + await _channel.sink.close(code, reason); + + /// Update the connection status. + _statusController.add(Status.closed); + status = Status.closed; + } + + /// Send a [message] to the ROS node + bool send(dynamic message) { + // If we're not connected give up. + if (status != Status.connected) return false; + // Format the message into JSON and then stringify. + final toSend = (message is Request) + ? json.encode(message.toJson()) + : (message is Map || message is List) + ? json.encode(message) + : message; + //print('OUTGOING: $toSend'); + // Actually send it to the node. + _channel.sink.add(toSend); + return true; + } + + void authenticate({ + required String mac, + required String client, + required String dest, + required String rand, + required DateTime t, + required String level, + required DateTime end, + }) async { + send({ + 'mac': mac, + 'client': client, + 'dest': dest, + 'rand': rand, + 't': t.millisecondsSinceEpoch, + 'level': level, + 'end': end.millisecondsSinceEpoch, + }); + } + + /// Sends a set_level request to the server. + /// [level] can be one of {none, error, warning, info}, and + /// [id] is the optional operation ID to change status level on + void setStatusLevel({String ?level, int ?id}) { + send({ + 'op': 'set_level', + 'level': level, + 'id': id, + }); + } + + /// Request a subscription ID. + String requestSubscriber(String name) { + subscribers++; + return 'subscribe:' + name + ':' + ids.toString(); + } + + /// Request an advertiser ID. + String requestAdvertiser(String name) { + advertisers++; + return 'advertise:' + name + ':' + ids.toString(); + } + + /// Request a publisher ID. + String requestPublisher(String name) { + publishers++; + return 'publish:' + name + ':' + ids.toString(); + } + + /// Request a service caller ID. + String requestServiceCaller(String name) { + serviceCallers++; + return 'call_service:' + name + ':' + ids.toString(); + } + @override + bool operator ==(other) { + return other.hashCode == hashCode; + } + + @override + int get hashCode => + url.hashCode + + subscribers.hashCode + + advertisers.hashCode + + publishers.hashCode + + _channel.hashCode + + _channelListener.hashCode + + stream.hashCode + + _statusController.hashCode + + status.hashCode; +} diff --git a/lib/core/ros_html.dart b/lib/core/ros_html.dart new file mode 100644 index 0000000..6a4547a --- /dev/null +++ b/lib/core/ros_html.dart @@ -0,0 +1,6 @@ +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/html.dart'; + +WebSocketChannel initializeWebSocketChannel(String url) { + return HtmlWebSocketChannel.connect(url); +} diff --git a/lib/core/ros_io.dart b/lib/core/ros_io.dart new file mode 100644 index 0000000..e4425fe --- /dev/null +++ b/lib/core/ros_io.dart @@ -0,0 +1,6 @@ +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:web_socket_channel/io.dart'; + +WebSocketChannel initializeWebSocketChannel(String url) { + return IOWebSocketChannel.connect(url); +} diff --git a/lib/core/ros_stub.dart b/lib/core/ros_stub.dart new file mode 100644 index 0000000..e72c0d2 --- /dev/null +++ b/lib/core/ros_stub.dart @@ -0,0 +1,6 @@ +import 'package:web_socket_channel/web_socket_channel.dart'; + +//Implemented in `ros_html.dart` and `ros_io.dart` +WebSocketChannel initializeWebSocketChannel(String url) { + throw UnsupportedError('Cannot create a web socket channel without dart:html or dart:io.'); +} diff --git a/lib/core/service.dart b/lib/core/service.dart new file mode 100644 index 0000000..0eeb793 --- /dev/null +++ b/lib/core/service.dart @@ -0,0 +1,96 @@ +// Copyright (c) 2019 Conrad Heidebrecht. + +import 'dart:async'; +import 'ros.dart'; +import 'request.dart'; + +// Receiver function to handle requests when the service is advertising. +typedef ServiceHandler = Future Function(dynamic request); + +/// Wrapper to interact with ROS services. +class Service { + Service({ + required this.ros, + required this.name, + required this.type, + }); + + /// The ROS connection. + Ros ros; + + /// Name of the service. + String name; + + /// Type of the service. + String type; + + /// Advertiser that is listened to for service requests when advertising. + Stream? _advertiser; + + /// Checks whether or not the service is currently advertising. + bool get isAdvertised => _advertiser != null; + + StreamSubscription? listener; + + /// Call the service with a request ([req]). + Future call(dynamic req) { + // The service can't be called if it's currently advertising. + if (isAdvertised) return Future.value(false); + // Set up the response receiver by filtering data from the ROS node by the ID generated. + final callId = ros.requestServiceCaller(name); + final receiver = ros.stream.where((message) => message['id'] == callId).map( + (Map message) => message['result'] == null + ? Future.error(message['values']!) + : Future.value(message['values'])); + // Wait for the receiver to receive a single response and then return. + final completer = Completer(); + + listener = receiver.listen((d) { + completer.complete(d); + listener!.cancel(); + }); + // Actually send the request. + ros.send(Request( + op: 'call_service', + id: callId, + service: name, + type: type, + args: req, + )); + return completer.future; + } + + // Advertise the service and provide a [handler] to deal with requests. + Future advertise(ServiceHandler handler) async { + if (isAdvertised) return; + // Send the advertise request. + ros.send(Request( + op: 'advertise_service', + type: type, + service: name, + )); + // Listen for requests, forward them to the handler and then + // send the response back to the ROS node. + _advertiser = ros.stream + .where((message) => message['service'] == name) + .asyncMap((req) => handler(req['args']).then((resp) { + ros.send(Request( + op: 'service_response', + id: req['id'], + service: name, + values: resp ?? {}, + result: resp != null, + )); + })); + } + + // Stop advertising the service. + void unadvertise() { + if (!isAdvertised) return; + ros.send(Request( + op: 'unadvertise_service', + service: name, + )); + _advertiser = null; + } +} diff --git a/lib/core/topic.dart b/lib/core/topic.dart new file mode 100644 index 0000000..abe7bf6 --- /dev/null +++ b/lib/core/topic.dart @@ -0,0 +1,173 @@ +// Copyright (c) 2019 Conrad Heidebrecht. + +import 'dart:async'; +import 'ros.dart'; +import 'request.dart'; + +/// Wrapper to interact with ROS topics. +class Topic { + Topic({ + required this.ros, + required this.name, + required this.type, + this.compression = 'none', + this.throttleRate = 0, + this.latch = false, + this.queueSize = 100, + this.queueLength = 0, + this.reconnectOnClose = true, + }) : assert(['png', 'cbor', 'none'].contains(compression)), + assert(throttleRate >= 0); + + /// The ROS connection. + Ros ros; + + /// Stream subscribers to the topic can listen to. + Stream>? subscription; + + /// Name of the topic. + String name; + + /// Message type the topic uses. + String type; + + /// Subscription ID provided by [ros]. + String? subscribeId; + + /// Advertiser ID provided by [ros]. + String? advertiseId; + + /// Checks whether or not the topic is currently advertising. + bool get isAdvertised => advertiseId != null; + + /// Publisher ID provided by [ros]. + late String publishId; + + /// The type of compression to use, like 'png' or 'cbor'. + /// + /// Defaults to 'none'. + String compression; + + /// The rate (in ms between messages) at which to throttle the topic. + /// + /// Defaults to 0. + int throttleRate; + + /// Latch the topic when publishing. + /// + /// Defaults to false. + bool latch; + + /// The queue created at the bridge side for republishing topics. + /// + /// Defaults to 100. + int queueSize; + + /// The queue length at the bridge side used when subscribing. + /// + /// Defaults to 0 (no queueing). + int queueLength; + + /// Flag to enable resubscription and readvertisement on a ROS connection close event. + /// + /// Defaults to true. + bool reconnectOnClose; + + /// Subscribe to the topic if not already subscribed. + Future subscribe() async { + if (subscribeId == null) { + // Create the listenable broadcast subscription stream. + subscription = ros.stream.where((message) => message['topic'] == name); + subscribeId = ros.requestSubscriber(name); + // Send the subscribe request to ROS. + await safeSend(Request( + op: 'subscribe', + id: subscribeId, + type: type, + topic: name, + compression: compression, + throttleRate: throttleRate, + queueLength: queueLength, + )); + } + } + + /// Unsubscribe from the topic. + Future unsubscribe() async { + if (subscribeId != null) { + // Send the request and reset the subscription variables. + await safeSend(Request( + op: 'unsubscribe', + id: subscribeId, + topic: name, + )); + // await ros.requestUnsubscribe(id); + subscription = null; + subscribeId = null; + } + } + + /// Publish a [message] to the topic. + Future publish(dynamic message) async { + // Advertise the topic and then send the publish request. + await advertise(); + publishId = ros.requestPublisher(name); + await safeSend(Request( + op: 'publish', + topic: name, + id: publishId, + msg: message, + latch: latch, + )); + } + + /// Advertise the topic. + Future advertise() async { + if (!isAdvertised) { + // Send the advertisement request. + advertiseId = ros.requestAdvertiser(name); + await safeSend(Request( + op: 'advertise', + id: advertiseId, + type: type, + topic: name, + latch: latch, + queueSize: queueSize, + )); + // If the ROS connection closes show that we're not advertising anymore. + watchForClose(); + } + } + + /// Wait for the connection to close and then reset advertising variables. + Future watchForClose() async { + if (!reconnectOnClose) { + await ros.statusStream.firstWhere((s) => s == Status.closed); + advertiseId = null; + } + } + + /// Stop advertising the topic. + Future unadvertise() async { + if (isAdvertised) { + // Send the unadvertise request and reset variables. + await safeSend(Request( + op: 'unadvertise', + id: advertiseId, + topic: name, + )); + advertiseId = null; + } + } + + /// Safely send a [message] to ROS. + Future safeSend(Request message) async { + // Send the message but if we're not connected and the [reconnectOnClose] flag + // is set, wait for ROS to reconnect and then resend the [message]. + ros.send(message); + if (reconnectOnClose && ros.status != Status.connected) { + await ros.statusStream.firstWhere((s) => s == Status.connected); + ros.send(message); + } + } +} diff --git a/lib/roslib.dart b/lib/roslib.dart new file mode 100644 index 0000000..d701726 --- /dev/null +++ b/lib/roslib.dart @@ -0,0 +1,6 @@ +// Copyright (c) 2019 Conrad Heidebrecht. + +library roslib; + +export 'core/core.dart'; +// export 'actionlib/actionlib.dart';