diff --git a/ros2/cmd/src/modules/State_Machine_Interface.py b/ros2/cmd/src/modules/State_Machine_Interface.py index 603b16af2..31ce0d4ee 100644 --- a/ros2/cmd/src/modules/State_Machine_Interface.py +++ b/ros2/cmd/src/modules/State_Machine_Interface.py @@ -11,7 +11,7 @@ class Interface(Node): LANE_WIDTH = 3.048 # METERS - MIN_SAFE_BRAKING_DIST = 2.2 + MIN_SAFE_BRAKING_DIST = 2.2 # Given that we are moving at 5 mph (2.35 m/s) # We should start braking at least x meters before our end goal, given the safe braking decel Jeanette has specified. @@ -80,45 +80,43 @@ def object_distance_callback(self, msg): ): self.Estop_Action(error="Too Close for Comfort With an Obstacle") - # Lane Following Action: takes no arguments: We follow the lanes, and if an empty_error is thrown, we panic! + # Lane Following Action: takes no arguments: We follow the lanes, and if an empty_error is thrown, we panic! def Lane_Follow_Action(self, args=None): - try: - cmd = Float32MultiArray() + cmd = Float32MultiArray() - if self.lane_follow.empty_error: - self.car_sm.Emergency_Trigger() - self.Run() # ESTOP if we lose the lane lines in our vision + if self.lane_follow.empty_error: + self.car_sm.Emergency_Trigger() + self.Run() # ESTOP if we lose the lane lines in our vision - [steer_cmd, vel_cmd] = self.lane_follow.follow_lane(1 / 200) - # Publish Lane Following command - cmd.data = [ - steer_cmd, - vel_cmd, - ] - self.cmd_publisher.publish(cmd) - except: - self.Estop_Action() + [steer_cmd, vel_cmd] = self.lane_follow.follow_lane(1 / 200) + # Publish Lane Following command + cmd.data = [ + steer_cmd, + vel_cmd, + ] # self.cmd_publisher.publish(cmd) + # self.cmd_publisher.publish(cmd) + # Lane Change Action: args[0] is our relative x coordinate, and args[1] is the relative y coordinate. args[2] is the end yaw. # (yaw relative to current heading) def Lane_Change_Action(self, args=None): - if args == None: - try: - # TODO: add function to calculate relative position for path - relative_x = args[ - 0 - ] # replace this with subscriber data from obj detection - relative_y = args[ - 1 - ] # replace this with subscriber data from obj detection - end_yaw = args[ - 2 # We should never be sending an end yaw of more than zero - ] - self.lane_change.create_path(relative_x, relative_y, end_yaw) - # self.lane_change.follow_path() - except: - self.Estop_Action(error="No Lane Data Provided", args=[True]) + print("In Lane Change") + if args is None: + self.Estop_Action(error="No Lane Data Provided", args=[True]) + else: + # TODO: add function to calculate relative position for path + relative_x = args[ + 0 + ] # replace this with subscriber data from obj detection + relative_y = args[ + 1 + ] # replace this with subscriber data from obj detection + end_yaw = args[ + 2 # We should never be sending an end yaw of more than zero + ] + self.lane_change.create_path(relative_x, relative_y, end_yaw) + self.lane_change.follow_path() # Cstop Action: When args[0], we are computing slope based on distance: we don't exceed 1.32 in this action # Those calculations should be done before_hand, in the state machines. @@ -145,16 +143,17 @@ def Cstop_Action(self, args=None): difference = (initial - current_time).total_seconds() cmd.data = [ 0.0, - max(current_speed - (slope * difference),0), + max(current_speed - (slope * difference), 0), ] # Allows us to keep slope @ set time initial = current_time - self.cmd_publisher.publish(cmd) + # self.cmd_publisher.publish(cmd) # Estop Action: When things break. args[0] is "soft estop", no args or args[0] false is a hard estop. # hard estop is HARD def Estop_Action(self, error="Entered Error State", args=None): print("ESTOP REACHED") slope = 2.0 + # Args[0] is a "soft" estop: We aren't in a physical emergency, but something has gone wrong. if args is not None and args[0]: slope = 1.32 current_speed = self.lane_follow.odom_sub.vel @@ -171,7 +170,7 @@ def Estop_Action(self, error="Entered Error State", args=None): current_speed - (slope * difference), ] # Allows us to keep slope @ set time initial = current_time - self.cmd_publisher.publish(cmd) + # self.cmd_publisher.publish(cmd) print(error) raise State_Machine_Failure(error) @@ -200,11 +199,13 @@ def Unique_Object(self): # This returns true - position_x - position_y - distance if an object in the object list is detected. # If check_in_lane is called, the experimental lane checker is run. - #wrapper to tell you what lane we are currently in. - def current_lane(self): + # wrapper to tell you what lane we are currently in. + def current_lane(self): return self.lane_follow._Left_Lane - def Object_Detection(self, distance_threshold, object_list=[], check_in_lane=False): + def Object_Detection( + self, distance_threshold, object_list=[], check_in_lane=False + ): # if the name is what we expected # is it in our lane (add after) # is it close enough @@ -254,11 +255,11 @@ def Object_Detection(self, distance_threshold, object_list=[], check_in_lane=Fal ) return False, -1, -1, -1 - #Executes the current state + # Executes the current state # This might seem arbitrary, but the idea is that the state machine is what's handling things: We can prevent illegal states # and other failures by using this model. # Eventually, we should pipe this interface directly into the state machine, and have the function tests loop over the state: - # This would theoretically give us more modularity + # This would theoretically give us more modularity def Run(self, args=None): function_dict = { "Lane_Change": self.Lane_Change_Action, @@ -266,7 +267,7 @@ def Run(self, args=None): "Cstop": self.Cstop_Action, "Estop": self.Estop_Action, } - function_dict[self.car_sm.current_state.id](args=args) + function_dict[self.car_SM.current_state.id](args=args) class State_Machine_Failure(Exception): diff --git a/ros2/cmd/src/modules/lane_behaviors/controller/stanley.py b/ros2/cmd/src/modules/lane_behaviors/controller/stanley.py index a8d192484..7f3ae5136 100644 --- a/ros2/cmd/src/modules/lane_behaviors/controller/stanley.py +++ b/ros2/cmd/src/modules/lane_behaviors/controller/stanley.py @@ -13,8 +13,8 @@ def __init__( self, max_dist_to_path=0.1, wheelbase=1.75, - k_stanley=0.5, - max_steer_angle=0.31, + k_stanley=0.8, + max_steer_angle=0.35, logger=None, ): @@ -54,6 +54,8 @@ def follow_path(self, x, y, yaw, v, cx, cy, cyaw): halfwheelbase is distance from com to front axle k_stanley is gain ''' + print(f'Stanley Inputs: \n {x, y, yaw, v }') + if v < 0: # moving backwards! yaw = coterminal_angle(yaw + math.pi) @@ -65,7 +67,7 @@ def follow_path(self, x, y, yaw, v, cx, cy, cyaw): dx = cx[target_idx] - x dy = cy[target_idx] - y path_yaw = cyaw[target_idx] - cross_track_error = dx * np.sin(path_yaw) - dy * np.cos(path_yaw) + cross_track_error = -(dx * np.sin(path_yaw) - dy * np.cos(path_yaw)) # May need to catch exceptions # see if we missed the end of the path? @@ -103,10 +105,8 @@ def get_steering_cmd(self, heading_error, cross_track_error, v): delta = np.clip( steer_angle, -self.max_steer_angle, self.max_steer_angle ) - # curv = math.tan(delta)/self.wheelbase print( - f'heading_error={heading_error} cross_track_error={cross_track_error} delta={delta}' + f'heading_error={heading_error} cross_track_error={cross_track_error} delta={delta} v={v}' ) - return delta diff --git a/ros2/cmd/src/modules/lane_behaviors/lane_change.py b/ros2/cmd/src/modules/lane_behaviors/lane_change.py index 01535cfd9..35c27b321 100644 --- a/ros2/cmd/src/modules/lane_behaviors/lane_change.py +++ b/ros2/cmd/src/modules/lane_behaviors/lane_change.py @@ -20,8 +20,6 @@ from collections import namedtuple - - def calculate_yaw(pathx, pathy, start_yaw): yaw = [start_yaw] for i in range(1, len(pathx)): @@ -128,8 +126,8 @@ def follow_path(self): self.odom_sub.xpos, self.odom_sub.ypos, self.odom_sub.yaw, - # self.odom_sub.vel, - 2.235, # comment this out when you want to use the actual velocity + self.odom_sub.vel, + # 2.235, # comment this out when you want to use the actual velocity self.pathx, self.pathy, self.pathyaw, @@ -137,12 +135,12 @@ def follow_path(self): cmd.data = [ steer_cmd, # self.odom_sub.vel, - 2.235, # Unsure if the velocity command should always be the target + 1.8, # Unsure if the velocity command should always be the target ] # Uncomment when you actually want to drive self.cmd_publisher.publish(cmd) - time.sleep(0.003) - # time.sleep(0.005) # Generate command at 200Hz + # time.sleep(0.001) + time.sleep(0.05) # Generate command at 20Hz print("End of path reached") diff --git a/ros2/cmd/src/modules/lane_change_test.py b/ros2/cmd/src/modules/lane_change_test.py index 763afdbee..58b93bfb7 100644 --- a/ros2/cmd/src/modules/lane_change_test.py +++ b/ros2/cmd/src/modules/lane_change_test.py @@ -1,3 +1,4 @@ +import argparse import rclpy from threading import Thread @@ -6,18 +7,46 @@ def main(args=None): + parser = argparse.ArgumentParser(description='test throttle') + + parser.add_argument( + '-x', + '--relx', + help='Relative X Distance', + metavar='n.n', + type=float, + required=True, + ) + parser.add_argument( + '-y', + '--rely', + help='Relative y Distance', + metavar='n.n', + type=float, + required=True, + ) + + parser.add_argument( + '--yaw', + help='end yaw Position', + default="0", + metavar='n.n', + type=float, + required=False, + ) try: - rclpy.init(args=args) + rclpy.init(args=None) + args = parser.parse_args() max_dist_to_goal = 0.5 max_dist_to_path = 1.5 odom_sub = OdomSubscriber() lane_change = LaneChange(odom_sub, max_dist_to_goal, max_dist_to_path) - relative_x = 10 - relative_y = 10 - end_yaw = 0 + relative_x = args.relx + relative_y = args.rely + end_yaw = args.yaw executor = rclpy.executors.MultiThreadedExecutor() executor.add_node(lane_change) diff --git a/ros2/cmd/src/state_machines/qualifying_tests/qualifying_test_q3.py b/ros2/cmd/src/state_machines/qualifying_tests/qualifying_test_q3.py index ad13411dd..c7745c24a 100644 --- a/ros2/cmd/src/state_machines/qualifying_tests/qualifying_test_q3.py +++ b/ros2/cmd/src/state_machines/qualifying_tests/qualifying_test_q3.py @@ -1,10 +1,11 @@ -#Executor +# Executor from State_Machine_Executor import main from State_Machine_Interface import Interface # Test Q.3 Lane Keeping (Go straight and stop at barrel) + class Function_Test_Q3: # Get data from 5 meters away threshold_distance = 5 # meters @@ -16,7 +17,7 @@ def __init__(self, interface: Interface): def function_test(self): barrel_counter = 0 distance = 0 - self.interface.car_sm.Resume() + self.interface.car_SM.Resume() while barrel_counter < 1: obj_data = self.interface.Object_Detection( self.threshold_distance, object_list=["Barrel"] @@ -26,9 +27,9 @@ def function_test(self): barrel_counter += 1 self.interface.Run() # 0.9144 = 3 feet to meters - self.interface.car_sm.Stop_Trigger() + self.interface.car_SM.Stop_Trigger() self.interface.Run(distance - 0.9144) if __name__ == "__main__": - main("Function_Test_Q3",Function_Test_Q3) \ No newline at end of file + main("Function_Test_Q3", Function_Test_Q3) diff --git a/ros2/cmd/src/state_machines/qualifying_tests/qualifying_test_q5.py b/ros2/cmd/src/state_machines/qualifying_tests/qualifying_test_q5.py index f24901c5c..08c30f95a 100644 --- a/ros2/cmd/src/state_machines/qualifying_tests/qualifying_test_q5.py +++ b/ros2/cmd/src/state_machines/qualifying_tests/qualifying_test_q5.py @@ -1,52 +1,57 @@ -#Test Q.5 Left Turn +# Test Q.5 Left Turn from State_Machine_Executor import main from State_Machine_Interface import Interface -import math -distance_threshold = 1.524 -class Function_Test_Q5(): - def __init__(self, interface): +import math + + +class Function_Test_Q5: + def __init__(self, interface: Interface): self.interface = interface # State transistion logic def function_test(self): + distance_threshold = 5 white_line_detected = False - barrel_detected = False + barrel_detected = False white_line_data = [] barrel_data = [] - - self.interface.car_SM.Resume() - while not white_line_detected: - white_line_data = self.interface.Object_Detection(distance_threshold, object_lists=["White Line"]) - self.interface.Run() - - if(white_line_data[0]): - white_line_detected = True + + self.interface.car_SM.Resume() + while not white_line_detected: + white_line_data = self.interface.Object_Detection( + distance_threshold, object_list=["White Line"] + ) + self.interface.Run() + + if white_line_data[0]: + white_line_detected = True self.interface.car_SM.Stop_Trigger() self.interface.Run(white_line_data[1]) - - self.interface.car_SM.Turn_Trigger() - # y might be negative or positive - self.interface.Run([4.5,7.6,math.pi/2]) + self.interface.car_SM.Turn_Trigger() + # y might be negative or positive + self.interface.Run([4.5, 7.6, math.pi / 2]) + self.interface.car_SM.Return_To_Follow() # Turn - # x: 4,5 m - # y: 7.6 m + # x: 4,5 m + # y: 7.6 m # t: math.pi / 2 - distance_threshold = 5 + # distance_threshold = 5 while not barrel_detected: - - barrel_data = self.interface.Object_Detection(distance_threshold, object_lists=["Barrel"]) - self.interface.Run() + barrel_data = self.interface.Object_Detection( + distance_threshold, object_list=["Barrel"] + ) + self.interface.Run() - if(barrel_data[0]): + if barrel_data[0]: barrel_detected = True self.interface.car_SM.Stop_Trigger() - self.interface.Run((barrel_data[1]+0.4) - 1.524) + self.interface.Run((barrel_data[1] + 0.4) - 1.524) # self.interface.car_SM.Stop_Trigger() # self.interface.Run() -if __name__ == "__main__": - main("Function_Test_Q3",Function_Test_Q5) +if __name__ == "__main__": + main("Function_Test_Q3", Function_Test_Q5) diff --git a/ros2/cmd/src/state_machines/qualifying_tests/qualifying_test_q6.py b/ros2/cmd/src/state_machines/qualifying_tests/qualifying_test_q6.py index 06c63eb2d..6ac105cba 100644 --- a/ros2/cmd/src/state_machines/qualifying_tests/qualifying_test_q6.py +++ b/ros2/cmd/src/state_machines/qualifying_tests/qualifying_test_q6.py @@ -5,43 +5,41 @@ import math -distance_threshold = 5 - class Function_Test_Q6: - def __init__(self, interface): + def __init__(self, interface: Interface): self.interface = interface # State transistion logic def function_test(self): + distance_threshold = 5 white_line_detected = False barrel_detected = False white_line_data = [] - - self.interface.car_sm.Resume() + self.interface.car_SM.Resume() while not white_line_detected: white_line_data = self.interface.Object_Detection( - distance_threshold, object_lists=["White Line"] + distance_threshold, object_list=["White Line"] ) self.interface.Run() if white_line_data[0]: white_line_detected = True - self.interface.car_sm.Stop_Trigger() + self.interface.car_SM.Stop_Trigger() self.interface.Run(white_line_data[1]) - self.interface.car_sm.Turn_Trigger() + self.interface.car_SM.Turn_Trigger() # y might be negative or positive - self.interface.Run([1, 5, -1.5, math.pi / 2]) + self.interface.Run([1.5, -1.5, -math.pi / 2]) + self.interface.car_SM.Return_To_Follow() while not barrel_detected: - barrel_data = self.interface.Object_Detection( - distance_threshold, object_lists=["Barrel"] + distance_threshold, object_list=["Barrel"] ) self.interface.Run() - - if white_line_data[0]: + # 1.542 5ft + if barrel_data[0]: barrel_detected = True self.interface.car_sm.Stop_Trigger() self.interface.Run((barrel_data[1] + 0.4) - 1.524) @@ -49,5 +47,7 @@ def function_test(self): # self.interface.Run() # 1.542 5ft + + if __name__ == "__main__": - main("Function_Test_Q3",Function_Test_Q6) + main("Function_Test_Q3", Function_Test_Q6) diff --git a/ros2/techbus/src/techbus.py b/ros2/techbus/src/techbus.py index a00d9216b..27fa40d5f 100644 --- a/ros2/techbus/src/techbus.py +++ b/ros2/techbus/src/techbus.py @@ -5,8 +5,6 @@ from std_msgs.msg import UInt16MultiArray, Float32MultiArray -from math import atan2 - ##### DEFINES ###### # CAN Messages @@ -37,24 +35,23 @@ def __init__(self): self.encoder_ticks_subscription # prevent unused variable warning # msg.data[0] is the steer_cmd in radians - #msg.data[1] is the vel_cmd in m/s + # msg.data[1] is the vel_cmd in m/s def cmd_callback(self, msg): - angle = 0 steer_cmd = msg.data[0] vel_cmd = msg.data[1] - #bounding for safety reasons: 2.32 m/s is competition maximum + # bounding for safety reasons: 2.32 m/s is competition maximum if vel_cmd > 2.32: vel_cmd = 2.32 - if steer_cmd != 0 and vel_cmd != 0: - angle = -atan2(steer_cmd * 1.8, vel_cmd) - else: - self.get_logger().info( - f"Warning: invalid steering angle combo: steer_cmd:{steer_cmd}, vel_cmd:{vel_cmd}" - ) - angle = 0 + # if steer_cmd != 0 and vel_cmd != 0: + # #angle = -atan2(steer_cmd * 1.8, vel_cmd) + # else: + # self.get_logger().info( + # f"Warning: invalid steering angle combo: steer_cmd:{steer_cmd}, vel_cmd:{vel_cmd}" + # ) + # angle = 0 self.bus.send(VelocityCAN, {'DBW_linearVelocity': vel_cmd}) - self.bus.send(SteerCAN, {'DBW_steeringAngleCmd': angle}) + self.bus.send(SteerCAN, {'DBW_steeringAngle': -steer_cmd}) # Establishes publisher for the /encoder_ticks topic @@ -72,7 +69,7 @@ def __init__(self): ) # 2 publish timer - timer_period = 0.005 # seconds + timer_period = 0.05 # seconds self.timer = self.create_timer(timer_period, self.timer_callback) def timer_callback(self):