diff --git a/kwave/kWaveSimulation.py b/kwave/kWaveSimulation.py index f4f7d9c7..7502b555 100644 --- a/kwave/kWaveSimulation.py +++ b/kwave/kWaveSimulation.py @@ -190,7 +190,7 @@ def time_rev(self): """ if self.sensor is not None and not isinstance(self.sensor, NotATransducer): - if not self.options.simulation_type.is_elastic_simulation() and self.sensor.time_reversal_boundary_data is not None: + if not self.options.simulation_type.is_elastic and self.sensor.time_reversal_boundary_data is not None: return True else: return self.userarg_time_rev @@ -477,14 +477,13 @@ def input_checking(self, calling_func_name) -> None: self.check_calling_func_name_and_dim(calling_func_name, k_dim) # run subscript to check optional inputs - self.options = SimulationOptions.option_factory(self.kgrid, self.options) opt = self.options # TODO(Walter): clean this up with getters in simulation options pml size pml_x_size, pml_y_size, pml_z_size = opt.pml_x_size, opt.pml_y_size, opt.pml_z_size pml_size = Vector([pml_x_size, pml_y_size, pml_z_size]) - is_elastic_code = opt.simulation_type.is_elastic_simulation() + is_elastic_code = opt.simulation_type.is_elastic self.print_start_status(is_elastic_code=is_elastic_code) self.set_index_data_type() @@ -512,7 +511,7 @@ def input_checking(self, calling_func_name) -> None: kgrid_N=Vector(self.kgrid.N), pml_size=pml_size, pml_inside=opt.pml_inside, - is_axisymmetric=opt.simulation_type.is_axisymmetric(), + is_axisymmetric=opt.simulation_type.is_axisymmetric, ) @staticmethod @@ -580,7 +579,7 @@ def check_medium(medium, kgrid_k, simulation_type: SimulationType) -> bool: """ # if using the fluid code, allow the density field to be blank if the medium is homogeneous - if (not simulation_type.is_elastic_simulation()) and medium.density is None and medium.sound_speed.size == 1: + if (not simulation_type.is_elastic) and medium.density is None and medium.sound_speed.size == 1: user_medium_density_input = False medium.density = 1 else: @@ -589,7 +588,7 @@ def check_medium(medium, kgrid_k, simulation_type: SimulationType) -> bool: # check medium absorption inputs for the fluid code is_absorbing = any(medium.is_defined("alpha_coeff", "alpha_power")) - is_stokes = simulation_type.is_axisymmetric() or medium.alpha_mode == "stokes" + is_stokes = simulation_type.is_axisymmetric or medium.alpha_mode == "stokes" medium.set_absorbing(is_absorbing, is_stokes) if is_absorbing: @@ -642,7 +641,7 @@ def check_sensor(self, kgrid_dim) -> None: directivity.set_wavenumbers(self.kgrid) # check for time reversal inputs and set flags - if not self.options.simulation_type.is_elastic_simulation() and self.sensor.time_reversal_boundary_data is not None: + if not self.options.simulation_type.is_elastic and self.sensor.time_reversal_boundary_data is not None: self.record.p = False # check for sensor.record and set usage flgs - if no flgs are @@ -657,7 +656,7 @@ def check_sensor(self, kgrid_dim) -> None: assert isinstance(self.sensor.record, list), 'sensor.record must be given as a list, e.g. ["p", "u"]' # check the sensor record flgs - self.record.set_flags_from_list(self.sensor.record, self.options.simulation_type.is_elastic_simulation()) + self.record.set_flags_from_list(self.sensor.record, self.options.simulation_type.is_elastic) # enforce the sensor.mask field unless just recording the max_all # and _final variables @@ -777,7 +776,7 @@ def check_sensor(self, kgrid_dim) -> None: # display, if flgs.time_rev = true or cartesian_interp = # 'nearest' this grid is used as the sensor.mask self.sensor.mask, self.order_index, self.reorder_index = cart2grid( - self.kgrid, self.sensor.mask, self.options.simulation_type.is_axisymmetric() + self.kgrid, self.sensor.mask, self.options.simulation_type.is_axisymmetric ) # if in time reversal mode, reorder the p0 input data in @@ -1029,7 +1028,7 @@ def check_kgrid_time(self) -> None: self.kgrid.makeTime(self.medium.sound_speed, self.KSPACE_CFL) # check kgrid.t_array for stability given medium properties - if not self.options.simulation_type.is_elastic_simulation(): + if not self.options.simulation_type.is_elastic: # calculate the largest timestep for which the model is stable dt_stability_limit = check_stability(self.kgrid, self.medium) @@ -1126,7 +1125,7 @@ def check_input_combinations(self, opt: SimulationOptions, user_medium_density_i "The optional input " "StreamToDisk" " is currently only compatible " "with sensor.record = {" "p" "} (the default)." ) - is_axisymmetric = self.options.simulation_type.is_axisymmetric() + is_axisymmetric = self.options.simulation_type.is_axisymmetric # make sure the PML size is smaller than the grid if PMLInside is true if opt.pml_inside and ( (k_dim == 1 and (pml_size.x * 2 > self.kgrid.Nx)) @@ -1167,10 +1166,6 @@ def check_input_combinations(self, opt: SimulationOptions, user_medium_density_i if not self.source_p0: self.options.smooth_p0 = False - # start log if required - if opt.create_log: - raise NotImplementedError(f"diary({self.LOG_NAME}.txt');") - # update command line status if self.time_rev: logging.log(logging.INFO, " time reversal mode") @@ -1206,7 +1201,7 @@ def smooth_and_enlarge(self, source, k_dim, kgrid_N, opt: SimulationOptions) -> # update command line status logging.log(logging.INFO, " smoothing p0 distribution...") - if self.options.simulation_type.is_axisymmetric(): + if self.options.simulation_type.is_axisymmetric: if self.options.radial_symmetry in ["WSWA-FFT", "WSWA"]: # create a new kWave grid object with expanded radial grid kgrid_exp = kWaveGrid([kgrid_N.x, kgrid_N.y * 4], [self.kgrid.dx, self.kgrid.dy]) @@ -1258,7 +1253,7 @@ def smooth_and_enlarge(self, source, k_dim, kgrid_N, opt: SimulationOptions) -> ), dotdict( { - "axisymmetric": self.options.simulation_type.is_axisymmetric(), + "axisymmetric": self.options.simulation_type.is_axisymmetric, "use_sensor": self.use_sensor, "blank_sensor": self.blank_sensor, "cuboid_corners": self.cuboid_corners, @@ -1280,7 +1275,7 @@ def smooth_and_enlarge(self, source, k_dim, kgrid_N, opt: SimulationOptions) -> self.kgrid, self.index_data_type, self.p_source_pos_index, self.u_source_pos_index, self.s_source_pos_index = expand_results # get maximum prime factors - if self.options.simulation_type.is_axisymmetric(): + if self.options.simulation_type.is_axisymmetric: prime_facs = self.kgrid.highest_prime_factors(self.options.radial_symmetry[:4]) else: prime_facs = self.kgrid.highest_prime_factors() @@ -1367,7 +1362,7 @@ def create_absorption_vars(self) -> None: Returns: None """ - if not self.options.simulation_type.is_elastic_simulation() and not self.options.save_to_disk: + if not self.options.simulation_type.is_elastic and not self.options.save_to_disk: self.absorb_nabla1, self.absorb_nabla2, self.absorb_tau, self.absorb_eta = create_absorption_variables( self.kgrid, self.medium, self.equation_of_state ) diff --git a/kwave/kWaveSimulation_helper/save_to_disk_func.py b/kwave/kWaveSimulation_helper/save_to_disk_func.py index 30f901a9..d0dcae90 100644 --- a/kwave/kWaveSimulation_helper/save_to_disk_func.py +++ b/kwave/kWaveSimulation_helper/save_to_disk_func.py @@ -63,7 +63,7 @@ def save_to_disk_func( # ========================================================================= remove_z_dimension(float_variables, kgrid.dim) - save_file(opt.input_filename, integer_variables, float_variables, opt.hdf_compression_level, auto_chunk=auto_chunk) + save_file(opt.input_filename, integer_variables, float_variables, opt.compression_level, auto_chunk=auto_chunk) # update command line status logging.log(logging.INFO, " completed in ", scale_time(TicToc.toc())) diff --git a/kwave/kWaveSimulation_helper/set_sound_speed_ref.py b/kwave/kWaveSimulation_helper/set_sound_speed_ref.py index a2ab224e..afd3784a 100644 --- a/kwave/kWaveSimulation_helper/set_sound_speed_ref.py +++ b/kwave/kWaveSimulation_helper/set_sound_speed_ref.py @@ -15,7 +15,7 @@ def set_sound_speed_ref(medium: kWaveMedium, simulation_type: SimulationType): Returns: """ - if not simulation_type.is_elastic_simulation(): + if not simulation_type.is_elastic: return get_ordinary_sound_speed_ref(medium) elif simulation_type == SimulationType.ELASTIC: # pragma: no cover return get_pstd_elastic_sound_speed_ref(medium) diff --git a/kwave/kspaceFirstOrder2D.py b/kwave/kspaceFirstOrder2D.py index 536a8e97..68b012fe 100644 --- a/kwave/kspaceFirstOrder2D.py +++ b/kwave/kspaceFirstOrder2D.py @@ -425,8 +425,8 @@ def kspaceFirstOrder2D( "source_syz": k_sim.source_syz, "transducer_source": k_sim.transducer_source, "nonuniform_grid": k_sim.nonuniform_grid, - "elastic_code": k_sim.options.simulation_type.is_elastic_simulation(), - "axisymmetric": k_sim.options.simulation_type.is_axisymmetric(), + "elastic_code": k_sim.options.simulation_type.is_elastic, + "axisymmetric": k_sim.options.simulation_type.is_axisymmetric, "cuboid_corners": k_sim.cuboid_corners, } ), diff --git a/kwave/kspaceFirstOrder3D.py b/kwave/kspaceFirstOrder3D.py index 345426e6..8232ed41 100644 --- a/kwave/kspaceFirstOrder3D.py +++ b/kwave/kspaceFirstOrder3D.py @@ -446,8 +446,8 @@ def kspaceFirstOrder3D( "source_syz": k_sim.source_syz, "transducer_source": k_sim.transducer_source, "nonuniform_grid": k_sim.nonuniform_grid, - "elastic_code": k_sim.options.simulation_type.is_elastic_simulation(), - "axisymmetric": k_sim.options.simulation_type.is_axisymmetric(), + "elastic_code": k_sim.options.simulation_type.is_elastic, + "axisymmetric": k_sim.options.simulation_type.is_axisymmetric, "cuboid_corners": k_sim.cuboid_corners, } ), diff --git a/kwave/kspaceFirstOrderAS.py b/kwave/kspaceFirstOrderAS.py index 70c7d2ff..5d00e78c 100644 --- a/kwave/kspaceFirstOrderAS.py +++ b/kwave/kspaceFirstOrderAS.py @@ -350,8 +350,8 @@ def kspaceFirstOrderAS( "source_syz": k_sim.source_syz, "transducer_source": k_sim.transducer_source, "nonuniform_grid": k_sim.nonuniform_grid, - "elastic_code": k_sim.options.simulation_type.is_elastic_simulation(), - "axisymmetric": k_sim.options.simulation_type.is_axisymmetric(), + "elastic_code": k_sim.options.simulation_type.is_elastic, + "axisymmetric": k_sim.options.simulation_type.is_axisymmetric, "cuboid_corners": k_sim.cuboid_corners, } ), diff --git a/kwave/options/simulation_options.py b/kwave/options/simulation_options.py index 78a49f10..16f2fec9 100644 --- a/kwave/options/simulation_options.py +++ b/kwave/options/simulation_options.py @@ -4,347 +4,199 @@ from dataclasses import dataclass, field from enum import Enum from tempfile import gettempdir -from typing import List, Optional, TYPE_CHECKING +from typing import Literal, Optional, Union +from beartype.typing import List, Tuple +from beartype import beartype import numpy as np -if TYPE_CHECKING: - # Found here: https://adamj.eu/tech/2021/05/13/python-type-hints-how-to-fix-circular-imports/ - from kwave.kgrid import kWaveGrid from kwave.utils.data import get_date_string from kwave.utils.io import get_h5_literals from kwave.utils.pml import get_optimal_pml_size +from kwave.utils.typing import ArrayLike +from kwave.data import Vector -class SimulationType(Enum): - """ - Enum for the simulation type +class SimulationType(str, Enum): + """Simulation type with clearer string representations""" - In the original matlab code, simulation type was determined - by looking at the calling function name and the user args. + FLUID = "fluid" + AXISYMMETRIC = "axisymmetric" + ELASTIC = "elastic" + ELASTIC_WITH_KSPACE = "elastic_with_kspace" - Rules from the original matlab code: - AXISYMMETRIC => if the calling function name started with 'kspaceFirstOrderAS' - or if the userarg_axisymmetric is set to true - ELASTIC => if the calling function name started with 'pstdElastic' or 'kspaceElastic' - ELASTIC_WITH_KSPACE_CORRECTION => if the calling function name started with 'kspaceElastic' - """ + @property + def is_elastic(self) -> bool: + return self in [self.ELASTIC, self.ELASTIC_WITH_KSPACE] - FLUID = 1 - AXISYMMETRIC = 2 - ELASTIC = 3 - ELASTIC_WITH_KSPACE_CORRECTION = 4 + @property + def is_axisymmetric(self) -> bool: + return self == self.AXISYMMETRIC - def is_elastic_simulation(self): - return self in [SimulationType.ELASTIC, SimulationType.ELASTIC_WITH_KSPACE_CORRECTION] - def is_axisymmetric(self): - return self == SimulationType.AXISYMMETRIC +InterpolationMode = Literal["linear", "nearest"] +RadialSymmetry = Literal["WSWA", "WSWS", "WSWA-FFT", "WSWS-FFT"] +DataCastMode = Literal["off", "single", "double"] +@beartype @dataclass -class SimulationOptions(object): - """ - Args: - axisymmetric: Flag that indicates whether axisymmetric simulation is used - cart_interp: Interpolation mode used to extract the pressure when a Cartesian sensor mask is given. - If set to 'nearest' and more than one Cartesian point maps to the same grid point, - duplicated data points are discarded and sensor_data will be returned - with less points than that specified by sensor.mask (default = 'linear'). - pml_inside: put the PML inside the grid defined by the user - pml_alpha: Absorption within the perfectly matched layer in Nepers per grid point (default = 2). - save_to_disk: save the input data to a HDF5 file - save_to_disk_exit: exit the simulation after saving the HDF5 file - scale_source_terms: apply the source scaling term to time varying sources - smooth_c0: smooth the sound speed distribution - smooth_rho0: smooth the density distribution - smooth_p0: smooth the initial pressure distribution - use_kspace: use the k-space correction - use_sg: use a staggered grid - use_fd: Use finite difference gradients instead of spectral (in 1D) - pml_auto: automatically choose the PML size to give small prime factors - create_log: create a log using diary - use_finite_difference: use finite difference gradients instead of spectral (in 1D) - stream_to_disk: String containing a filename (including pathname if required). - If set, after the precomputation phase, the input variables used in the time loop are saved - the specified location in HDF5 format. The simulation then exits. - The saved variables can be used to run simulations using the C++ code. - data_recast: recast the sensor data back to double precision - cartesian_interp: interpolation mode for Cartesian sensor mask - hdf_compression_level: zip compression level for HDF5 input files - data_cast: data cast - pml_search_range: search range used when automatically determining PML size - radial_symmetry: radial symmetry used in axisymmetric code - multi_axial_PML_ratio: MPML settings - pml_x_alpha: PML Alpha for x-axis - pml_y_alpha: PML Alpha for y-axis - pml_z_alpha: PML Alpha for z-axis - pml_x_size: PML Size for x-axis - pml_y_size: PML Size for y-axis - pml_z_size: PML Size for z-axis - """ +class SimulationOptions: + """Configuration options for k-Wave simulations.""" + # Core simulation settings simulation_type: SimulationType = SimulationType.FLUID - cart_interp: str = "linear" - pml_inside: bool = True - pml_alpha: float = 2.0 - save_to_disk: bool = False - save_to_disk_exit: bool = False + dimension: Optional[int] = None + + # Interpolation settings + cart_interp: InterpolationMode = "linear" + radial_symmetry: RadialSymmetry = "WSWA-FFT" + + # Computation flags + use_kspace: bool = True + use_sg: bool = True # staggered grid + use_fd: Optional[int] = None scale_source_terms: bool = True + data_cast: DataCastMode = "off" + data_recast: bool = False + + # Smoothing flags + smooth_p0: bool = True smooth_c0: bool = False smooth_rho0: bool = False - smooth_p0: bool = True - use_kspace: bool = True - use_sg: bool = True - use_fd: Optional[int] = None + + # PML Configuration + pml_inside: bool = True pml_auto: bool = False - create_log: bool = False - use_finite_difference: bool = False + pml_alpha: Union[ArrayLike, Vector] = field(default_factory=lambda: np.array([2.0])) + pml_search_range: Union[ArrayLike, Vector] = field(default_factory=lambda: [10, 40]) + pml_size: Union[ArrayLike, Vector] = field(default_factory=lambda: [20]) + pml_x_size: int = 20 + pml_y_size: int = 20 + pml_z_size: int = 10 + pml_x_alpha: float = field(init=False) + pml_y_alpha: float = field(init=False) + pml_z_alpha: float = field(init=False) + pml_multi_axial_ratio: float = 0.1 + + # File Configuration + data_path: str = field(default_factory=gettempdir) + input_filename: str = field(default_factory=lambda: f"{get_date_string()}_kwave_input.h5") + output_filename: str = field(default_factory=lambda: f"{get_date_string()}_kwave_output.h5") + save_to_disk: bool = False + save_to_disk_exit: bool = False stream_to_disk: bool = False - data_recast: Optional[bool] = False - cartesian_interp: str = "linear" - hdf_compression_level: Optional[int] = None - data_cast: str = "off" - pml_search_range: List[int] = field(default_factory=lambda: [10, 40]) - radial_symmetry: str = "WSWA-FFT" - multi_axial_PML_ratio: float = 0.1 - data_path: Optional[str] = field(default_factory=lambda: gettempdir()) - input_filename: Optional[str] = field(default_factory=lambda: f"{get_date_string()}_kwave_input.h5") - output_filename: Optional[str] = field(default_factory=lambda: f"{get_date_string()}_kwave_output.h5") - pml_x_alpha: Optional[float] = None - pml_y_alpha: Optional[float] = None - pml_z_alpha: Optional[float] = None - pml_size: Optional[List[int]] = None - pml_x_size: Optional[int] = None - pml_y_size: Optional[int] = None - pml_z_size: Optional[int] = None - - def __post_init__(self): - assert self.cartesian_interp in [ - "linear", - "nearest", - ], "Optional input ''cartesian_interp'' must be set to ''linear'' or ''nearest''." - - assert isinstance(self.data_cast, str), "Optional input ''data_cast'' must be a string." - - assert self.data_cast in ["off", "double", "single"], "Invalid input for ''data_cast''." - - if self.data_cast == "double": - self.data_cast = "off" - - # load the HDF5 literals (for the default compression level) - h5_literals = get_h5_literals() - self.hdf_compression_level = h5_literals.HDF_COMPRESSION_LEVEL - # check value is an integer between 0 and 9 - assert ( - isinstance(self.hdf_compression_level, int) and 0 <= self.hdf_compression_level <= 9 - ), "Optional input ''hdf_compression_level'' must be an integer between 0 and 9." - - assert ( - np.isscalar(self.multi_axial_PML_ratio) and self.multi_axial_PML_ratio >= 0 - ), "Optional input ''multi_axial_PML_ratio'' must be a single positive value." - - assert np.isscalar(self.stream_to_disk) or isinstance( - self.stream_to_disk, bool - ), "Optional input ''stream_to_disk'' must be a single scalar or Boolean value." - - boolean_inputs = { - "use_sg": self.use_sg, - "data_recast": self.data_recast, - "save_to_disk_exit": self.save_to_disk_exit, - "use_kspace": self.use_kspace, - "save_to_disk": self.save_to_disk, - "pml_inside": self.pml_inside, - "create_log": self.create_log, - "scale_source_terms": self.scale_source_terms, - } - - for key, val in boolean_inputs.items(): - assert isinstance(val, bool), f"Optional input ''{key}'' must be Boolean." - - assert self.radial_symmetry in [ - "WSWA", - "WSWS", - "WSWA-FFT", - "WSWS-FFT", - ], "Optional input ''RadialSymmetry'' must be set to ''WSWA'', ''WSWS'', ''WSWA-FFT'', ''WSWS-FFT''." - - # automatically assign the PML size to give small prime factors + stream_to_disk_steps: int = 200 + compression_level: int = field(default_factory=lambda: get_h5_literals().HDF_COMPRESSION_LEVEL) + + def __post_init__(self) -> None: + """Validate the simulation options""" + self._validate_pml() + self._validate_files() + self._validate_dimension() + self._validate_fd() + + def _validate_pml(self) -> None: + """Validate PML configuration""" + # Set PML alphas + self.pml_x_alpha = self.pml_alpha + self.pml_y_alpha = self.pml_alpha + self.pml_z_alpha = self.pml_alpha + + if isinstance(self.pml_size, int): + if self.pml_size < 0: + raise ValueError("PML size value must be positive") + else: + if not all(size > 0 for size in self.pml_size): + raise ValueError("All PML size values must be positive") + + # Validate other parameters if self.pml_auto and self.pml_inside: - raise NotImplementedError("''pml_size'' set to ''auto'' is only supported with ''pml_inside'' set to false.") - - if self.pml_size is not None: - # TODO(walter): remove auto option in exchange for pml_auto=True - if isinstance(self.pml_size, int): - self.pml_size = np.array([self.pml_size]) - if not isinstance(self.pml_size, (list, np.ndarray)): - raise ValueError("Optional input ''PMLSize'' must be a integer array of 1, 2 or 3 dimensions.") - - # Check if each member variable is None, and set it to self.pml_alpha if it is - self.pml_x_alpha = self.pml_alpha if self.pml_x_alpha is None else self.pml_x_alpha - self.pml_y_alpha = self.pml_alpha if self.pml_y_alpha is None else self.pml_y_alpha - self.pml_z_alpha = self.pml_alpha if self.pml_z_alpha is None else self.pml_z_alpha - - # add pathname to input and output filenames - self.input_filename = os.path.join(self.data_path, self.input_filename) - self.output_filename = os.path.join(self.data_path, self.output_filename) - - assert self.use_fd is None or ( - np.issubdtype(self.use_fd, np.number) and self.use_fd in [2, 4] - ), "Optional input ''UseFD'' can only be set to 2, 4." - - @staticmethod - def option_factory(kgrid: "kWaveGrid", options: SimulationOptions): - """ - Initialize the Simulation Options - - Args: - kgrid: kWaveGrid instance - elastic_code: Flag that indicates whether elastic simulation is used - **kwargs: Dictionary that holds following optional simulation properties: - - * cart_interp: Interpolation mode used to extract the pressure when a Cartesian sensor mask is given. - If set to 'nearest', duplicated data points are discarded and sensor_data - will be returned with fewer points than specified by sensor.mask (default = 'linear'). - * create_log: Boolean controlling whether the command line output is saved using the diary function - with a date and time stamped filename (default = false). - * data_cast: String input of the data type that variables are cast to before computation. - For example, setting to 'single' will speed up the computation time - (due to the improved efficiency of fftn and ifftn for this data type) at the expense - of a loss in precision. This variable is also useful for utilising GPU parallelisation - through libraries such as the Parallel Computing Toolbox - by setting 'data_cast' to 'gpuArray-single' (default = 'off'). - * data_recast: Boolean controlling whether the output data is cast back to double precision. - If set to false, sensor_data will be returned in - the data format set using the 'data_cast' option. - * hdf_compression_level: Compression level used for writing the input HDF5 file when using - 'save_to_disk' or kspaceFirstOrder3DC. Can be set to an integer - between 0 (no compression, the default) and 9 (maximum compression). - The compression is lossless. Increasing the compression level will reduce - the file size if there are portions of the medium that are homogeneous, - but will also increase the time to create the HDF5 file. - * multi_axial_pml_ratio: MPML settings - * pml_alpha: Absorption within the perfectly matched layer in Nepers per grid point (default = 2). - * pml_inside: Boolean controlling whether the perfectly matched layer is inside or outside the grid. - If set to false, the input grids are enlarged by pml_size - before running the simulation (default = true). - * pml_range: Search range used when automatically determining PML size. Tuple of two elements - * pml_size: Size of the perfectly matched layer in grid points. By default, the PML is added evenly to - all sides of the grid, however, both pml_size and pml_alpha can be given as three element - arrays to specify the x, y, and z properties, respectively. - To remove the PML, set the appropriate pml_alpha to zero rather than forcing - the PML to be of zero size (default = 10). - * radial_symmetry: Radial symmetry used in axisymmetric code - * stream_to_disk: Boolean controlling whether sensor_data is periodically saved to disk to avoid storing - the complete matrix in memory. StreamToDisk may also be given as an integer which - specifies the number of time steps that are taken before the data - is saved to disk (default = 200). - * save_to_disk: String containing a filename (including pathname if required). - If set, after the precomputation phase, the input variables used in the time loop are - saved the specified location in HDF5 format. The simulation then exits. - The saved variables can be used to run simulations using the C++ code. - * save_to_disk_exit: Exit the simulation after saving the HDF5 file - * scale_source_terms: Apply the source scaling term to time varying sources - * use_fd: Use finite difference gradients instead of spectral (in 1D) - * use_k_space: use the k-space correction - * use_sg: Use a staggered grid - - - Returns: - SimulationOptions instance - """ - - STREAM_TO_DISK_STEPS_DEF = 200 # number of steps before streaming to disk - - if options.pml_size is not None and not isinstance(options.pml_size, bool): - if len(options.pml_size) > kgrid.dim: - if kgrid.dim > 1: - raise ValueError(f"Optional input ''pml_size'' must be a 1 or {kgrid.dim} element numerical array.") - else: - raise ValueError("Optional input ''pml_size'' must be a single numerical value.") - + raise ValueError("'pml_size=auto' requires 'pml_inside=False'") + + if not (isinstance(self.pml_multi_axial_ratio, (int, float)) and self.pml_multi_axial_ratio >= 0): + raise ValueError("pml_multi_axial_ratio must be a non-negative number") + + def _validate_files(self) -> None: + """Validate file configuration""" + if not (0 <= self.compression_level <= 9): + raise ValueError("Compression level must be between 0 and 9") + + def _validate_dimension(self) -> None: + """Validate grid dimension""" + if self.dimension is not None and not (1 <= self.dimension <= 3): + raise ValueError("Grid dimension must be 1, 2, or 3") + + def _validate_fd(self) -> None: + """Validate finite difference settings""" + if self.use_fd is not None: + if self.use_fd not in (2, 4): + raise ValueError("Finite difference order must be 2 or 4") + if self.dimension != 1: + raise ValueError("Finite difference is only supported in 1D") + if self.simulation_type.is_elastic: + raise ValueError("Finite difference is not supported for elastic simulations") + + def get_file_paths(self) -> Tuple[str, str]: + """Get full paths for input and output files""" + return (os.path.join(self.data_path, self.input_filename), os.path.join(self.data_path, self.output_filename)) + + def configure_dimensions(self, kgrid: "kWaveGrid") -> None: + """Configure dimension-dependent settings""" + self.dimension = kgrid.dim + + # Configure PML sizes based on dimension + self._configure_pml_size(kgrid) + + # Configure automatic PML if needed + if self.simulation_type.is_axisymmetric or self.pml_auto: + self._configure_automatic_pml(kgrid) + + # Validate dimension-specific constraints + self._validate_dimension_constraints() + + def _configure_pml_size(self, kgrid: "kWaveGrid") -> None: + """Configure PML sizes based on grid dimensions""" + # Convert single int to list if needed + if isinstance(self.pml_size, int): + self.pml_size = [self.pml_size] + + if len(self.pml_size) > kgrid.dim: + raise ValueError(f"PML size must be 1 or {kgrid.dim} elements") + + # Set dimension-specific PML sizes if kgrid.dim == 1: - options.pml_x_size = options.pml_size if options.pml_size else 20 - options.plot_scale = [-1.1, 1.1] + self.pml_x_size = self.pml_size[0] elif kgrid.dim == 2: - if options.pml_size is not None: - if len(options.pml_size) == kgrid.dim: - options.pml_x_size, options.pml_y_size = np.asarray(options.pml_size, dtype=int).ravel() - else: - options.pml_x_size, options.pml_y_size = (options.pml_size[0], options.pml_size[0]) - else: - options.pml_x_size, options.pml_y_size = (20, 20) - options.plot_scale = [-1, 1] - elif kgrid.dim == 3: - if (options.pml_size is not None) and (len(options.pml_size) == kgrid.dim): - options.pml_x_size, options.pml_y_size, options.pml_z_size = np.asarray(options.pml_size).ravel() + if len(self.pml_size) == 2: + self.pml_x_size, self.pml_y_size = self.pml_size else: - if options.pml_size is None: - options.pml_x_size = 10 - options.pml_y_size = 10 - options.pml_z_size = 10 - else: - options.pml_x_size = options.pml_size[0] - options.pml_y_size = options.pml_x_size - options.pml_z_size = options.pml_x_size - options.plot_scale = [-1, 1] - - # replace defaults with user defined values if provided and check inputs - if (val := options.pml_alpha) is not None and not isinstance(options.pml_alpha, str): - # check input is correct size - val = np.atleast_1d(val) - if val.size > kgrid.dim: - if kgrid.dim > 1: - raise ValueError(f"Optional input ''pml_alpha'' must be a 1 or {kgrid.dim} element numerical array.") - else: - raise ValueError("Optional input ''pml_alpha'' must be a single numerical value.") - - # assign input based on number of dimensions - if kgrid.dim == 1: - options.pml_x_alpha = val - elif kgrid.dim == 2: - options.pml_x_alpha = val[0] - options.pml_y_alpha = val[-1] - elif kgrid.dim == 2: - options.pml_x_alpha = val[0] - options.pml_y_alpha = val[len(val) // 2] - options.pml_z_alpha = val[-1] - - if options.save_to_disk_exit: - assert kgrid.dim != 1, "Optional input ''save_to_disk'' is not compatible with 1D simulations." - - if options.stream_to_disk: - assert ( - not options.simulation_type.is_elastic_simulation() and kgrid.dim == 3 - ), "Optional input ''stream_to_disk'' is currently only compatible with 3D fluid simulations." - # if given as a Boolean, replace with the default number of time steps - if isinstance(options.stream_to_disk, bool) and options.stream_to_disk: - options.stream_to_disk = STREAM_TO_DISK_STEPS_DEF - - if options.save_to_disk or options.save_to_disk_exit: - assert kgrid.dim != 1, "Optional input ''save_to_disk'' is not compatible with 1D simulations." - - if options.use_fd: - # input only supported in 1D fluid code - assert kgrid.dim == 1 and not options.simulation_type.is_elastic_simulation(), "Optional input ''use_fd'' only supported in 1D." - # get optimal pml size - if options.simulation_type.is_axisymmetric() or options.pml_auto: - if options.simulation_type.is_axisymmetric(): - pml_size_temp = get_optimal_pml_size(kgrid, options.pml_search_range, options.radial_symmetry[:4]) + self.pml_x_size = self.pml_y_size = self.pml_size[0] + else: # 3D + if len(self.pml_size) == 3: + self.pml_x_size, self.pml_y_size, self.pml_z_size = self.pml_size else: - pml_size_temp = get_optimal_pml_size(kgrid, options.pml_search_range) - - # assign to individual variables - if kgrid.dim == 1: - options.pml_x_size = int(pml_size_temp[0]) - elif kgrid.dim == 2: - options.pml_x_size = int(pml_size_temp[0]) - options.pml_y_size = int(pml_size_temp[1]) - elif kgrid.dim == 3: - options.pml_x_size = int(pml_size_temp[0]) - options.pml_y_size = int(pml_size_temp[1]) - options.pml_z_size = int(pml_size_temp[2]) - - # cleanup unused variables - del pml_size_temp - return options + self.pml_x_size = self.pml_y_size = self.pml_z_size = self.pml_size[0] + + def _configure_automatic_pml(self, kgrid: "kWaveGrid") -> None: + """Configure automatic PML sizes""" + pml_size = get_optimal_pml_size( + kgrid, self.pml_search_range, self.radial_symmetry[:4] if self.simulation_type.is_axisymmetric else None + ) + + if self.dimension == 1: + self.pml_x_size = int(pml_size[0]) + elif self.dimension == 2: + self.pml_x_size, self.pml_y_size = map(int, pml_size) + else: # 3D + self.pml_x_size, self.pml_y_size, self.pml_z_size = map(int, pml_size) + + def _validate_dimension_constraints(self) -> None: + """Validate dimension-specific constraints""" + if self.dimension == 1: + if self.save_to_disk or self.save_to_disk_exit: + raise ValueError("save_to_disk is not compatible with 1D simulations") + + if self.stream_to_disk: + if self.simulation_type.is_elastic or self.dimension != 3: + raise ValueError("stream_to_disk is only compatible with 3D fluid simulations")