API Reference - Search
circular_search(slope_data, method_name, rapid=False, tol=0.01, max_iter=50, shrink_factor=0.5, fs_fail=9999, depth_tol_frac=0.03, diagnostic=False, num_slices=30)
Global 9-point circular search with adaptive grid refinement.
| Returns: |
|
|---|
Source code in xslope/search.py
def circular_search(slope_data, method_name, rapid=False, tol=1e-2, max_iter=50, shrink_factor=0.5,
fs_fail=9999, depth_tol_frac=0.03, diagnostic=False, num_slices=30):
"""
Global 9-point circular search with adaptive grid refinement.
Returns:
list of dict: sorted fs_cache by FS
bool: convergence flag
list of dict: search path
list of dict: circle_cache - all circles tested during search
"""
if not slope_data.get('circles'):
print("\nERROR: Circular search requires at least one circle defined in the input.")
print(" Add circle data to the 'circles' sheet in the input template.")
raise SystemExit(1)
if rapid:
validate_rapid_drawdown(slope_data)
solver = getattr(solve, method_name)
circle_cache = [] # Store ALL circles tested for plotting
start_time = time.time() # Start timing
ground_surface = slope_data['ground_surface']
ground_surface = LineString([(x, y) for x, y in ground_surface.coords])
y_max = max(y for _, y in ground_surface.coords)
y_min = slope_data['max_depth']
delta_y = y_max - y_min
tol = delta_y * depth_tol_frac
circles = slope_data['circles']
max_depth = slope_data['max_depth']
def optimize_depth(x, y, depth_guess, depth_step_init, depth_shrink_factor, tol_frac, fs_fail, circle_cache, diagnostic=False):
depth_step = min(10.0, depth_step_init)
best_depth = max(depth_guess, max_depth)
best_fs = fs_fail
best_result = None
depth_tol = depth_step * tol_frac
iterations = 0
while depth_step > depth_tol:
depths = [
max(best_depth - depth_step, max_depth),
best_depth,
best_depth + depth_step
]
fs_results = []
for d in depths:
test_circle = {'Xo': x, 'Yo': y, 'Depth': d, 'R': y - d}
success, result = generate_slices(slope_data, circle=test_circle, num_slices=num_slices)
if not success:
FS = fs_fail
df_slices = None
failure_surface = None
solver_result = None
else:
df_slices, failure_surface = result
if rapid:
solver_success, solver_result = rapid_drawdown(df_slices, method_name, debug_level=0)
else:
solver_success, solver_result = solver(df_slices)
FS = solver_result['FS'] if solver_success else fs_fail
fs_results.append((FS, d, df_slices, failure_surface, solver_result))
# Add to circle_cache for plotting all tested circles
if FS != fs_fail:
circle_cache.append({
"Xo": x,
"Yo": y,
"Depth": d,
"R": y - d,
"FS": FS,
"failure_surface": failure_surface
})
fs_results.sort(key=lambda t: t[0])
best_fs, best_depth, best_df, best_surface, best_solver_result = fs_results[0]
if all(FS == fs_fail for FS, *_ in fs_results):
if diagnostic:
print(f"[❌ all fail] x={x:.2f}, y={y:.2f}")
return best_depth, fs_fail, None, None, None
if diagnostic:
print(f"[✓ depth-opt] x={x:.2f}, y={y:.2f}, depth={best_depth:.2f}, FS={best_fs:.4f}, step={depth_step:.2f}")
depth_step *= depth_shrink_factor
iterations += 1
if iterations > 50:
if diagnostic:
print(f"[⚠️ warning] depth iterations exceeded at (x={x:.2f}, y={y:.2f})")
break
return best_depth, best_fs, best_df, best_surface, best_solver_result
def evaluate_grid(x0, y0, grid_size, depth_guess, slope_data, diagnostic=False, fs_cache=None, circle_cache=None):
if fs_cache is None:
fs_cache = {}
Xs = [x0 - grid_size, x0, x0 + grid_size]
Ys = [y0 - grid_size, y0, y0 + grid_size]
points = [(x, y) for y in Ys for x in Xs]
for i, (x, y) in enumerate(points):
if (x, y) in fs_cache:
result = fs_cache[(x, y)]
if diagnostic:
print(f"[cache hit] grid pt {i + 1}/9 at (x={x:.2f}, y={y:.2f}) → FS={result['FS']:.4f}")
continue
depth_step_init = grid_size * 0.75
d, FS, df_slices, failure_surface, solver_result = optimize_depth(
x, y, depth_guess, depth_step_init, depth_shrink_factor=0.25, tol_frac=0.01, fs_fail=fs_fail,
circle_cache=circle_cache, diagnostic=diagnostic
)
fs_cache[(x, y)] = {
"Xo": x,
"Yo": y,
"Depth": d,
"FS": FS,
"slices": df_slices,
"failure_surface": failure_surface,
"solver_result": solver_result
}
if diagnostic:
print(f"[grid pt {i + 1}/9] x={x:.2f}, y={y:.2f} → FS={FS:.4f} at d={d:.2f}")
sorted_fs = sorted(fs_cache.items(), key=lambda item: item[1]['FS'])
best_point = sorted_fs[0][1]
best_index = list(fs_cache.keys()).index((best_point['Xo'], best_point['Yo']))
if diagnostic:
print(f"[★ grid best {best_index + 1}/9] FS={best_point['FS']:.4f} at (x={best_point['Xo']:.2f}, y={best_point['Yo']:.2f})")
return fs_cache, best_point
# === Step 1: Evaluate starting circles ===
all_starts = []
fs_cache = {} # Shared cache for all starting circles
for i, start_circle in enumerate(circles):
x0 = start_circle['Xo']
y0 = start_circle['Yo']
r0 = y0 - start_circle['Depth']
if diagnostic:
print(f"\n[⏱ starting circle {i+1}] x={x0:.2f}, y={y0:.2f}, r={r0:.2f}")
grid_size = r0 * 0.15
depth_guess = start_circle['Depth']
fs_cache, best_point = evaluate_grid(x0, y0, grid_size, depth_guess, slope_data, diagnostic=diagnostic, fs_cache=fs_cache, circle_cache=circle_cache)
all_starts.append((start_circle, best_point))
all_starts.sort(key=lambda t: t[1]['FS'])
start_circle, best_start = all_starts[0]
x0 = best_start['Xo']
y0 = best_start['Yo']
depth_guess = best_start['Depth']
grid_size = (y0 - depth_guess) * 0.15
best_fs = best_start['FS']
# Include initial jump from user-defined circle to best point on its grid
search_path = [
{"x": start_circle['Xo'], "y": start_circle['Yo'], "FS": None},
{"x": x0, "y": y0, "FS": best_fs}
]
converged = False
if diagnostic:
print(f"\n[✅ launch grid] Starting refinement from FS={best_fs:.4f} at ({x0:.2f}, {y0:.2f})")
for iteration in range(max_iter):
print(f"[🔁 iteration {iteration+1}] center=({x0:.2f}, {y0:.2f}), FS={best_fs:.4f}, grid={grid_size:.4f}")
fs_cache, best_point = evaluate_grid(x0, y0, grid_size, depth_guess, slope_data, diagnostic=diagnostic, fs_cache=fs_cache, circle_cache=circle_cache)
if best_point['FS'] < best_fs:
best_fs = best_point['FS']
x0 = best_point['Xo']
y0 = best_point['Yo']
depth_guess = best_point['Depth']
search_path.append({"x": x0, "y": y0, "FS": best_fs})
else:
grid_size *= shrink_factor
if grid_size < tol:
converged = True
end_time = time.time()
elapsed = end_time - start_time
print(f"[✅ converged] Iter={iteration+1}, FS={best_fs:.4f} at (x={x0:.2f}, y={y0:.2f}, depth={depth_guess:.2f}), elapsed time={elapsed:.2f} seconds")
break
if not converged and diagnostic:
print(f"\n[❌ max iterations reached] FS={best_fs:.4f} at (x={x0:.2f}, y={y0:.2f})")
sorted_fs_cache = sorted(fs_cache.values(), key=lambda d: d['FS'])
return sorted_fs_cache, converged, search_path, circle_cache
noncircular_search(slope_data, method_name, rapid=False, diagnostic=True, movement_distance=4.0, shrink_factor=0.8, fs_tol=0.001, max_iter=100, move_tol=0.1, num_slices=30)
Non-circular search using the specified solver.
Parameters:
data : dict Input data dictionary containing all necessary parameters method_name : str The method name to use (e.g., 'lowe_karafiath', 'spencer') diagnostic : bool If True, print diagnostic information during search movement_distance : float Initial distance to move points in each iteration shrink_factor : float Factor to reduce movement_distance by when no improvement is found fs_tol : float Factor of safety convergence tolerance max_iter : int Maximum number of iterations move_tol : float Minimum movement distance for convergence (AND logic with fs_tol)
Returns:
tuple : (fs_cache, converged, search_path) fs_cache : dict of all evaluated surfaces and their FS values converged : bool indicating if search converged search_path : list of surfaces evaluated during search
Source code in xslope/search.py
def noncircular_search(slope_data, method_name, rapid=False, diagnostic=True, movement_distance=4.0, shrink_factor=0.8, fs_tol=0.001, max_iter=100, move_tol=0.1, num_slices=30):
"""
Non-circular search using the specified solver.
Parameters:
-----------
data : dict
Input data dictionary containing all necessary parameters
method_name : str
The method name to use (e.g., 'lowe_karafiath', 'spencer')
diagnostic : bool
If True, print diagnostic information during search
movement_distance : float
Initial distance to move points in each iteration
shrink_factor : float
Factor to reduce movement_distance by when no improvement is found
fs_tol : float
Factor of safety convergence tolerance
max_iter : int
Maximum number of iterations
move_tol : float
Minimum movement distance for convergence (AND logic with fs_tol)
Returns:
--------
tuple : (fs_cache, converged, search_path)
fs_cache : dict of all evaluated surfaces and their FS values
converged : bool indicating if search converged
search_path : list of surfaces evaluated during search
"""
if not slope_data.get('non_circ'):
print("\nERROR: Non-circular search requires a non-circular surface defined in the input.")
print(" Add surface point data to the 'circles' sheet (non-circular section) in the input template.")
raise SystemExit(1)
if rapid:
validate_rapid_drawdown(slope_data)
# Get the solver function from solve module
solver = getattr(solve, method_name)
def move_point(points, i, dx, dy, movement_type, ground_surface, max_depth):
"""Move a point while respecting constraints"""
# Get current point
point = points[i]
# Calculate new position
new_x = point[0] + dx
new_y = point[1] + dy
# For endpoints, ensure they stay on ground surface
if i == 0 or i == len(points)-1:
# Create vertical line at new_x
vertical_line = LineString([(new_x, 0), (new_x, 1000)]) # Arbitrary high y value
intersection = ground_surface.intersection(vertical_line)
y = get_y_from_intersection(intersection)
if y is None:
return False
new_y = y
else:
# For middle points, ensure they stay below ground surface but above max_depth
if new_y > ground_surface.interpolate(ground_surface.project(Point(new_x, new_y))).y:
return False
if new_y < max_depth:
return False
# Check x-ordering constraints
if i > 0 and new_x <= points[i-1][0]: # Don't move past left neighbor
return False
if i < len(points)-1 and new_x >= points[i+1][0]: # Don't move past right neighbor
return False
# Update point
points[i] = [new_x, new_y]
return True
def evaluate_surface(points, distance, fs_cache=None):
"""Evaluate factor of safety for current surface configuration"""
if fs_cache is None:
fs_cache = {}
# Create non_circ format from points
non_circ = [{'X': x, 'Y': y, 'Movement': movements[i]} for i, (x, y) in enumerate(points)]
# Generate slices and compute FS
success, result = generate_slices(slope_data, non_circ=non_circ, num_slices=num_slices)
if not success:
return float('inf'), None, None, None, fs_cache
df_slices, failure_surface = result
if rapid:
solver_success, solver_result = rapid_drawdown(df_slices, method_name, debug_level=0)
else:
solver_success, solver_result = solver(df_slices)
FS = solver_result['FS'] if solver_success else float('inf')
# Cache result
key = tuple(map(tuple, points))
fs_cache[key] = {
'points': points.copy(),
'FS': FS,
'slices': df_slices,
'failure_surface': failure_surface,
'solver_result': solver_result
}
return FS, df_slices, failure_surface, solver_result, fs_cache
# Get initial surface from non_circ data
non_circ = slope_data['non_circ']
points = np.array([[p['X'], p['Y']] for p in non_circ])
movements = [p['Movement'] for p in non_circ]
ground_surface = slope_data['ground_surface']
# Initialize cache and search path
fs_cache = {}
search_path = []
# Evaluate initial surface
FS, df_slices, failure_surface, solver_result, fs_cache = evaluate_surface(
points, movement_distance, fs_cache)
# Initialize best surface with initial evaluation
best_points = points.copy()
best_fs = FS
best_df = df_slices
best_surface = failure_surface
best_solver_result = solver_result
# Track convergence
converged = False
start_time = time.time()
prev_fs = best_fs
if diagnostic:
print(f"\n[✅ starting search] Initial FS={best_fs:.4f}\n")
print("Initial failure surface:")
for i, point in enumerate(points):
print(f"Point {i}: ({point[0]:.2f}, {point[1]:.2f})")
print("\nGround surface:")
for i, point in enumerate(ground_surface.coords):
print(f"Point {i}: ({point[0]:.2f}, {point[1]:.2f})")
# Main search loop
for iteration in range(max_iter):
improved = False
if diagnostic:
print(f"\nIteration {iteration + 1}")
print("Current surface points:")
for i, point in enumerate(best_surface.coords):
print(f"Point {i}: ({point[0]:.2f}, {point[1]:.2f})")
# Try moving each point
for i in range(len(points)):
# Try both positive and negative directions
for direction in [-1, 1]:
test_points = points.copy()
# Get movement direction based on point type
if i == 0 or i == len(points)-1: # End points
dx = direction * movement_distance
dy = 0 # y will be determined by ground surface
elif movements[i] == 'Horiz':
dx = direction * movement_distance
dy = 0
elif movements[i] == 'Free':
# For free points, move perpendicular to tangent
dx_tangent = points[i+1][0] - points[i-1][0] if i > 0 and i < len(points)-1 else 1
dy_tangent = points[i+1][1] - points[i-1][1] if i > 0 and i < len(points)-1 else 0
length = np.sqrt(dx_tangent**2 + dy_tangent**2)
if length > 0:
dx = -dy_tangent/length * direction * movement_distance
dy = dx_tangent/length * direction * movement_distance
else:
dx = direction * movement_distance
dy = 0
else: # Fixed
continue
# Try to move the point
if move_point(test_points, i, dx, dy, movements[i], ground_surface, slope_data['max_depth']):
# Evaluate new surface
FS, df_slices, failure_surface, solver_result, fs_cache = evaluate_surface(
test_points, movement_distance, fs_cache)
if FS < best_fs:
best_fs = FS
best_points = test_points.copy()
best_df = df_slices
best_surface = failure_surface
best_solver_result = solver_result
improved = True
if diagnostic:
print(f"[✓ improved] iter={iteration}, point={i}, FS={FS:.4f}")
# print iteration results
print(f"iteration {iteration+1} FS={best_fs:.4f}")
# Check convergence based on FS change and movement distance (AND logic)
fs_change = abs(best_fs - prev_fs)
if fs_change < fs_tol and movement_distance < move_tol:
converged = True
if diagnostic:
print(f"[✓ converged] FS change {fs_change:.6f} < tolerance {fs_tol} and movement_distance {movement_distance:.4f} < move_tol {move_tol}")
break
prev_fs = best_fs
if not improved or fs_change < fs_tol:
movement_distance *= shrink_factor
if True:
print(f"[↘️ shrinking] movement_distance={movement_distance:.4f}")
points = best_points.copy()
end_time = time.time()
elapsed = end_time - start_time
if converged:
print(f"\n[✅ converged] Iter={iteration+1}, FS={best_fs:.4f}, elapsed time={elapsed:.2f} seconds")
else:
print(f"\n[❌ max iterations reached] FS={best_fs:.4f}, elapsed time={elapsed:.2f} seconds")
sorted_fs_cache = sorted(fs_cache.values(), key=lambda d: d['FS'])
return sorted_fs_cache, converged, search_path