Skip to content

Commit

Permalink
Merge pull request #206 from CooperUnion/user/Vaibhav-Hariani/tested_…
Browse files Browse the repository at this point in the history
…lane_change

Tested Lane Change code: To be merged into subteam/algorithms
  • Loading branch information
Vaibhav-Hariani authored Jun 2, 2024
2 parents 104f0d7 + be2d092 commit a26bf00
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 118 deletions.
83 changes: 42 additions & 41 deletions ros2/cmd/src/modules/State_Machine_Interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

class Interface(Node):
LANE_WIDTH = 3.048 # METERS
MIN_SAFE_BRAKING_DIST = 2.2
MIN_SAFE_BRAKING_DIST = 2.2
# Given that we are moving at 5 mph (2.35 m/s)
# We should start braking at least x meters before our end goal, given the safe braking decel Jeanette has specified.

Expand Down Expand Up @@ -80,45 +80,43 @@ def object_distance_callback(self, msg):
):
self.Estop_Action(error="Too Close for Comfort With an Obstacle")

# Lane Following Action: takes no arguments: We follow the lanes, and if an empty_error is thrown, we panic!
# Lane Following Action: takes no arguments: We follow the lanes, and if an empty_error is thrown, we panic!
def Lane_Follow_Action(self, args=None):
try:
cmd = Float32MultiArray()
cmd = Float32MultiArray()

if self.lane_follow.empty_error:
self.car_sm.Emergency_Trigger()
self.Run() # ESTOP if we lose the lane lines in our vision
if self.lane_follow.empty_error:
self.car_sm.Emergency_Trigger()
self.Run() # ESTOP if we lose the lane lines in our vision

[steer_cmd, vel_cmd] = self.lane_follow.follow_lane(1 / 200)
# Publish Lane Following command
cmd.data = [
steer_cmd,
vel_cmd,
]
self.cmd_publisher.publish(cmd)
except:
self.Estop_Action()
[steer_cmd, vel_cmd] = self.lane_follow.follow_lane(1 / 200)
# Publish Lane Following command
cmd.data = [
steer_cmd,
vel_cmd,
]
# self.cmd_publisher.publish(cmd)

# self.cmd_publisher.publish(cmd)

# Lane Change Action: args[0] is our relative x coordinate, and args[1] is the relative y coordinate. args[2] is the end yaw.
# (yaw relative to current heading)
def Lane_Change_Action(self, args=None):
if args == None:
try:
# TODO: add function to calculate relative position for path
relative_x = args[
0
] # replace this with subscriber data from obj detection
relative_y = args[
1
] # replace this with subscriber data from obj detection
end_yaw = args[
2 # We should never be sending an end yaw of more than zero
]
self.lane_change.create_path(relative_x, relative_y, end_yaw)
# self.lane_change.follow_path()
except:
self.Estop_Action(error="No Lane Data Provided", args=[True])
print("In Lane Change")
if args is None:
self.Estop_Action(error="No Lane Data Provided", args=[True])
else:
# TODO: add function to calculate relative position for path
relative_x = args[
0
] # replace this with subscriber data from obj detection
relative_y = args[
1
] # replace this with subscriber data from obj detection
end_yaw = args[
2 # We should never be sending an end yaw of more than zero
]
self.lane_change.create_path(relative_x, relative_y, end_yaw)
self.lane_change.follow_path()

# Cstop Action: When args[0], we are computing slope based on distance: we don't exceed 1.32 in this action
# Those calculations should be done before_hand, in the state machines.
Expand All @@ -145,16 +143,17 @@ def Cstop_Action(self, args=None):
difference = (initial - current_time).total_seconds()
cmd.data = [
0.0,
max(current_speed - (slope * difference),0),
max(current_speed - (slope * difference), 0),
] # Allows us to keep slope @ set time
initial = current_time
self.cmd_publisher.publish(cmd)
# self.cmd_publisher.publish(cmd)

# Estop Action: When things break. args[0] is "soft estop", no args or args[0] false is a hard estop.
# hard estop is HARD
def Estop_Action(self, error="Entered Error State", args=None):
print("ESTOP REACHED")
slope = 2.0
# Args[0] is a "soft" estop: We aren't in a physical emergency, but something has gone wrong.
if args is not None and args[0]:
slope = 1.32
current_speed = self.lane_follow.odom_sub.vel
Expand All @@ -171,7 +170,7 @@ def Estop_Action(self, error="Entered Error State", args=None):
current_speed - (slope * difference),
] # Allows us to keep slope @ set time
initial = current_time
self.cmd_publisher.publish(cmd)
# self.cmd_publisher.publish(cmd)
print(error)
raise State_Machine_Failure(error)

Expand Down Expand Up @@ -200,11 +199,13 @@ def Unique_Object(self):
# This returns true - position_x - position_y - distance if an object in the object list is detected.
# If check_in_lane is called, the experimental lane checker is run.

#wrapper to tell you what lane we are currently in.
def current_lane(self):
# wrapper to tell you what lane we are currently in.
def current_lane(self):
return self.lane_follow._Left_Lane

def Object_Detection(self, distance_threshold, object_list=[], check_in_lane=False):
def Object_Detection(
self, distance_threshold, object_list=[], check_in_lane=False
):
# if the name is what we expected
# is it in our lane (add after)
# is it close enough
Expand Down Expand Up @@ -254,19 +255,19 @@ def Object_Detection(self, distance_threshold, object_list=[], check_in_lane=Fal
)
return False, -1, -1, -1

#Executes the current state
# Executes the current state
# This might seem arbitrary, but the idea is that the state machine is what's handling things: We can prevent illegal states
# and other failures by using this model.
# Eventually, we should pipe this interface directly into the state machine, and have the function tests loop over the state:
# This would theoretically give us more modularity
# This would theoretically give us more modularity
def Run(self, args=None):
function_dict = {
"Lane_Change": self.Lane_Change_Action,
"Lane_Following": self.Lane_Follow_Action,
"Cstop": self.Cstop_Action,
"Estop": self.Estop_Action,
}
function_dict[self.car_sm.current_state.id](args=args)
function_dict[self.car_SM.current_state.id](args=args)


class State_Machine_Failure(Exception):
Expand Down
12 changes: 6 additions & 6 deletions ros2/cmd/src/modules/lane_behaviors/controller/stanley.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ def __init__(
self,
max_dist_to_path=0.1,
wheelbase=1.75,
k_stanley=0.5,
max_steer_angle=0.31,
k_stanley=0.8,
max_steer_angle=0.35,
logger=None,
):

Expand Down Expand Up @@ -54,6 +54,8 @@ def follow_path(self, x, y, yaw, v, cx, cy, cyaw):
halfwheelbase is distance from com to front axle
k_stanley is gain
'''
print(f'Stanley Inputs: \n {x, y, yaw, v }')

if v < 0: # moving backwards!
yaw = coterminal_angle(yaw + math.pi)

Expand All @@ -65,7 +67,7 @@ def follow_path(self, x, y, yaw, v, cx, cy, cyaw):
dx = cx[target_idx] - x
dy = cy[target_idx] - y
path_yaw = cyaw[target_idx]
cross_track_error = dx * np.sin(path_yaw) - dy * np.cos(path_yaw)
cross_track_error = -(dx * np.sin(path_yaw) - dy * np.cos(path_yaw))

# May need to catch exceptions
# see if we missed the end of the path?
Expand Down Expand Up @@ -103,10 +105,8 @@ def get_steering_cmd(self, heading_error, cross_track_error, v):
delta = np.clip(
steer_angle, -self.max_steer_angle, self.max_steer_angle
)
# curv = math.tan(delta)/self.wheelbase

print(
f'heading_error={heading_error} cross_track_error={cross_track_error} delta={delta}'
f'heading_error={heading_error} cross_track_error={cross_track_error} delta={delta} v={v}'
)

return delta
12 changes: 5 additions & 7 deletions ros2/cmd/src/modules/lane_behaviors/lane_change.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
from collections import namedtuple




def calculate_yaw(pathx, pathy, start_yaw):
yaw = [start_yaw]
for i in range(1, len(pathx)):
Expand Down Expand Up @@ -128,21 +126,21 @@ def follow_path(self):
self.odom_sub.xpos,
self.odom_sub.ypos,
self.odom_sub.yaw,
# self.odom_sub.vel,
2.235, # comment this out when you want to use the actual velocity
self.odom_sub.vel,
# 2.235, # comment this out when you want to use the actual velocity
self.pathx,
self.pathy,
self.pathyaw,
)
cmd.data = [
steer_cmd,
# self.odom_sub.vel,
2.235, # Unsure if the velocity command should always be the target
1.8, # Unsure if the velocity command should always be the target
]
# Uncomment when you actually want to drive
self.cmd_publisher.publish(cmd)
time.sleep(0.003)
# time.sleep(0.005) # Generate command at 200Hz
# time.sleep(0.001)
time.sleep(0.05) # Generate command at 20Hz

print("End of path reached")

Expand Down
37 changes: 33 additions & 4 deletions ros2/cmd/src/modules/lane_change_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import argparse
import rclpy
from threading import Thread

Expand All @@ -6,18 +7,46 @@


def main(args=None):
parser = argparse.ArgumentParser(description='test throttle')

parser.add_argument(
'-x',
'--relx',
help='Relative X Distance',
metavar='n.n',
type=float,
required=True,
)
parser.add_argument(
'-y',
'--rely',
help='Relative y Distance',
metavar='n.n',
type=float,
required=True,
)

parser.add_argument(
'--yaw',
help='end yaw Position',
default="0",
metavar='n.n',
type=float,
required=False,
)
try:
rclpy.init(args=args)
rclpy.init(args=None)

args = parser.parse_args()
max_dist_to_goal = 0.5
max_dist_to_path = 1.5

odom_sub = OdomSubscriber()
lane_change = LaneChange(odom_sub, max_dist_to_goal, max_dist_to_path)

relative_x = 10
relative_y = 10
end_yaw = 0
relative_x = args.relx
relative_y = args.rely
end_yaw = args.yaw

executor = rclpy.executors.MultiThreadedExecutor()
executor.add_node(lane_change)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#Executor
# Executor
from State_Machine_Executor import main
from State_Machine_Interface import Interface


# Test Q.3 Lane Keeping (Go straight and stop at barrel)


class Function_Test_Q3:
# Get data from 5 meters away
threshold_distance = 5 # meters
Expand All @@ -16,7 +17,7 @@ def __init__(self, interface: Interface):
def function_test(self):
barrel_counter = 0
distance = 0
self.interface.car_sm.Resume()
self.interface.car_SM.Resume()
while barrel_counter < 1:
obj_data = self.interface.Object_Detection(
self.threshold_distance, object_list=["Barrel"]
Expand All @@ -26,9 +27,9 @@ def function_test(self):
barrel_counter += 1
self.interface.Run()
# 0.9144 = 3 feet to meters
self.interface.car_sm.Stop_Trigger()
self.interface.car_SM.Stop_Trigger()
self.interface.Run(distance - 0.9144)


if __name__ == "__main__":
main("Function_Test_Q3",Function_Test_Q3)
main("Function_Test_Q3", Function_Test_Q3)
Loading

0 comments on commit a26bf00

Please sign in to comment.