我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用sensor_msgs.msg.Joy()。
def __init__(self): """ Create HeadLiftJoy object. """ # params self._head_axes = rospy.get_param('~head_axes', 3) self._lift_axes = rospy.get_param('~lift_axes', 3) self._head_button = rospy.get_param('~head_button', 4) self._lift_button = rospy.get_param('~lift_button', 5) # pubs self._head_pub = rospy.Publisher('head_angle', Float64, queue_size=1) self._lift_pub = rospy.Publisher('lift_height', Float64, queue_size=1) # subs self._joy_sub = rospy.Subscriber('joy', Joy, self._joy_cb, queue_size=1)
def __init__(self): self.joy_pub = rospy.Publisher('sensors/joystick/1', Joy, queue_size=1) self.joy_pub2 = rospy.Publisher('sensors/joystick/2', Joy, queue_size=1) self.rospack = RosPack() self.rate = rospy.Rate(20) count = len(devices.gamepads) if count < 2: rospy.logerr("Sensors: Expecting 2 joysticks but found only {}, exiting".format(count)) sys.exit(-1) gamepads = sorted(devices.gamepads, key=lambda pad: int(str(pad.name)[-1])) rospy.loginfo(gamepads) self.joysticks = [JoystickThread(pad) for pad in gamepads] [joystick.start() for joystick in self.joysticks]
def __init__(self): self.topics = { "ball": {"topic": "environment/ball", "sub": None, "data": CircularState(), "type": CircularState}, "light": {"topic": "environment/light", "sub": None, "data": UInt8(), "type": UInt8}, "sound": {"topic": "environment/sound", "sub": None, "data": Float32(), "type": Float32}, "ergo_state": {"topic": "ergo/state", "sub": None, "data": CircularState(), "type": CircularState}, "joy1": {"topic": "sensors/joystick/1", "sub": None, "data": Joy(), "type": Joy}, "joy2": {"topic": "sensors/joystick/2", "sub": None, "data": Joy(), "type": Joy}, "torso_l_j": {"topic": "{}/joint_state".format("poppy_torso"), "sub": None, "data": JointState(), "type": JointState}, "torso_l_eef": {"topic": "{}/left_arm/end_effector_pose".format("poppy_torso"), "sub": None, "data": PoseStamped(), "type": PoseStamped}, "torso_r_eef": {"topic": "{}/right_arm/end_effector_pose".format("poppy_torso"), "sub": None, "data": PoseStamped(), "type": PoseStamped} } self.topics["ball"]["sub"] = rospy.Subscriber(self.topics["ball"]["topic"], self.topics["ball"]["type"], self.cb_ball) self.topics["light"]["sub"] = rospy.Subscriber(self.topics["light"]["topic"], self.topics["light"]["type"], self.cb_light) self.topics["sound"]["sub"] = rospy.Subscriber(self.topics["sound"]["topic"], self.topics["sound"]["type"], self.cb_sound) self.topics["ergo_state"]["sub"] = rospy.Subscriber(self.topics["ergo_state"]["topic"], self.topics["ergo_state"]["type"], self.cb_ergo) self.topics["joy1"]["sub"] = rospy.Subscriber(self.topics["joy1"]["topic"], self.topics["joy1"]["type"], self.cb_joy1) self.topics["joy2"]["sub"] = rospy.Subscriber(self.topics["joy2"]["topic"], self.topics["joy2"]["type"], self.cb_joy2) self.topics["torso_l_j"]["sub"] = rospy.Subscriber(self.topics["torso_l_j"]["topic"], self.topics["torso_l_j"]["type"], self.cb_torso_l_j) self.topics["torso_l_eef"]["sub"] = rospy.Subscriber(self.topics["torso_l_eef"]["topic"], self.topics["torso_l_eef"]["type"], self.cb_torso_l_eef) self.topics["torso_r_eef"]["sub"] = rospy.Subscriber(self.topics["torso_r_eef"]["topic"], self.topics["torso_r_eef"]["type"], self.cb_torso_r_eef)
def _joy_cb(self, msg): """ The joy/game-pad message callback. :type msg: Joy :param msg: The incoming joy message. """ if msg.buttons[self._head_button] == 1: angle_deg = ((msg.axes[self._head_axes] + 1.0) / 2.0) * SUM_HEAD_ANGLE + MIN_HEAD_ANGLE rospy.logdebug('Send head angle: {} degrees.'.format(angle_deg)) self._head_pub.publish(data=angle_deg) if msg.buttons[self._lift_button] == 1: lift_mm = abs(msg.axes[self._lift_axes]) * SUM_LIFT_HEIGHT + MIN_LIFT_HEIGHT rospy.logdebug('Send lift height: {} mm.'.format(lift_mm)) self._lift_pub.publish(data=abs(msg.axes[self._lift_axes]))
def _on_joy(self, msg): """ callback for messages from joystick input Args: msg(Joy): a joystick input message """ raise NotImplementedError()
def _on_joy(self, msg): """ callback for messages from joystick input Args: msg(Joy): a joystick input message """ self._controls['btnLeft'] = (msg.buttons[2] == 1) self._controls['btnUp'] = (msg.buttons[3] == 1) self._controls['btnDown'] = (msg.buttons[0] == 1) self._controls['btnRight'] = (msg.buttons[1] == 1) self._controls['dPadUp'] = (msg.axes[7] > 0.5) self._controls['dPadDown'] = (msg.axes[7] < -0.5) self._controls['dPadLeft'] = (msg.axes[6] > 0.5) self._controls['dPadRight'] = (msg.axes[6] < -0.5) self._controls['leftStickHorz'] = msg.axes[0] self._controls['leftStickVert'] = msg.axes[1] self._controls['rightStickHorz'] = msg.axes[3] self._controls['rightStickVert'] = msg.axes[4] self._controls['leftBumper'] = (msg.buttons[4] == 1) self._controls['rightBumper'] = (msg.buttons[5] == 1) self._controls['leftTrigger'] = (msg.axes[2] < 0.0) self._controls['rightTrigger'] = (msg.axes[5] < 0.0) self._controls['function1'] = (msg.buttons[6] == 1) self._controls['function2'] = (msg.buttons[10] == 1)
def _on_joy(self, msg): """ callback for messages from joystick input Args: msg(Joy): a joystick input message """ self._controls['btnLeft'] = (msg.buttons[2] == 1) self._controls['btnUp'] = (msg.buttons[3] == 1) self._controls['btnDown'] = (msg.buttons[0] == 1) self._controls['btnRight'] = (msg.buttons[1] == 1) self._controls['dPadUp'] = (msg.axes[7] > 0.5) self._controls['dPadDown'] = (msg.axes[7] < -0.5) self._controls['dPadLeft'] = (msg.axes[6] > 0.5) self._controls['dPadRight'] = (msg.axes[6] < -0.5) self._controls['leftStickHorz'] = msg.axes[0] self._controls['leftStickVert'] = msg.axes[1] self._controls['rightStickHorz'] = msg.axes[3] self._controls['rightStickVert'] = msg.axes[4] self._controls['leftBumper'] = (msg.buttons[4] == 1) self._controls['rightBumper'] = (msg.buttons[5] == 1) self._controls['leftTrigger'] = (msg.axes[2] < 0.0) self._controls['rightTrigger'] = (msg.axes[5] < 0.0) self._controls['function1'] = (msg.buttons[6] == 1) self._controls['function2'] = (msg.buttons[7] == 1)
def _on_joy(self, msg): """ callback for messages from joystick input Args: msg(Joy): a joystick input message """ self._controls['btnLeft'] = (msg.buttons[15] == 1) self._controls['btnUp'] = (msg.buttons[12] == 1) self._controls['btnDown'] = (msg.buttons[14] == 1) self._controls['btnRight'] = (msg.buttons[13] == 1) self._controls['dPadUp'] = (msg.buttons[4] == 1) self._controls['dPadDown'] = (msg.buttons[6] == 1) self._controls['dPadLeft'] = (msg.buttons[7] == 1) self._controls['dPadRight'] = (msg.buttons[5] == 1) self._controls['leftStickHorz'] = msg.axes[0] self._controls['leftStickVert'] = msg.axes[1] self._controls['rightStickHorz'] = msg.axes[2] self._controls['rightStickVert'] = msg.axes[3] self._controls['leftBumper'] = (msg.buttons[10] == 1) self._controls['rightBumper'] = (msg.buttons[11] == 1) self._controls['leftTrigger'] = (msg.buttons[8] == 1) self._controls['rightTrigger'] = (msg.buttons[9] == 1) self._controls['function1'] = (msg.buttons[0] == 1) self._controls['function2'] = (msg.buttons[3] == 1)
def __init__(self): self.lastButtonState = 0 self.buttonWasPressed = False rospy.Subscriber("/joy", Joy, self.joyChanged)
def __init__(self): self.initial_poses = {} self.planning_groups_tips = {} self.tf_listener = tf.TransformListener() self.marker_lock = threading.Lock() self.prev_time = rospy.Time.now() self.counter = 0 self.history = StatusHistory(max_length=10) self.pre_pose = PoseStamped() self.pre_pose.pose.orientation.w = 1 self.current_planning_group_index = 0 self.current_eef_index = 0 self.initialize_poses = False self.initialized = False self.parseSRDF() self.plan_group_pub = rospy.Publisher('/rviz/moveit/select_planning_group', String, queue_size=5) self.updatePlanningGroup(0) self.updatePoseTopic(0, False) self.joy_pose_pub = rospy.Publisher("/joy_pose", PoseStamped, queue_size=1) self.plan_pub = rospy.Publisher("/rviz/moveit/plan", Empty, queue_size=5) self.execute_pub = rospy.Publisher("/rviz/moveit/execute", Empty, queue_size=5) self.update_start_state_pub = rospy.Publisher("/rviz/moveit/update_start_state", Empty, queue_size=5) self.update_goal_state_pub = rospy.Publisher("/rviz/moveit/update_goal_state", Empty, queue_size=5) self.interactive_marker_sub = rospy.Subscriber("/rviz_moveit_motion_planning_display/robot_interaction_interactive_marker_topic/update_full", InteractiveMarkerInit, self.markerCB, queue_size=1) self.sub = rospy.Subscriber("/joy", Joy, self.joyCB, queue_size=1)
def __init__(self, fig, gs, rows, cols, actions_arr): """ Constructs an ActionPanel assuming actions_arr is an array of fully initialized actions. Each action must have: key, name, func. Each action can have: axis_pos, keyboard_binding, ps3_binding. """ assert len(actions_arr) <= rows*cols, 'Too many actions to put into gridspec.' self._fig = fig self._gs = gridspec.GridSpecFromSubplotSpec(rows, cols, subplot_spec=gs) self._axarr = [plt.subplot(self._gs[i]) for i in range(len(actions_arr))] # Read keyboard_bindings and ps3_bindings from config self._actions = {action.key: action for action in actions_arr} for key, action in self._actions.iteritems(): if key in config['keyboard_bindings']: action.kb = config['keyboard_bindings'][key] if key in config['ps3_bindings']: action.pb = config['ps3_bindings'][key] self._buttons = None self._initialize_buttons() self._cid = self._fig.canvas.mpl_connect('key_press_event', self.on_key_press) if ROS_ENABLED: self._ps3_count = 0 rospy.Subscriber(config['ps3_topic'], Joy, self.ps3_callback)
def publish_joy(self, x, y, publisher): joy = Joy() joy.header.stamp = rospy.Time.now() joy.axes.append(y) joy.axes.append(x) publisher.publish(joy)
def __init__(self): self.rospack = RosPack() with open(join(self.rospack.get_path('apex_playground'), 'config', 'ergo.json')) as f: self.params = json.load(f) self.button = Button(self.params) self.rate = rospy.Rate(self.params['publish_rate']) self.button.switch_led(False) # Service callers self.robot_reach_srv_name = '{}/reach'.format(self.params['robot_name']) self.robot_compliant_srv_name = '{}/set_compliant'.format(self.params['robot_name']) rospy.loginfo("Ergo node is waiting for poppy controllers...") rospy.wait_for_service(self.robot_reach_srv_name) rospy.wait_for_service(self.robot_compliant_srv_name) self.reach_proxy = rospy.ServiceProxy(self.robot_reach_srv_name, ReachTarget) self.compliant_proxy = rospy.ServiceProxy(self.robot_compliant_srv_name, SetCompliant) rospy.loginfo("Controllers connected!") self.state_pub = rospy.Publisher('ergo/state', CircularState, queue_size=1) self.button_pub = rospy.Publisher('sensors/buttons/help', Bool, queue_size=1) self.goals = [] self.goal = 0. self.joy_x = 0. self.joy_y = 0. self.motion_started_joy = 0. self.js = JointState() rospy.Subscriber('sensors/joystick/{}'.format(self.params["control_joystick_id"]), Joy, self.cb_joy) rospy.Subscriber('{}/joint_state'.format(self.params['robot_name']), JointState, self.cb_js) rospy.Subscriber('sensors/button_leds/pause', Bool, self.cb_bt_led) self.t = rospy.Time.now() self.srv_reset = None self.extended = False self.standby = False self.last_activity = rospy.Time.now() self.delta_t = rospy.Time.now()
def __init__(self): self.joy_pub = rospy.Publisher('sensors/joystick/1', Joy, queue_size=1) self.joy_pub2 = rospy.Publisher('sensors/joystick/2', Joy, queue_size=1) self.ball_pub = rospy.Publisher('environment/ball', CircularState, queue_size=1) self.light_pub = rospy.Publisher('environment/light', UInt8, queue_size=1) self.sound_pub = rospy.Publisher('environment/sound', Float32, queue_size=1) self.rospack = RosPack() with open(join(self.rospack.get_path('apex_playground'), 'config', 'ergo.json')) as f: self.ergo_params = json.load(f) with open(join(self.rospack.get_path('apex_playground'), 'config', 'environment.json')) as f: self.env_params = json.load(f) self.rate = rospy.Rate(self.ergo_params['publish_rate']) vrep_port = rospy.get_param('vrep/environment_port', 29997) self.simulation_id = vrep.simxStart('127.0.0.1', vrep_port, True, False, 5000, 5) # Object names in V-Rep self.joystick_left_joints = ['Joystick_2_Axis_2', 'Joystick_2_Axis_1'] self.joystick_right_joints = ['Joystick_1_Axis_2', 'Joystick_1_Axis_1'] self.ball_name = 'TennisBall' self.arena_name = 'Arena' if self.ergo_params["control_joystick_id"] != 2: useless_joy = self.joystick_left_joints self.joystick_left_joints = self.joystick_right_joints self.joystick_right_joints = useless_joy self.joints = JointTracker(self.joystick_left_joints + self.joystick_right_joints, self.simulation_id) self.objects = ObjectTracker([self.ball_name, self.arena_name], self.simulation_id) self.conversions = EnvironmentConversions()
def start_subbing(self): stick_sub = rospy.Subscriber("joy", Joy, self.handle_joystick)
def __init__(self, joy_topic): rospy.loginfo("waiting for petrone") rospy.wait_for_message('battery', Float32) rospy.loginfo("found petrone") # fly control publisher self.pub_fly = rospy.Publisher('cmd_fly', Int8, queue_size=1) self.pub_led = rospy.Publisher('led_color', String, queue_size=1) # subscribe to the joystick at the end to make sure that all required # services were found self._buttons = None rospy.Subscriber(joy_topic, Joy, self.cb_joy)
def joy_listener(): # start node rospy.init_node("turtlesim_joy", anonymous=True) # subscribe to joystick messages on topic "joy" rospy.Subscriber("joy", Joy, tj_callback, queue_size=1) # keep node alive until stopped rospy.spin() # called when joy message is received
def __init__(self, swarmcontroller): self._sc = swarmcontroller topic = rospy.get_param("~joy_topic", "joy") rospy.Subscriber(topic, Joy, self._joystickUpdate) self._buttons = None
def __init__(self): self.pub = rospy.Publisher('/mavros/rc/override', OverrideRCIn, queue_size=10) self.sub = rospy.Subscriber('/joy', Joy, self.callback)
def __init__(self, scale=1.0, offset=0.0, deadband=0.1): """ Maps joystick input to robot control. @type scale: float @param scale: scaling applied to joystick values [1.0] @type offset: float @param offset: joystick offset values, post-scaling [0.0] @type deadband: float @param deadband: deadband post scaling and offset [0.1] Raw joystick valuess are in [1.0...-1.0]. """ sub = rospy.Subscriber("/joy", Joy, self._on_joy) self._scale = scale self._offset = offset self._deadband = deadband self._controls = {} self._buttons = {} self._sticks = {} button_names = ( 'btnLeft', 'btnUp', 'btnDown', 'btnRight', 'dPadUp', 'dPadDown', 'dPadLeft', 'dPadRight', 'leftBumper', 'rightBumper', 'leftTrigger', 'rightTrigger', 'function1', 'function2') stick_names = ( 'leftStickHorz', 'leftStickVert', 'rightStickHorz', 'rightStickVert') #doing this with lambda won't work def gen_val_func(name, type_name): def val_func(): return type_name( name in self._controls and self._controls[name]) return val_func for name in button_names: self._buttons[name] = ButtonTransition(gen_val_func(name, bool)) for name in stick_names: self._sticks[name] = StickTransition(gen_val_func(name, float))
def __init__(self): rate = rospy.Rate(5) rospy.Subscriber("/bluetooth_teleop/joy", Joy, self.joy_callback) # Initialize button variables for button input self.x = 0 self.circ = 0 self.sq = self.tri = self.L1 = self.R1 = self.share = self.options = self.p4 = self.L3 = self.R3 = self.DL = self.DR = self.DU = self.DD = 0 # Boolean variable used for tracking the status of object tracking process self.active = False objtrack_process = None rospy.loginfo("In start") while not rospy.is_shutdown(): # If object tracking process is not currenlty active if self.active == False: # Start Object Tracking if circle button pressed if (self.circ == 1): rospy.loginfo("Joystick code received, commencing object tracking protocol...") self.active = True package = 'obj_track' executable = 'track.py' node = roslaunch.core.Node(package, executable) launch = roslaunch.scriptapi.ROSLaunch() launch.start() objtrack_process = launch.launch(node) # If object tracking process is active else: # Stop Object Tracking if x button pressed if (self.x == 1): rospy.loginfo("Joystick code recieved, terminating object tracking protocol") self.active = False objtrack_process.stop() # Reset button variables for next pass self.x = 0 self.circ = 0 # callback function maps button data observed from joystick topic