started cleaning up - removed plotting and testing code
tommyod committed Nov 21, 2024
1 parent 48776a4 commit b24fd33
205 changes: 74 additions & 131 deletions src/fmu/tools/sensitivities/iman_conover.py
@@ -9,15 +9,40 @@
- https://blogs.sas.com/content/iml/2021/06/16/geometry-iman-conover-transformation.html
"""
import io
import matplotlib.pyplot as plt
Using Iman-Conover with Latin Hypercube sampling
------------------------------------------------
Sample on the unit hypercube using LatinHypercube
>>> import scipy as sp
>>> sampler = sp.stats.qmc.LatinHypercube(d=2, seed=42, scramble=True)
>>> samples = sampler.random(n=100)
Map to distributions
>>> X = np.vstack((sp.stats.triang(0.5).ppf(samples[:, 0]),
... sp.stats.gamma.ppf(samples[:, 1], a=1))).T
Induce correlations
>>> sp.stats.pearsonr(*X.T).statistic
0.0658...
>>> correlation_matrix = np.array([[1, 0.3], [0.3, 1]])
>>> transform = ImanConover(correlation_matrix)
>>> X_transformed = transform(X)
>>> sp.stats.pearsonr(*X_transformed.T).statistic
0.2796...
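The transform only reorders values within each column, so the marginal
distributions are preserved exactly:
>>> bool(np.allclose(np.sort(X, axis=0), np.sort(X_transformed, axis=0)))
True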
"""

import numpy as np
import pandas as pd
import pytest
import requests
import scipy as sp


@@ -37,12 +62,12 @@ def __init__(self, correlation_matrix):
Create a desired correlation of 0.7 and a data set X with no correlation.
>>> correlation_matrix = np.array([[1, 0.7], [0.7, 1]])
>>> transform = ImanConover(correlation_matrix)
>>> X = np.array([[0, 0],
>>> X = np.array([[0, 0 ],
... [0, 0.5],
... [0, 1],
... [1, 0],
... [1, 1],
... [1, 0.5]])
... [0, 1 ],
... [1, 0 ],
... [1, 0.5],
... [1, 1 ]])
>>> X_transformed = transform(X)
>>> X_transformed
array([[0. , 0. ],
Expand All @@ -64,6 +89,24 @@ def __init__(self, correlation_matrix):
input matrix above, there is no permutation of the columns that yields
the exact desired correlation of 0.7. Iman-Conover is a heuristic that
tries to get as close as possible.
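A brute-force check over all 720 column permutations confirms that none
of them gets within 0.1 of the target correlation:
>>> from itertools import permutations
>>> any(abs(sp.stats.pearsonr(X[:, 0], X[list(p), 1]).statistic - 0.7) < 0.1
...     for p in permutations(range(6)))
False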
With many samples, we get good results if the data are normal:
>>> rng = np.random.default_rng(42)
>>> X = rng.normal(size=(1000, 2))
>>> X_transformed = transform(X)
>>> sp.stats.pearsonr(*X_transformed.T).statistic
0.6977...
But if the data are far from normal (here: lognormal), the results are
not as good. This is because correlation is induced in a normal space
before the result is mapped back to the original marginal distributions.
>>> rng = np.random.default_rng(42)
>>> X = rng.lognormal(size=(1000, 2))
>>> X_transformed = transform(X)
>>> sp.stats.pearsonr(*X_transformed.T).statistic
0.5925...
"""
if not isinstance(correlation_matrix, np.ndarray):
raise TypeError("Input argument `correlation_matrix` must be NumPy array.")
@@ -89,7 +132,8 @@ def __call__(self, X):
----------
X : ndarray
Input matrix of shape (N, K). This is the data set that we want to
induce correlation structure on.
induce correlation structure on. X must have at least K + 1
independent rows, because corr(X) cannot be singular.
Returns
-------
@@ -110,10 +154,14 @@ def __call__(self, X):
msg += f"correlation matrix ({self.P.shape})"
raise ValueError(msg)

if N <= K:
msg = f"The matrix X must have rows > columns. Got shape: {X.shape}"
raise ValueError(msg)
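# (This guard matters because with N <= K the empirical correlation
# matrix of X is singular, so the Cholesky factorization in step two
# below would fail.)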

# STEP ONE - Use van der Waerden scores to transform data to
# approximately multivariate normal (but with correlations).
# The new data has the same rank correlation as the original data.
ranks = sp.stats.rankdata(X, method="average", axis=0) / (N + 1)
ranks = sp.stats.rankdata(X, axis=0) / (N + 1)
normal_scores = sp.stats.norm.ppf(ranks)
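# Van der Waerden scores are the normal quantiles of the scaled ranks,
# i.e. norm.ppf(rank / (N + 1)); rankdata gives ties their average rank.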

# TODO: Remove these sanity checks eventually
@@ -123,13 +171,12 @@ def __call__(self, X):
assert np.allclose(spearman_before, spearman_after), msg

# STEP TWO - Remove correlations from the transformed data
I = np.corrcoef(normal_scores, rowvar=False)
Q = np.linalg.cholesky(I) # QQ' = I
# Equivalent to: decorrelated_scores = normal_scores @ np.linalg.inv(Q).T
empirical_correlation = np.corrcoef(normal_scores, rowvar=False)
decorrelation_matrix = np.linalg.cholesky(empirical_correlation)
# We exploit that the Cholesky factor Q is lower-triangular and avoid the inverse.
# X = N @ inv(Q)^T => X @ Q^T = N => (Q @ X^T)^T = N
decorrelated_scores = sp.linalg.solve_triangular(
Q, normal_scores.T, lower=True
decorrelation_matrix, normal_scores.T, lower=True
).T
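# Barring ties, every column of normal_scores contains the same values
# (a permutation of norm.ppf(i / (N + 1)) for i = 1, ..., N), so all
# columns share one variance and the solve yields columns that are
# uncorrelated up to floating-point precision.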

# TODO: Remove these sanity checks eventually
@@ -138,10 +185,6 @@ def __call__(self, X):
# STEP THREE - Induce correlations in transformed space
correlated_scores = decorrelated_scores @ self.P.T
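# The mixed scores now have correlation close to C, though not exactly:
# ties in the ranks and floating-point round-off perturb the result.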

# TODO: Remove these sanity checks eventually (why does this fail??)
# correlations = np.corrcoef(correlated_scores, rowvar=False)
# assert np.allclose(correlations, self.C)

# STEP FOUR - Map back to original space using ranks, ensuring
# that marginal distributions are preserved
result = np.empty_like(X)
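# Each output column is the sorted original column reordered to follow
# the ranks of the corresponding column of correlated_scores.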
@@ -157,143 +200,43 @@ def __call__(self, X):
class TestImanConover:
@pytest.mark.parametrize("seed", range(100))
def test_marginals_and_correlation_distance(self, seed):
rng = np.random.default_rng(42)
rng = np.random.default_rng(seed)

n_variables = rng.integers(2, 10)
n_observations = 10 * n_variables
n_observations = n_variables + 1
rng = np.random.default_rng(42)

# Create a random correlation matrix and a random data matrix
A = rng.normal(size=(n_variables * 2, n_variables))
corr = np.corrcoef(A, rowvar=False)
desired_corr = np.corrcoef(A, rowvar=False)
X = rng.normal(size=(n_observations, n_variables))

# Transform the data
transform = ImanConover(corr)
transform = ImanConover(desired_corr)
X_transformed = transform(X)

# Check that all columns (variables) have equal marginals.
# In other words, Iman-Conover can permute each column individually,
# but they should have the same entries before and after.
# but they should have identical entries before and after.
for j in range(X.shape[1]):
assert np.allclose(np.sort(X[:, j]), np.sort(X_transformed[:, j]))

# After the Iman-Conover transform, the distance between the empirical
# correlation matrix and the desired one should be smaller than before.
distance_before = sp.linalg.norm(np.corrcoef(X, rowvar=False) - corr)
distance_after = sp.linalg.norm(np.corrcoef(X_transformed, rowvar=False) - corr)
X_corr = np.corrcoef(X, rowvar=False)
distance_before = sp.linalg.norm(X_corr - desired_corr, ord="fro")

X_trans_corr = np.corrcoef(X_transformed, rowvar=False)
distance_after = sp.linalg.norm(X_trans_corr - desired_corr, ord="fro")

assert distance_after <= distance_before


if __name__ == "__main__":
import pytest

pytest.main(
args=[
__file__,
"--doctest-modules",
"-v",
]
)


if False:

def plotmat(mat, title):
"""Plot a matrix."""
plt.figure()
plt.title(title)
plt.scatter(*mat.T)
plt.show()

# Download the CARS dataset used in
# https://blogs.sas.com/content/iml/2021/06/16/geometry-iman-conover-transformation.html
csv = requests.get(
"https://raw.githubusercontent.com/sassoftware/sas-viya-programming/refs/heads/master/data/cars.csv"
).text
filepath = io.StringIO(csv)
df = pd.read_csv(filepath) # .sample(5, random_state=42)
X = df[["EngineSize", "MPG_Highway"]].to_numpy()

# Add some noise to tie-break variables
# TODO: look more into what happens when ties are not broken
X = X + np.random.randn(*X.shape) * 0.1

# TODO: Another interesting data set is a noisy circle:
if False:
theta = np.random.rand(100) * np.pi * 2
x = np.cos(theta) + np.random.randn(len(theta)) * 0.1
y = np.sin(theta) + np.random.randn(len(theta)) * 0.1
X = np.vstack((x, y)).T

# TODO: Look into QMC methods like LatinHypercube
if True:
# LatinHypercube or Sobol
sampler = sp.stats.qmc.Sobol(d=2, seed=42, scramble=False)
lhs_samples = sampler.random(n=100) # on interval [0, 1]
lhs_samples = np.clip(lhs_samples, a_min=1e-3, a_max=1 - 1e-3)
plotmat(lhs_samples, "lhs_samples")
X = sp.stats.norm.ppf(lhs_samples) # map to normal
X = np.vstack(
(
sp.stats.norm.ppf(lhs_samples[:, 0]),
sp.stats.gamma.ppf(lhs_samples[:, 1], a=1),
)
).T

# Create a correlation matrix that we want to replicate
C = np.eye(2)
C[0, 1] = C[1, 0] = 0.6

plotmat(X, "input data X")

# Step one - Map data to normal scores
N, K = X.shape
ranks = sp.stats.rankdata(X, method="average", axis=0) / (N + 1)
normal_scores = sp.stats.norm.ppf(ranks)
plotmat(normal_scores, "normal_scores")

assert np.isclose(
sp.stats.spearmanr(X).statistic, sp.stats.spearmanr(normal_scores).statistic
), "spearman corr before and after should be the same"

# Step two - Remove correlations
I = np.corrcoef(normal_scores, rowvar=False)
Q = np.linalg.cholesky(I) # QQ' = I
decorrelated_scores = normal_scores @ np.linalg.inv(Q).T
plotmat(decorrelated_scores, "decorrelated_scores")
assert np.allclose(np.corrcoef(decorrelated_scores, rowvar=False), np.eye(K))

decorrelated_scores2 = np.linalg.solve(Q, normal_scores.T).T
assert np.allclose(decorrelated_scores, decorrelated_scores2)

# Step three - Induce correlations
P = np.linalg.cholesky(C) # PP' = C
correlated_scores = decorrelated_scores @ P.T
assert np.allclose(np.corrcoef(correlated_scores, rowvar=False), C)
plotmat(correlated_scores, "correlated_scores")

corr = sp.stats.spearmanr(correlated_scores).statistic
print(f"Rank correlation of correlated_scores: {corr:.6f}")
corr = sp.stats.pearsonr(*correlated_scores.T).statistic
print(f"Correlation of correlated_scores: {corr:.6f}")

# Step four - Restore marginal distributions
result = np.empty_like(X)
for k in range(K):
# sorted_idx = np.argsort(R_star[:, k])
# result[:, k] = np.sort(X[:, k])[sorted_idx] # X[sorted_idx, k]
ranks = sp.stats.rankdata(correlated_scores[:, k]).astype(int) - 1
result[:, k] = np.sort(X[:, k])[ranks]

plotmat(result, "result")

assert np.allclose(np.sort(X[:, 0]), np.sort(result[:, 0])), "Marginal must match"

corr = sp.stats.pearsonr(*result.T).statistic
print(f"Correlation of result: {corr:.6f}")
corr = sp.stats.spearmanr(result).statistic
print(f"Rank correlation of result: {corr:.6f}")

obs_corr = sp.stats.pearsonr(*result.T).statistic
assert np.isclose(obs_corr, 0.6, atol=0.1)
