Mon Feb 21 02:25:30 PM HKT 2022

This commit is contained in:
Terence Tong 2022-02-21 14:25:30 +08:00
parent 6196a8a159
commit ff591b7f72
11 changed files with 663 additions and 1 deletions

View File

@ -1,6 +1,6 @@
# roslibdart # roslibdart
The is a fork of the original roslib by @artrmz, @TimWhiting, @artrmz. roslibdart is a library for communicating to a ROS node over websockets with rosbridge. It is heavily influenced by roslibjs and follows the same structure. This is an effort to update the library and mark it compatible to ros2 and dart2. The is a fork of the original roslib by @artrmz, @TimWhiting, @artrmz. roslibdart is a library for communicating to a ROS node over websockets with rosbridge. It is heavily influenced by roslibjs and follows the same structure. This fork is an effort to update the library and make it compatible to ros2 and dart2.
## List of feature implementation statuses (essentially a list of features required to reach roslibjs's level) ## List of feature implementation statuses (essentially a list of features required to reach roslibjs's level)
- [X] Core: - [X] Core:

7
lib/core/core.dart Normal file
View File

@ -0,0 +1,7 @@
// Copyright (c) 2019 Conrad Heidebrecht.
export 'ros.dart';
export 'request.dart';
export 'service.dart';
export 'topic.dart';
export 'param.dart';

49
lib/core/param.dart Normal file
View File

@ -0,0 +1,49 @@
// Copyright (c) 2019 Conrad Heidebrecht.
import 'dart:async';
import 'ros.dart';
import 'service.dart';
/// A wrapper for a ROS parameter.
class Param {
Param({required this.ros, required this.name});
/// The ROS connection.
Ros ros;
/// Name of the parameter.
String name;
/// Get the parameter from the ROS node using the /rosapi/get_param service.
Future get() {
final client = Service(
ros: ros,
name: '/rosapi/get_param',
type: 'rosapi/GetParam',
);
return client.call({'name': name});
}
/// Set the [value] of the parameter.
Future set(dynamic value) {
final client = Service(
ros: ros,
name: '/rosapi/set_param',
type: 'rosapi/SetParam',
);
return client.call({
'name': name,
'value': value,
});
}
/// Delete the parameter.
Future delete() {
final client = Service(
ros: ros,
name: '/rosapi/delete_param',
type: 'rosapi/DeleteParam',
);
return client.call({'name': name});
}
}

111
lib/core/request.dart Normal file
View File

@ -0,0 +1,111 @@
// Copyright (c) 2019 Conrad Heidebrecht.
import 'dart:convert';
/// Container for all possible ROS request parameters.
class Request {
Request({
required this.op,
this.id,
this.type,
this.topic,
this.msg,
this.latch,
this.compression,
this.throttleRate,
this.queueLength,
this.queueSize,
this.service,
this.args,
this.values,
this.result,
});
/// Requested operation.
String op;
/// ID to distinguish request or object operating on.
String? id;
/// Message or service type.
String? type;
/// Topic name operating on.
String? topic;
/// Message object (generally JSON).
dynamic msg;
/// Latch the topic when publishing.
bool? latch;
/// The type of compression to use, like 'png' or 'cbor'.
String? compression;
/// The rate (in ms between messages) at which to throttle the topic.
int? throttleRate;
/// The queue length at the bridge side used when subscribing.
int? queueLength;
/// The queue created at the bridge side for republishing topics.
int? queueSize;
/// Service name operating on.
String? service;
/// Arguments of the request (JSON).
Map<String, dynamic>? args;
/// Values returned from a request.
dynamic values;
/// Boolean value indicating the success of the operation.
bool? result;
factory Request.fromJson(dynamic jsonData) {
return Request(
op: jsonData['op'],
id: jsonData['id'],
type: jsonData['type'],
topic: jsonData['topic'],
msg: jsonData['msg'],
latch: jsonData['latch'],
compression: jsonData['compression'],
throttleRate: jsonData['throttle_rate'],
queueLength: jsonData['queue_length'],
queueSize: jsonData['queue_size'],
service: jsonData['service'],
args: jsonData['args'],
values: jsonData['values'],
result: jsonData['result'],
);
}
factory Request.decode(String raw) {
return Request.fromJson(json.decode(raw));
}
Map<String, dynamic> toJson() {
return {
'op': op,
if (id != null) 'id': id,
if (type != null) 'type': type,
if (topic != null) 'topic': topic,
if (msg != null) 'msg': msg,
if (latch != null) 'latch': latch,
if (compression != null) 'compression': compression,
if (throttleRate != null) 'throttle_rate': throttleRate,
if (queueLength != null) 'queue_length': queueLength,
if (queueSize != null) 'queue_size': queueSize,
if (service != null) 'service': service,
if (args != null) 'args': args,
if (values != null) 'values': values,
if (result != null) 'result': result,
};
}
String encode() {
return json.encode(toJson());
}
}

202
lib/core/ros.dart Normal file
View File

@ -0,0 +1,202 @@
// Copyright (c) 2019 Conrad Heidebrecht.
import 'dart:async';
import 'dart:convert';
import 'package:web_socket_channel/web_socket_channel.dart';
// ignore: uri_does_not_exist
// ignore: unused_import
import 'ros_stub.dart'
// ignore: uri_does_not_exist
if (dart.library.html) 'ros_html.dart'
// ignore: uri_does_not_exist
if (dart.library.io) 'ros_io.dart';
import 'request.dart';
/// Status enums.
enum Status { none, connecting, connected, closed, errored }
enum TopicStatus {
subscribed,
unsubscribed,
publisher,
advertised,
unadvertised,
}
/// The class through which all data to and from a ROS node goes through.
/// Manages status and key information about the connection and node.
class Ros {
/// Initializes the [_statusController] as a broadcast.
/// The [url] of the ROS node can be optionally specified at this point.
Ros({this.url}) {
_statusController = StreamController<Status>.broadcast();
}
/// The url of ROS node running the rosbridge server.
dynamic url;
/// Total subscribers to ever connect.
int subscribers = 0;
/// Total number of advertisers to ever connect.
int advertisers = 0;
/// Total number of publishers to ever connect.
int publishers = 0;
/// Total number of callers to ever call a service.
int serviceCallers = 0;
/// The sum to generate IDs with.
int get ids => subscribers + advertisers + publishers + serviceCallers;
/// The websocket connection to communicate with the ROS node.
late WebSocketChannel _channel;
/// Subscription to the websocket stream.
late StreamSubscription _channelListener;
/// JSON broadcast websocket stream.
late Stream<Map<String, dynamic>> stream;
/// The controller to update subscribers on the state of the connection.
late StreamController<Status> _statusController;
/// Subscribable stream to listen for connection status changes.
Stream<Status> get statusStream => _statusController.stream;
/// Status variable that can be used when not interested in getting live updates.
Status status = Status.none;
/// Connect to the ROS node, the [url] can override what was provided in the constructor.
void connect({dynamic url}) {
this.url = url ?? this.url;
url ??= this.url;
try {
// Initialize the connection to the ROS node with a Websocket channel.
_channel = initializeWebSocketChannel(url);
stream =
_channel.stream.asBroadcastStream().map((raw) => json.decode(raw));
// Update the connection status.
status = Status.connected;
_statusController.add(status);
// Listen for messages on the connection to update the status.
_channelListener = stream.listen((data) {
//print('INCOMING: $data');
if (status != Status.connected) {
status = Status.connected;
_statusController.add(status);
}
}, onError: (error) {
status = Status.errored;
_statusController.add(status);
}, onDone: () {
status = Status.closed;
_statusController.add(status);
});
} on WebSocketChannelException {
status = Status.errored;
_statusController.add(status);
}
}
/// Close the connection to the ROS node, an exit [code] and [reason] can
/// be optionally specified.
Future<void> close([int? code, String? reason]) async {
/// Close listener and websocket.
await _channelListener.cancel();
await _channel.sink.close(code, reason);
/// Update the connection status.
_statusController.add(Status.closed);
status = Status.closed;
}
/// Send a [message] to the ROS node
bool send(dynamic message) {
// If we're not connected give up.
if (status != Status.connected) return false;
// Format the message into JSON and then stringify.
final toSend = (message is Request)
? json.encode(message.toJson())
: (message is Map || message is List)
? json.encode(message)
: message;
//print('OUTGOING: $toSend');
// Actually send it to the node.
_channel.sink.add(toSend);
return true;
}
void authenticate({
required String mac,
required String client,
required String dest,
required String rand,
required DateTime t,
required String level,
required DateTime end,
}) async {
send({
'mac': mac,
'client': client,
'dest': dest,
'rand': rand,
't': t.millisecondsSinceEpoch,
'level': level,
'end': end.millisecondsSinceEpoch,
});
}
/// Sends a set_level request to the server.
/// [level] can be one of {none, error, warning, info}, and
/// [id] is the optional operation ID to change status level on
void setStatusLevel({String ?level, int ?id}) {
send({
'op': 'set_level',
'level': level,
'id': id,
});
}
/// Request a subscription ID.
String requestSubscriber(String name) {
subscribers++;
return 'subscribe:' + name + ':' + ids.toString();
}
/// Request an advertiser ID.
String requestAdvertiser(String name) {
advertisers++;
return 'advertise:' + name + ':' + ids.toString();
}
/// Request a publisher ID.
String requestPublisher(String name) {
publishers++;
return 'publish:' + name + ':' + ids.toString();
}
/// Request a service caller ID.
String requestServiceCaller(String name) {
serviceCallers++;
return 'call_service:' + name + ':' + ids.toString();
}
@override
bool operator ==(other) {
return other.hashCode == hashCode;
}
@override
int get hashCode =>
url.hashCode +
subscribers.hashCode +
advertisers.hashCode +
publishers.hashCode +
_channel.hashCode +
_channelListener.hashCode +
stream.hashCode +
_statusController.hashCode +
status.hashCode;
}

6
lib/core/ros_html.dart Normal file
View File

@ -0,0 +1,6 @@
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/html.dart';
WebSocketChannel initializeWebSocketChannel(String url) {
return HtmlWebSocketChannel.connect(url);
}

6
lib/core/ros_io.dart Normal file
View File

@ -0,0 +1,6 @@
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/io.dart';
WebSocketChannel initializeWebSocketChannel(String url) {
return IOWebSocketChannel.connect(url);
}

6
lib/core/ros_stub.dart Normal file
View File

@ -0,0 +1,6 @@
import 'package:web_socket_channel/web_socket_channel.dart';
//Implemented in `ros_html.dart` and `ros_io.dart`
WebSocketChannel initializeWebSocketChannel(String url) {
throw UnsupportedError('Cannot create a web socket channel without dart:html or dart:io.');
}

96
lib/core/service.dart Normal file
View File

@ -0,0 +1,96 @@
// Copyright (c) 2019 Conrad Heidebrecht.
import 'dart:async';
import 'ros.dart';
import 'request.dart';
// Receiver function to handle requests when the service is advertising.
typedef ServiceHandler = Future Function(dynamic request);
/// Wrapper to interact with ROS services.
class Service {
Service({
required this.ros,
required this.name,
required this.type,
});
/// The ROS connection.
Ros ros;
/// Name of the service.
String name;
/// Type of the service.
String type;
/// Advertiser that is listened to for service requests when advertising.
Stream? _advertiser;
/// Checks whether or not the service is currently advertising.
bool get isAdvertised => _advertiser != null;
StreamSubscription? listener;
/// Call the service with a request ([req]).
Future call(dynamic req) {
// The service can't be called if it's currently advertising.
if (isAdvertised) return Future.value(false);
// Set up the response receiver by filtering data from the ROS node by the ID generated.
final callId = ros.requestServiceCaller(name);
final receiver = ros.stream.where((message) => message['id'] == callId).map(
(Map<String, dynamic> message) => message['result'] == null
? Future.error(message['values']!)
: Future.value(message['values']));
// Wait for the receiver to receive a single response and then return.
final completer = Completer();
listener = receiver.listen((d) {
completer.complete(d);
listener!.cancel();
});
// Actually send the request.
ros.send(Request(
op: 'call_service',
id: callId,
service: name,
type: type,
args: req,
));
return completer.future;
}
// Advertise the service and provide a [handler] to deal with requests.
Future<void> advertise(ServiceHandler handler) async {
if (isAdvertised) return;
// Send the advertise request.
ros.send(Request(
op: 'advertise_service',
type: type,
service: name,
));
// Listen for requests, forward them to the handler and then
// send the response back to the ROS node.
_advertiser = ros.stream
.where((message) => message['service'] == name)
.asyncMap((req) => handler(req['args']).then((resp) {
ros.send(Request(
op: 'service_response',
id: req['id'],
service: name,
values: resp ?? {},
result: resp != null,
));
}));
}
// Stop advertising the service.
void unadvertise() {
if (!isAdvertised) return;
ros.send(Request(
op: 'unadvertise_service',
service: name,
));
_advertiser = null;
}
}

173
lib/core/topic.dart Normal file
View File

@ -0,0 +1,173 @@
// Copyright (c) 2019 Conrad Heidebrecht.
import 'dart:async';
import 'ros.dart';
import 'request.dart';
/// Wrapper to interact with ROS topics.
class Topic {
Topic({
required this.ros,
required this.name,
required this.type,
this.compression = 'none',
this.throttleRate = 0,
this.latch = false,
this.queueSize = 100,
this.queueLength = 0,
this.reconnectOnClose = true,
}) : assert(['png', 'cbor', 'none'].contains(compression)),
assert(throttleRate >= 0);
/// The ROS connection.
Ros ros;
/// Stream subscribers to the topic can listen to.
Stream<Map<String, dynamic>>? subscription;
/// Name of the topic.
String name;
/// Message type the topic uses.
String type;
/// Subscription ID provided by [ros].
String? subscribeId;
/// Advertiser ID provided by [ros].
String? advertiseId;
/// Checks whether or not the topic is currently advertising.
bool get isAdvertised => advertiseId != null;
/// Publisher ID provided by [ros].
late String publishId;
/// The type of compression to use, like 'png' or 'cbor'.
///
/// Defaults to 'none'.
String compression;
/// The rate (in ms between messages) at which to throttle the topic.
///
/// Defaults to 0.
int throttleRate;
/// Latch the topic when publishing.
///
/// Defaults to false.
bool latch;
/// The queue created at the bridge side for republishing topics.
///
/// Defaults to 100.
int queueSize;
/// The queue length at the bridge side used when subscribing.
///
/// Defaults to 0 (no queueing).
int queueLength;
/// Flag to enable resubscription and readvertisement on a ROS connection close event.
///
/// Defaults to true.
bool reconnectOnClose;
/// Subscribe to the topic if not already subscribed.
Future<void> subscribe() async {
if (subscribeId == null) {
// Create the listenable broadcast subscription stream.
subscription = ros.stream.where((message) => message['topic'] == name);
subscribeId = ros.requestSubscriber(name);
// Send the subscribe request to ROS.
await safeSend(Request(
op: 'subscribe',
id: subscribeId,
type: type,
topic: name,
compression: compression,
throttleRate: throttleRate,
queueLength: queueLength,
));
}
}
/// Unsubscribe from the topic.
Future<void> unsubscribe() async {
if (subscribeId != null) {
// Send the request and reset the subscription variables.
await safeSend(Request(
op: 'unsubscribe',
id: subscribeId,
topic: name,
));
// await ros.requestUnsubscribe(id);
subscription = null;
subscribeId = null;
}
}
/// Publish a [message] to the topic.
Future<void> publish(dynamic message) async {
// Advertise the topic and then send the publish request.
await advertise();
publishId = ros.requestPublisher(name);
await safeSend(Request(
op: 'publish',
topic: name,
id: publishId,
msg: message,
latch: latch,
));
}
/// Advertise the topic.
Future<void> advertise() async {
if (!isAdvertised) {
// Send the advertisement request.
advertiseId = ros.requestAdvertiser(name);
await safeSend(Request(
op: 'advertise',
id: advertiseId,
type: type,
topic: name,
latch: latch,
queueSize: queueSize,
));
// If the ROS connection closes show that we're not advertising anymore.
watchForClose();
}
}
/// Wait for the connection to close and then reset advertising variables.
Future<void> watchForClose() async {
if (!reconnectOnClose) {
await ros.statusStream.firstWhere((s) => s == Status.closed);
advertiseId = null;
}
}
/// Stop advertising the topic.
Future<void> unadvertise() async {
if (isAdvertised) {
// Send the unadvertise request and reset variables.
await safeSend(Request(
op: 'unadvertise',
id: advertiseId,
topic: name,
));
advertiseId = null;
}
}
/// Safely send a [message] to ROS.
Future<void> safeSend(Request message) async {
// Send the message but if we're not connected and the [reconnectOnClose] flag
// is set, wait for ROS to reconnect and then resend the [message].
ros.send(message);
if (reconnectOnClose && ros.status != Status.connected) {
await ros.statusStream.firstWhere((s) => s == Status.connected);
ros.send(message);
}
}
}

6
lib/roslib.dart Normal file
View File

@ -0,0 +1,6 @@
// Copyright (c) 2019 Conrad Heidebrecht.
library roslib;
export 'core/core.dart';
// export 'actionlib/actionlib.dart';