import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
import pandas as pd
import h5py
from tensorflow.keras.metrics import binary_accuracy
from .datasets import make_classification_dataset as _make_classification_dataset
from .math import (expectation_gauss_hermite,
                   divergence_gauss_hermite as _divergence_gauss_hermite)
from pathlib import Path
# shortcuts
tfd = tfp.distributions


def save_hdf5(X_train, y_train, X_test, y_test, filename):
    with h5py.File(filename, 'w') as f:
        f.create_dataset("X_train", data=X_train)
        f.create_dataset("y_train", data=y_train)
        f.create_dataset("X_test", data=X_test)
        f.create_dataset("y_test", data=y_test)


def load_hdf5(filename):
    with h5py.File(filename, 'r') as f:
        X_train = np.array(f.get("X_train"))
        y_train = np.array(f.get("y_train"))
        X_test = np.array(f.get("X_test"))
        y_test = np.array(f.get("y_test"))
    return (X_train, y_train), (X_test, y_test)
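

# Illustrative usage sketch (not part of the original module): round-tripping
# a toy dataset through `save_hdf5` / `load_hdf5`. The filename, shapes and
# values are arbitrary placeholders.
def _example_hdf5_roundtrip(filename="dataset.h5"):
    rng = np.random.RandomState(seed=42)
    X_train, y_train = rng.randn(100, 2), rng.randint(2, size=100)
    X_test, y_test = rng.randn(50, 2), rng.randint(2, size=50)
    save_hdf5(X_train, y_train, X_test, y_test, filename)
    (X_train2, y_train2), (X_test2, y_test2) = load_hdf5(filename)
    np.testing.assert_array_equal(X_train, X_train2)
    np.testing.assert_array_equal(y_test, y_test2)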


class DistributionPair:

    def __init__(self, p, q):
        self.p = p
        self.q = q

    @classmethod
    def from_covariate_shift_example(cls):
        train = tfd.MixtureSameFamily(
            mixture_distribution=tfd.Categorical(probs=[0.5, 0.5]),
            components_distribution=tfd.MultivariateNormalDiag(
                loc=[[-2.0, 3.0], [2.0, 3.0]], scale_diag=[1.0, 2.0])
        )
        test = tfd.MixtureSameFamily(
            mixture_distribution=tfd.Categorical(probs=[0.5, 0.5]),
            components_distribution=tfd.MultivariateNormalDiag(
                loc=[[0.0, -1.0], [4.0, -1.0]])
        )
        return cls(p=test, q=train)

    def make_dataset(self, num_samples, rate=0.5, dtype="float64", seed=None):
        num_p = int(num_samples * rate)
        num_q = num_samples - num_p
        X_p = self.p.sample(sample_shape=(num_p, 1), seed=seed).numpy()
        X_q = self.q.sample(sample_shape=(num_q, 1), seed=seed).numpy()
        return X_p, X_q

    def make_covariate_shift_dataset(self, class_posterior_fn, num_test,
                                     num_train, threshold=0.5, seed=None):
        num_samples = num_test + num_train
        rate = num_test / num_samples
        X_test, X_train = self.make_dataset(num_samples, rate=rate, seed=seed)
        # TODO: Temporary fix. Need to address issue in `DistributionPair`.
        X_train = X_train.squeeze()
        X_test = X_test.squeeze()
        y_train = (class_posterior_fn(*X_train.T) > threshold).numpy()
        y_test = (class_posterior_fn(*X_test.T) > threshold).numpy()
        return (X_train, y_train), (X_test, y_test)

    def make_classification_dataset(self, num_samples, rate=0.5,
                                    dtype="float64", seed=None):
        X_p, X_q = self.make_dataset(num_samples, rate, dtype, seed)
        X, y = _make_classification_dataset(X_p, X_q, dtype=dtype,
                                            random_state=seed)
        return X, y

    def logit(self, x):
        return self.p.log_prob(x) - self.q.log_prob(x)

    def density_ratio(self, x):
        return tf.exp(self.logit(x))

    def optimal_score(self, x):
        return tf.sigmoid(self.logit(x))

    def optimal_accuracy(self, x_test, y_test):
        # Required when some distributions are inherently `float32`, such as
        # the `MixtureSameFamily`.
        # TODO: Add flexibility for whether to cast to `float64`.
        y_pred = tf.cast(tf.squeeze(self.optimal_score(x_test)),
                         dtype=tf.float64)
        return binary_accuracy(y_test, y_pred)

    def kl_divergence(self):
        return tfd.kl_divergence(self.p, self.q)

    def divergence_monte_carlo(self):
        # TODO
        pass

    def make_p_log_prob_estimator(self, logit_estimator):
        """
        Recall that log r(x) = log p(x) - log q(x). Then,

            log p(x) = [log p(x) - log q(x)] + log q(x) = log r(x) + log q(x),

        so an estimator of the logit (log-ratio), combined with the exact
        log q(x), yields an estimator of log p(x).
        """
        def p_log_prob_estimator(x):
            return self.q.log_prob(x) + logit_estimator(x)
        return p_log_prob_estimator
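

# Illustrative sketch (not part of the original module): with the exact logit,
# `make_p_log_prob_estimator` recovers log p(x) exactly; in practice
# `logit_estimator` would be a learned density-ratio model.
def _example_p_log_prob_estimator():
    pair = DistributionPair(p=tfd.Normal(loc=1.0, scale=1.0),
                            q=tfd.Normal(loc=0.0, scale=2.0))
    p_log_prob = pair.make_p_log_prob_estimator(pair.logit)
    x = tf.constant([-1.0, 0.0, 1.0])
    # log q(x) + [log p(x) - log q(x)] == log p(x)
    np.testing.assert_allclose(p_log_prob(x).numpy(),
                               pair.p.log_prob(x).numpy(), rtol=1e-6)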


class DistributionPairGaussian(DistributionPair):

    def __init__(self, q, p_loc=0.0, p_scale=1.0):
        super(DistributionPairGaussian, self).__init__(
            p=tfd.Normal(loc=p_loc, scale=p_scale), q=q)

    def divergence_gauss_hermite(self, quadrature_size,
                                 discrepancy_fn=tfp.vi.kl_forward):
        return _divergence_gauss_hermite(self.p, self.q,
                                         quadrature_size, under_p=True,
                                         discrepancy_fn=discrepancy_fn)

    def kl_divergence_scaled_distribution(self, logit_estimator, quadrature_size):
        """
        By definition, r(x) q(x) = p(x). That is, we can think of r(x) as the
        scaling factor needed to match q(x) to p(x).

        Let hat{p}(x) = hat{r}(x) q(x). Then hat{p}(x) ~= p(x), and how good
        this estimate is depends entirely on how well hat{r}(x) estimates r(x).

        This function computes KL[p(x) || hat{p}(x)] >= 0 using Gauss-Hermite
        quadrature, assuming p(x) is Gaussian. The lower the better, with
        equality if and only if p(x) == hat{p}(x).
        """
        def fn(x):
            return self.logit(x) - logit_estimator(x)
        return expectation_gauss_hermite(fn, self.p, quadrature_size)
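

# Illustrative sketch (not part of the original module): sanity checks for
# `kl_divergence_scaled_distribution`. With the exact logit the divergence is
# ~0; with a zero estimator (hat{r}(x) = 1, i.e. hat{p} = q) it should
# approximate KL[p || q].
def _example_kl_divergence_scaled_distribution(quadrature_size=100):
    pair = DistributionPairGaussian(q=tfd.Normal(loc=0.5, scale=1.5))
    kl_exact = pair.kl_divergence_scaled_distribution(pair.logit,
                                                      quadrature_size)
    kl_trivial = pair.kl_divergence_scaled_distribution(tf.zeros_like,
                                                        quadrature_size)
    # Expect `kl_exact` ~ 0.0 and `kl_trivial` ~ `pair.kl_divergence()`.
    return kl_exact, kl_trivial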


qs = {
    "same": tfd.Normal(loc=0.0, scale=1.0),
    "scale_lesser": tfd.Normal(loc=0.0, scale=0.6),
    "scale_greater": tfd.Normal(loc=0.0, scale=2.0),
    "loc_different": tfd.Normal(loc=0.5, scale=1.0),
    "additive": tfd.MixtureSameFamily(
        mixture_distribution=tfd.Categorical(probs=[0.95, 0.05]),
        components_distribution=tfd.Normal(loc=[0.0, 3.0], scale=[1.0, 1.0])),
    "bimodal": tfd.MixtureSameFamily(
        mixture_distribution=tfd.Categorical(probs=[0.4, 0.6]),
        components_distribution=tfd.Normal(loc=[2.0, -3.0], scale=[1.0, 0.5]))
}


def get_distribution_pair(name):
    return DistributionPairGaussian(qs[name])


def get_steps_per_epoch(num_train, batch_size):
    return num_train // batch_size


def get_kl_weight(num_train, batch_size):
    kl_weight = batch_size / num_train
    return kl_weight
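

# Illustrative note (not part of the original module): the KL weight scales
# the KL term of a mini-batch ELBO by `batch_size / num_train`, so that
# accumulating it over the roughly `num_train / batch_size` batches of one
# epoch counts the KL once.
def _example_kl_weight(num_train=1000, batch_size=64):
    steps = get_steps_per_epoch(num_train, batch_size)  # 1000 // 64 == 15
    kl_weight = get_kl_weight(num_train, batch_size)    # 64 / 1000 == 0.064
    return steps, kl_weight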


def to_numpy(transformed_variable):
    return tf.convert_to_tensor(transformed_variable).numpy()


def inducing_index_points_history_to_dataframe(inducing_index_points_history):
    # TODO: this will fail for `num_features > 1`
    return pd.DataFrame(np.hstack(inducing_index_points_history).T)


def variational_scale_history_to_dataframe(variational_scale_history,
                                           num_epochs):
    a = np.stack(variational_scale_history, axis=0).reshape(num_epochs, -1)
    return pd.DataFrame(a)


def save_results(history, name, learning_rate, beta1, beta2,
                 num_epochs, summary_dir, seed):
    inducing_index_points_history_df = \
        inducing_index_points_history_to_dataframe(
            history.pop("inducing_index_points"))
    variational_loc_history_df = pd.DataFrame(history.pop("variational_loc"))
    variational_scale_history_df = \
        variational_scale_history_to_dataframe(history.pop("variational_scale"),
                                               num_epochs)
    history_df = pd.DataFrame(history).assign(name=name, seed=seed,
                                              learning_rate=learning_rate,
                                              beta1=beta1, beta2=beta2)

    output_dir = Path(summary_dir).joinpath(name)
    output_dir.mkdir(parents=True, exist_ok=True)

    # TODO: add flexibility and de-clutter
    inducing_index_points_history_df.to_csv(
        output_dir.joinpath(f"inducing_index_points.{seed:03d}.csv"),
        index_label="epoch")
    variational_loc_history_df.to_csv(
        output_dir.joinpath(f"variational_loc.{seed:03d}.csv"),
        index_label="epoch")
    variational_scale_history_df.to_csv(
        output_dir.joinpath(f"variational_scale.{seed:03d}.csv"),
        index_label="epoch")
    history_df.to_csv(output_dir.joinpath(f"scalars.{seed:03d}.csv"),
                      index_label="epoch")


def preprocess(df):
    new_df = df.assign(timestamp=pd.to_datetime(df.timestamp, unit='s'))
    elapsed_delta = new_df.timestamp - new_df.timestamp.min()
    return new_df.assign(elapsed=elapsed_delta.dt.total_seconds())


def merge_stack_runs(series_dict, seed_key="seed", y_key="nelbo",
                     drop_until_all_start=False):
    merged_df = pd.DataFrame(series_dict)
    # fill missing values by propagating the previous observation forward
    merged_df.ffill(axis="index", inplace=True)
    # NaNs can only remain where there is no previous observation,
    # i.e. in the first few rows. Optionally drop rows until all runs
    # have recorded observations.
    if drop_until_all_start:
        merged_df.dropna(how="any", axis="index", inplace=True)
    # TODO: Add option to impute with the row-wise mean, which looks
    # something like the following (values in Pandas can only be filled
    # column-by-column, so we need to transpose, fillna and transpose back):
    #   merged_df = merged_df.T.fillna(merged_df.mean(axis="columns")).T
    merged_df.columns.name = seed_key
    stacked_df = merged_df.stack(level=seed_key)
    stacked_df.name = y_key
    data = stacked_df.reset_index()
    return data
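

# Illustrative sketch (not part of the original module): `merge_stack_runs` on
# two toy runs recorded over different step ranges. Forward-filling aligns
# them before stacking into long ("tidy") format.
def _example_merge_stack_runs():
    series_dict = {
        0: pd.Series([3.0, 2.0, 1.5], index=[0, 1, 2]),
        1: pd.Series([2.5, 1.0], index=[1, 2]),
    }
    data = merge_stack_runs(series_dict, seed_key="seed", y_key="nelbo",
                            drop_until_all_start=True)
    # `data` has one row per (step, seed) pair, with the values under "nelbo".
    return data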


def make_plot_data(names, seeds, summary_dir,
                   process_run_fn=None,
                   extract_series_fn=None,
                   seed_key="seed",
                   y_key="nelbo"):
    base_path = Path(summary_dir)

    if process_run_fn is None:
        def process_run_fn(run_df):
            return run_df

    df_list = []
    for name in names:
        path = base_path.joinpath(name)
        seed_dfs = dict()
        for seed in seeds:
            csv_path = path.joinpath(f"scalars.{seed:03d}.csv")
            seed_df = pd.read_csv(csv_path)
            seed_dfs[seed] = process_run_fn(seed_df)
        if extract_series_fn is not None:
            series_dict = {seed: extract_series_fn(seed_df)
                           for seed, seed_df in seed_dfs.items()}
            name_df = merge_stack_runs(series_dict, seed_key=seed_key,
                                       y_key=y_key).assign(name=name)
        else:
            name_df = pd.concat(seed_dfs.values(), axis="index", sort=True)
        df_list.append(name_df)
    data = pd.concat(df_list, axis="index", sort=True)
    return data
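

# Illustrative sketch (not part of the original module): assembling tidy plot
# data for runs written by `save_results`. The run names, seeds, summary
# directory and the assumption that the recorded scalars include an "nelbo"
# column are all placeholders.
def _example_make_plot_data(summary_dir="logs"):
    data = make_plot_data(names=["baseline", "proposed"],
                          seeds=range(5),
                          summary_dir=summary_dir,
                          extract_series_fn=lambda df: df.set_index("epoch")["nelbo"],
                          seed_key="seed",
                          y_key="nelbo")
    return data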