got pinpoint working, completing initial optimizer
bsutherland333 committed Apr 10, 2024
1 parent 41b820a commit 9a0167b
Showing 2 changed files with 120 additions and 87 deletions.
171 changes: 102 additions & 69 deletions rosplane_tuning/src/autotune/optimizer.py
@@ -9,10 +9,9 @@ class OptimizerState(Enum):
This class defines the various states that exist within the optimizer.
"""
UNINITIALIZED = auto()
FINDING_GRADIENT = auto()
FINDING_JACOBEAN = auto()
BRACKETING = auto()
PINPOINTING = auto()

TERMINATED = auto()


@@ -37,7 +36,11 @@ def __init__(self, initial_point, optimization_params):
- alpha (float): The initial step size. The ideal value for this is very
application-specific.
- tau (float): The convergence tolerance. The optimization is considered converged
when the inifity norm is less than tau.
when the infinity norm is less than tau.
- h (float): Finite difference step size. This is used to calculate the gradient
by stepping slightly away from a point and then calculating the slope of the
line between the two points. Ideally this value will be small, but it is sensitive
to noise and can't get too small.
"""

# Save passed optimization parameters. See above for parameters details.
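
As a reference for the new h parameter, here is a minimal standalone sketch (separate from the diff itself) of the forward finite-difference estimate the optimizer builds in its FINDING_JACOBEAN state; the helper name is hypothetical:

import numpy as np

def finite_difference_jacobean(f, x_0, h=0.01):
    # Evaluate f at x_0 and at one h-perturbed point per dimension, mirroring
    # the batch of points the optimizer requests:
    #   np.tile(x_0, (n, 1)) + np.identity(n) * h
    n = x_0.shape[0]
    phi_0 = f(x_0)
    perturbed = np.tile(x_0, (n, 1)) + np.identity(n) * h
    errors = np.array([f(x) for x in perturbed])
    # Forward difference: slope of the line between x_0 and each stepped point.
    return (errors - phi_0) / h

# Example: at (0, 2) the Matyas function has analytic gradient [-0.96, 1.04];
# this estimate lands close to it, with a small forward-difference bias.
grad = finite_difference_jacobean(
    lambda x: 0.26 * (x[0]**2 + x[1]**2) - 0.48 * x[0] * x[1],
    np.array([0.0, 2.0]))
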
@@ -46,19 +49,17 @@ def __init__(self, initial_point, optimization_params):
self.sigma = optimization_params['sigma']
self.alpha = optimization_params['alpha']
self.tau = optimization_params['tau']
self.h = optimization_params['h']

# The current state of the optimization process.
self.state = OptimizerState.UNINITIALIZED

# The jacobian of the function at x_0.
# The jacobean of the function at x_0.
self.J = None

# The current search direction from x_0.
self.p = None

# The step size for calculating gradients with finite-difference.
self.h = 0.01

# The current best point in the optimization, where we are stepping away from.
self.x_0 = initial_point
# The function value (error) at x_0.
@@ -83,6 +84,16 @@ def __init__(self, initial_point, optimization_params):
# Bool to specify if this is the first bracketing step, used by the bracketing algorithm.
self.first_bracket = None

# TODO: describe these parameters
self.alpha_p = None
self.phi_p = None
self.phi_p_prime = None
self.alpha_low = None
self.alpha_high = None
self.phi_low = None
self.phi_low_prime = None
self.phi_high = None
self.phi_high_prime = None

def optimization_terminated(self):
"""
@@ -96,7 +107,7 @@ def optimization_terminated(self):
else:
return False

def get_optimiztion_status(self):
def get_optimization_status(self):
"""
This function returns the status of the optimization algorithm.
@@ -105,7 +116,7 @@ def get_optimiztion_status(self):
"""
if self.state == OptimizerState.UNINITIALIZED:
return "Optimizer awaiting first iteration."
elif self.state == OptimizerState.FINDING_GRADIENT:
elif self.state == OptimizerState.FINDING_JACOBEAN:
return "Finding gradient."
elif self.state == OptimizerState.BRACKETING:
return "Bracketing step size."
@@ -132,16 +143,16 @@ def get_next_parameter_set(self, error):

if self.state == OptimizerState.UNINITIALIZED:
# Find the gradient at the new point
self.state = OptimizerState.FINDING_GRADIENT
self.state = OptimizerState.FINDING_JACOBEAN
self.phi_0 = error[0]

# Find the values for finite difference, where each dimmension is individually
# Find the values for finite difference, where each dimension is individually
# stepped by h
return np.tile(self.x_0, (self.x_0.shape[0], 1)) \
+ np.identity(self.x_0.shape[0]) * self.h

if self.state == OptimizerState.FINDING_GRADIENT:
# Calculate the jacobian at the current point
if self.state == OptimizerState.FINDING_JACOBEAN:
# Calculate the jacobean at the current point
J_prior = self.J
self.J = (error - self.phi_0) / self.h

@@ -178,8 +189,8 @@ def get_next_parameter_set(self, error):
return new_points

elif self.state == OptimizerState.PINPOINTING:
self.phi_2 = error[0]
self.phi_2_prime = (error[1] - error[0]) / self.h
self.phi_p = error[0]
self.phi_p_prime = (error[1] - error[0]) / self.h
return self.pinpointing()

else: # Process Terminated
@@ -194,21 +205,41 @@ def bracketing(self):
# Needs Pinpointing
if(self.phi_2 > self.phi_0 + self.mu_1*self.alpha_2*self.phi_0_prime) \
or (not self.first_bracket and self.phi_2 > self.phi_1):
self.alpha_low = self.alpha_1
self.alpha_high = self.alpha_2
self.phi_low = self.phi_1
self.phi_low_prime = self.phi_1_prime
self.phi_high = self.phi_2
self.phi_high_prime = self.phi_2_prime
self.state = OptimizerState.PINPOINTING
return self.pinpointing()

self.alpha_p = interpolate(self.alpha_1, self.alpha_2, self.phi_1, self.phi_2,
self.phi_1_prime, self.phi_2_prime)
x_p = self.x_0 + self.alpha_p*self.p
return np.array([x_p, (x_p + self.h*self.p)])

# Optimized
if abs(self.phi_2_prime) <= -self.mu_2*self.phi_0_prime:
self.x_0 = self.x_0 + self.alpha_2*self.p
self.phi_0 = self.phi_2
self.state = OptimizerState.FINDING_GRADIENT
self.state = OptimizerState.FINDING_JACOBEAN
return np.tile(self.x_0, (self.x_0.shape[0], 1)) \
+ np.identity(self.x_0.shape[0]) * self.h

# Needs Pinpointing
elif self.phi_2_prime >= 0:
self.alpha_low = self.alpha_2
self.alpha_high = self.alpha_1
self.phi_low = self.phi_2
self.phi_low_prime = self.phi_2_prime
self.phi_high = self.phi_1
self.phi_high_prime = self.phi_1_prime
self.state = OptimizerState.PINPOINTING
return self.pinpointing()

self.alpha_p = interpolate(self.alpha_1, self.alpha_2, self.phi_1, self.phi_2,
self.phi_1_prime, self.phi_2_prime)
x_p = self.x_0 + self.alpha_p*self.p
return np.array([x_p, (x_p + self.h*self.p)])

# Needs more Bracketing
else:
@@ -222,61 +253,63 @@ def bracketing(self):
x_2 = self.x_0 + self.alpha_2*self.p
return np.array([x_2, (x_2 + self.h*self.p)])

def interpolate(self, alpha1, alpha2):
"""
This function interpolates between two points.
Parameters:
alpha1 (float): The first distance.
alpha2 (float): The second distance.
Returns:
float: The interpolated value or alphap.
"""
return (alpha1 + alpha2)/2

def pinpointing(self):
"""
This function conducts the pinpointing part of the optimization.
Returns:
alphastar (float): The optimal step size
"""

alpha1 = self.distance(self.current_gains, gains1)
alpha2 = self.distance(self.current_gains, gains2)
alphap = self.distance(self.current_gains, gainsp)

if alphap > phi1 + self.mu_1*alphap*phi1_prime or alphap > phi1:
# Continue pinpointing
gains2 = gainsp
gainsp = self.interpolate(gains1, gains2)
phi2 = phip
phi2_prime = phip_prime
self.save_gains = np.array([gains1, None, gains2, None, gainsp,
[gain + self.h for gain in gainsp]])
self.save_phis = np.array([phi1, phi1_prime, phi2, phi2_prime])
new_gains = np.array([self.save_gains[4], self.save_gains[5]])
return new_gains
if (self.phi_p > self.phi_0 + self.mu_1*self.alpha_p*self.phi_0_prime) \
or (self.phi_p > self.phi_low):
# Set interpolated point to high, continue
self.alpha_high = self.alpha_p
self.phi_high = self.phi_p
self.phi_high_prime = self.phi_p_prime
else:
# Optimizedmu_2
if abs(phip_prime) <= -self.u2*phi1_prime:
self.state == OptimizerState.SELECT_DIRECTION
alphastar = alphap
new_gains = np.array([self.current_gains + alphastar*self.p])
return new_gains
# More parameterization needed
elif phip_prime*(alpha2 - alpha1) >= 0:
gains2 = gains1
phi2 = phi1

gains1 = gainsp
phi1 = phip
gainsp = self.interpolate(gains1, gains2)

self.save_gains = np.array([gains1, None, gains2, None, gainsp,
[gain + self.h for gain in gainsp]])
self.save_phis = np.array([phi1, phi1_prime, phi2, phi2_prime])
new_gains = np.array([self.save_gains[4], self.save_gains[5]])
return new_gains
if abs(self.phi_p_prime) <= -self.mu_2*self.phi_0_prime:
# Pinpointing complete, set interpolated point to x_0 and find new jacobian
self.x_0 = self.x_0 + self.alpha_p*self.p
self.phi_0 = self.phi_p
self.state = OptimizerState.FINDING_JACOBEAN
return np.tile(self.x_0, (self.x_0.shape[0], 1)) \
+ np.identity(self.x_0.shape[0]) * self.h

elif self.phi_p_prime*(self.alpha_high - self.alpha_low) >= 0:
# Set high to low (and low to interpolated)
self.alpha_high = self.alpha_low
self.phi_high = self.phi_low
self.phi_high_prime = self.phi_low_prime

# Set low to interpolated
self.alpha_low = self.alpha_p
self.phi_low = self.phi_p
self.phi_low_prime = self.phi_p_prime

# Find value and gradient at interpolated point
self.alpha_p = interpolate(self.alpha_low, self.alpha_high, self.phi_low, self.phi_high,
self.phi_low_prime, self.phi_high_prime)
x_p = self.x_0 + self.alpha_p*self.p
return np.array([x_p, (x_p + self.h*self.p)])


def interpolate(alpha_1, alpha_2, phi_1, phi_2, phi_1_prime, phi_2_prime):
"""
This function performs cubic interpolation between two points to find the minimum,
given their function values and derivatives.
Parameters:
alpha_1 (float): Function input 1.
alpha_2 (float): Function input 2.
phi_1 (float): Function value at alpha_1.
phi_2 (float): Function value at alpha_2.
phi_1_prime (float): Function derivative at alpha_1.
phi_2_prime (float): Function derivative at alpha_2.
Returns:
float: The function input (alpha_p) at the interpolated minimum.
"""

beta_1 = phi_1_prime + phi_2_prime - 3*(phi_1 - phi_2) / (alpha_1 - alpha_2)
beta_2 = np.sign(alpha_2 - alpha_1) * np.sqrt(beta_1**2 - phi_1_prime*phi_2_prime)
return alpha_2 - (alpha_2 - alpha_1) \
* (phi_2_prime + beta_2 - beta_1) / (phi_2_prime - phi_1_prime + 2*beta_2)
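
To illustrate the interpolate() formula above, here is a minimal standalone check (separate from the diff itself): for exact data from the quadratic phi(alpha) = (alpha - 1)**2, the interpolation recovers the true minimizer alpha = 1.

import numpy as np

def interpolate(alpha_1, alpha_2, phi_1, phi_2, phi_1_prime, phi_2_prime):
    # Same formula as above, repeated so the snippet runs on its own.
    beta_1 = phi_1_prime + phi_2_prime - 3*(phi_1 - phi_2) / (alpha_1 - alpha_2)
    beta_2 = np.sign(alpha_2 - alpha_1) * np.sqrt(beta_1**2 - phi_1_prime*phi_2_prime)
    return alpha_2 - (alpha_2 - alpha_1) \
        * (phi_2_prime + beta_2 - beta_1) / (phi_2_prime - phi_1_prime + 2*beta_2)

# phi(alpha) = (alpha - 1)**2, phi'(alpha) = 2*(alpha - 1)
# Bracket [0, 3]: phi(0) = 1, phi'(0) = -2, phi(3) = 4, phi'(3) = 4
alpha_p = interpolate(0.0, 3.0, 1.0, 4.0, -2.0, 4.0)
print(alpha_p)  # 1.0, the exact minimizer
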

36 changes: 18 additions & 18 deletions rosplane_tuning/src/autotune/optimizer_tester.py
Original file line number Diff line number Diff line change
@@ -5,56 +5,53 @@


# Function to test the optimizer with
def function(x):
def Matyas(x):
# Matyas function
print("f", x)
return 0.26 * (x[0] ** 2 + x[1] ** 2) - 0.48 * x[0] * x[1]

def gradient(x):
# Gradient of Matyas function
return np.array([0.52 * x[0] - 0.48 * x[1], 0.52 * x[1] - 0.48 * x[0]])
def Circle(x):
# Circle function
return x[0] ** 2 + x[1] ** 2

function = Circle
function = Matyas

# Initialize optimizer
curr_points = np.array([[0.0, 5.0]]) # Initial point
curr_points = np.array([[0.0, 2.0]]) # Initial point
optimization_params = {'mu_1': 1e-4,
'mu_2': 0.5,
'sigma': 1.5,
'alpha': 1.0,
'tau': 1e-3}
'tau': 1e-2,
'h': 1e-2}
optimizer = Optimizer(curr_points[0], optimization_params)


# Run optimization
all_points = []
k = 0
x_0_points = []
while not optimizer.optimization_terminated():
print("Iteration ", k)
# Print status
print(optimizer.get_optimiztion_status())
print(optimizer.state)
print(optimizer.get_optimization_status())

# Calculate error for current points
error = []
print("CP", curr_points) # Testing
for point in curr_points:
error.append(function(point))
error = np.array(error)
# Pass points to optimizer
# print("CP", curr_points) # Testing
# print("G", gradient(curr_points[0])) # Testing
curr_points = optimizer.get_next_parameter_set(error)
# print("CP", curr_points) # Testing

# Store points
for point in curr_points:
all_points.append(point)
x_0_points.append(optimizer.x_0)

# End iteration step
k += 1
print()
all_points = np.array(all_points)
x_0_points = np.array(x_0_points)

print('Optimization terminated with status: {}'.format(optimizer.get_optimiztion_status()))
print('Optimization terminated with status: {}'.format(optimizer.get_optimization_status()))


# Plot the function with the optimization path
@@ -67,6 +64,9 @@ def gradient(x):
X, Y = np.meshgrid(x, y)
Z = function([X, Y])
plt.contour(X, Y, Z, 50)
plt.scatter(x_0_points[:, 0], x_0_points[:, 1], color='g', marker='x', s=200)
plt.plot(all_points[:, 0], all_points[:, 1], marker='o', color='r')
# Lock the x and y axis to be equal
plt.axis('equal')
plt.show()
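
The tests applied by bracketing() and pinpointing() in optimizer.py are the strong Wolfe conditions. A minimal sketch of the two checks, with hypothetical helper names, for reference:

def sufficient_decrease(phi_alpha, phi_0, phi_0_prime, alpha, mu_1):
    # First strong Wolfe condition: the step must reduce the function by at
    # least mu_1 * alpha * phi'(0). Bracketing switches to pinpointing when
    # this fails for the trial step.
    return phi_alpha <= phi_0 + mu_1 * alpha * phi_0_prime

def curvature_satisfied(phi_alpha_prime, phi_0_prime, mu_2):
    # Second strong Wolfe condition: the slope at the step must be flat enough.
    # When it holds, bracketing/pinpointing accept the step and the optimizer
    # returns to FINDING_JACOBEAN at the new x_0.
    return abs(phi_alpha_prime) <= -mu_2 * phi_0_prime
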
