Skip to content

Commit

Permalink
Moved some class definitions up to top of the file
Browse files Browse the repository at this point in the history
  • Loading branch information
oddvarlia authored and dafeda committed Dec 19, 2023
1 parent a420f4f commit fde239d
Showing 1 changed file with 159 additions and 159 deletions.
318 changes: 159 additions & 159 deletions semeio/workflows/localisation/local_script_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,165 @@ def from_list(cls, input_list):
return cls([Parameter(key, val) for key, val in result.items()])


@dataclass
class Decay:
obs_pos: list
main_range: float
perp_range: float
azimuth: float
grid: Grid

def __post_init__(self):
angle = (90.0 - self.azimuth) * math.pi / 180.0
self.cosangle = math.cos(angle)
self.sinangle = math.sin(angle)

def get_dx_dy(self, data_index):
try:
# Assume the grid is 3D Grid
x, y, _ = self.grid.get_xyz(active_index=data_index)
except AttributeError:
# Assume the grid is a 2D Surface grid
# pylint: disable=no-member
x, y = self.grid.getXY(data_index)
x_unrotated = x - self.obs_pos[0]
y_unrotated = y - self.obs_pos[1]

dx = (
x_unrotated * self.cosangle + y_unrotated * self.sinangle
) / self.main_range
dy = (
-x_unrotated * self.sinangle + y_unrotated * self.cosangle
) / self.perp_range
return dx, dy

def norm_dist_square(self, data_index):
dx, dy = self.get_dx_dy(data_index)
d2 = dx**2 + dy**2
return d2


@dataclass
class GaussianDecay(Decay):
cutoff: bool

def __call__(self, data_index):
d2 = super().norm_dist_square(data_index)
if self.cutoff and d2 > 1.0:
return 0.0
exp_arg = -3.0 * d2
return math.exp(exp_arg)


@dataclass
class ConstGaussianDecay(Decay):
normalised_tapering_range: float
cutoff: bool

def __call__(self, data_index):
d2 = super().norm_dist_square(data_index)
d = math.sqrt(d2)
if d <= 1.0:
return 1.0
if self.cutoff and d > self.normalised_tapering_range:
return 0.0

distance_from_inner_ellipse = (d - 1) / (self.normalised_tapering_range - 1)
exp_arg = -3 * distance_from_inner_ellipse**2
return math.exp(exp_arg)


@dataclass
class ExponentialDecay(Decay):
cutoff: bool

def __call__(self, data_index):
d2 = super().norm_dist_square(data_index)
d = math.sqrt(d2)
if self.cutoff and d > 1.0:
return 0.0
exp_arg = -3.0 * d
return math.exp(exp_arg)


@dataclass
class ConstExponentialDecay(Decay):
normalised_tapering_range: float
cutoff: bool

def __call__(self, data_index):
d2 = super().norm_dist_square(data_index)
d = math.sqrt(d2)
if d <= 1.0:
return 1.0
if self.cutoff and d > self.normalised_tapering_range:
return 0.0

distance_from_inner_ellipse = (d - 1) / (self.normalised_tapering_range - 1)
exp_arg = -3 * distance_from_inner_ellipse
return math.exp(exp_arg)


@dataclass
class ConstantScalingFactor:
value: float

def __call__(self, data_index):
return self.value


class ScalingValues:
scaling_param_number = 1
corr_name = None

@classmethod
def initialize(cls):
cls.scaling_param_number = 1
cls.corr_name = None

@classmethod
def write_qc_parameter(
cls,
node_name,
corr_name,
field_scale,
grid,
param_for_field,
log_level=LogLevel.OFF,
):
# pylint: disable=too-many-arguments
if param_for_field is None or field_scale is None:
return

scaling_values = np.reshape(
param_for_field, (grid.getNX(), grid.getNY(), grid.getNZ()), "F"
)

# Write scaling parameter once per corr_name
if corr_name != cls.corr_name:
cls.corr_name = corr_name
# Need a parameter name <= 8 character long
scaling_kw_name = "S_" + str(cls.scaling_param_number)
scaling_kw = grid.create_kw(scaling_values, scaling_kw_name, False)
filename = (
cls.corr_name + "_" + node_name + "_" + scaling_kw_name + ".GRDECL"
)
print(
"Write calculated scaling factor with name: "
f"{scaling_kw_name} to file: {filename}"
)
debug_print(
f"Write calculated scaling factor with name: "
f"{scaling_kw_name} to file: {filename}",
LogLevel.LEVEL3,
log_level,
)
with cwrap.open(filename, "w") as file:
grid.write_grdecl(scaling_kw, file)
# Increase parameter number to define unique parameter name
cls.scaling_param_number = cls.scaling_param_number + 1


def get_param_from_ert(ens_config):
new_params = Parameters()
for key in ens_config.parameters:
Expand Down Expand Up @@ -819,162 +978,3 @@ def check_if_ref_point_in_grid(ref_point, grid):
f"or is outside the area defined by the grid {grid.get_name()}\n"
"Check specification of reference point."
) from err


@dataclass
class Decay:
obs_pos: list
main_range: float
perp_range: float
azimuth: float
grid: object

def __post_init__(self):
angle = (90.0 - self.azimuth) * math.pi / 180.0
self.cosangle = math.cos(angle)
self.sinangle = math.sin(angle)

def get_dx_dy(self, data_index):
try:
# Assume the grid is 3D Grid
x, y, _ = self.grid.get_xyz(active_index=data_index)
except AttributeError:
# Assume the grid is a 2D Surface grid
# pylint: disable=no-member
x, y = self.grid.getXY(data_index)
x_unrotated = x - self.obs_pos[0]
y_unrotated = y - self.obs_pos[1]

dx = (
x_unrotated * self.cosangle + y_unrotated * self.sinangle
) / self.main_range
dy = (
-x_unrotated * self.sinangle + y_unrotated * self.cosangle
) / self.perp_range
return dx, dy

def norm_dist_square(self, data_index):
dx, dy = self.get_dx_dy(data_index)
d2 = dx**2 + dy**2
return d2


@dataclass
class GaussianDecay(Decay):
cutoff: bool

def __call__(self, data_index):
d2 = super().norm_dist_square(data_index)
if self.cutoff and d2 > 1.0:
return 0.0
exp_arg = -3.0 * d2
return math.exp(exp_arg)


@dataclass
class ConstGaussianDecay(Decay):
normalised_tapering_range: float
cutoff: bool

def __call__(self, data_index):
d2 = super().norm_dist_square(data_index)
d = math.sqrt(d2)
if d <= 1.0:
return 1.0
if self.cutoff and d > self.normalised_tapering_range:
return 0.0

distance_from_inner_ellipse = (d - 1) / (self.normalised_tapering_range - 1)
exp_arg = -3 * distance_from_inner_ellipse**2
return math.exp(exp_arg)


@dataclass
class ExponentialDecay(Decay):
cutoff: bool

def __call__(self, data_index):
d2 = super().norm_dist_square(data_index)
d = math.sqrt(d2)
if self.cutoff and d > 1.0:
return 0.0
exp_arg = -3.0 * d
return math.exp(exp_arg)


@dataclass
class ConstExponentialDecay(Decay):
normalised_tapering_range: float
cutoff: bool

def __call__(self, data_index):
d2 = super().norm_dist_square(data_index)
d = math.sqrt(d2)
if d <= 1.0:
return 1.0
if self.cutoff and d > self.normalised_tapering_range:
return 0.0

distance_from_inner_ellipse = (d - 1) / (self.normalised_tapering_range - 1)
exp_arg = -3 * distance_from_inner_ellipse
return math.exp(exp_arg)


@dataclass
class ConstantScalingFactor:
value: float

def __call__(self, data_index):
return self.value


class ScalingValues:
scaling_param_number = 1
corr_name = None

@classmethod
def initialize(cls):
cls.scaling_param_number = 1
cls.corr_name = None

@classmethod
def write_qc_parameter(
cls,
node_name,
corr_name,
field_scale,
grid,
param_for_field,
log_level=LogLevel.OFF,
):
# pylint: disable=too-many-arguments
if param_for_field is None or field_scale is None:
return

scaling_values = np.reshape(
param_for_field, (grid.getNX(), grid.getNY(), grid.getNZ()), "F"
)

# Write scaling parameter once per corr_name
if corr_name != cls.corr_name:
cls.corr_name = corr_name
# Need a parameter name <= 8 character long
scaling_kw_name = "S_" + str(cls.scaling_param_number)
scaling_kw = grid.create_kw(scaling_values, scaling_kw_name, False)
filename = (
cls.corr_name + "_" + node_name + "_" + scaling_kw_name + ".GRDECL"
)
print(
"Write calculated scaling factor with name: "
f"{scaling_kw_name} to file: {filename}"
)
debug_print(
f"Write calculated scaling factor with name: "
f"{scaling_kw_name} to file: {filename}",
LogLevel.LEVEL3,
log_level,
)
with cwrap.open(filename, "w") as file:
grid.write_grdecl(scaling_kw, file)
# Increase parameter number to define unique parameter name
cls.scaling_param_number = cls.scaling_param_number + 1

0 comments on commit fde239d

Please sign in to comment.