diff --git a/pypsps/keras/models.py b/pypsps/keras/models.py
index 195f7f2..9e1cc5e 100644
--- a/pypsps/keras/models.py
+++ b/pypsps/keras/models.py
@@ -1,6 +1,6 @@
 """Example model architectures for pypsps."""
 
-from typing import List
+from typing import List, Optional, Tuple
 
 import tensorflow as tf
@@ -31,7 +31,9 @@ def recommended_callbacks(monitor="val_loss") -> List[tf.keras.callbacks.Callbac
 
 
 def _build_binary_continuous_causal_loss(
-    n_states: int, alpha: float = 1.0
+    n_states: int,
+    alpha: float,
+    df_penalty_l1: float,
 ) -> losses.CausalLoss:
     """Builds an example of binary treatment & continuous outcome causal loss."""
     psps_outcome_loss = losses.OutcomeLoss(
@@ -48,7 +50,7 @@ def _build_binary_continuous_causal_loss(
         alpha=alpha,
         outcome_loss_weight=1.0,
         predictive_states_regularizer=pypress.keras.regularizers.DegreesOfFreedom(
-            10.0, df=n_states - 1
+            l1=df_penalty_l1, df=n_states - 1
         ),
         reduction="sum_over_batch_size",
     )
@@ -60,6 +62,8 @@ def build_toy_model(
     n_features: int,
     compile: bool = True,
     alpha: float = 1.0,
+    df_penalty_l1: float = 1.0,
+    learning_rate: float = 0.01,
 ) -> tf.keras.Model:
     """Builds a pypsps toy model for binary treatment & continous outcome.
 
@@ -72,6 +76,8 @@ def build_toy_model(
         n_features: number of (numeric) features to use as input.
         compile: if True, compiles pypsps model with the appropriate pypsps causal loss functions.
         alpha: propensity score penalty (by default alpha = 1., which corresponds to equal weight)
+        df_penalty_l1: L1 penalty for the DegreesOfFreedom predictive-states regularizer.
+        learning_rate: learning rate of the optimizer.
 
     Returns:
         A tf.keras Model with the pypsps architecture (compiled model if `compile=True`).
@@ -141,11 +147,162 @@ def build_toy_model(
 
     if compile:
         psps_causal_loss = _build_binary_continuous_causal_loss(
-            n_states=n_states, alpha=alpha
+            n_states=n_states,
+            alpha=alpha,
+            df_penalty_l1=df_penalty_l1,
+        )
+        model.compile(
+            loss=psps_causal_loss,
+            optimizer=tfk.optimizers.Nadam(learning_rate=learning_rate),
+            metrics=[
+                metrics.PropensityScoreBinaryCrossentropy(),
+                metrics.PropensityScoreAUC(curve="PR"),
+                metrics.OutcomeMeanSquaredError(),
+            ],
+        )
+
+    return model
+
+
+def build_model_binary_normal(
+    n_states: int,
+    n_features: int,
+    predictive_state_hidden_layers: List[Tuple[int, str]],
+    outcome_hidden_layers: List[Tuple[int, str]],
+    loc_layer: Optional[Tuple[int, str]] = None,
+    scale_layer: Optional[Tuple[int, str]] = None,
+    compile: bool = True,
+    alpha: float = 1.0,
+    df_penalty_l1: float = 1.0,
+    learning_rate: float = 0.01,
+    dropout_rate: float = 0.2,
+) -> tf.keras.Model:
+    """Builds a configurable pypsps model for binary treatment & continuous outcome.
+
+    All pypsps keras layers can be used to build more complex causal model architectures
+    within a TensorFlow graph. This builder generalizes `build_toy_model` by exposing
+    the hidden-layer architecture of the predictive-state and outcome branches.
+
+    Args:
+        n_states: number of predictive states to use in the pypsps model.
+        n_features: number of (numeric) features to use as input.
+        predictive_state_hidden_layers: (units, activation) tuples for the predictive-state hidden layers.
+        outcome_hidden_layers: (units, activation) tuples for the outcome hidden layers.
+        loc_layer: optional (units, activation) tuple for the per-state location layer;
+          if None, the location is predicted directly from the outcome branch.
+        scale_layer: optional (units, activation) tuple for the per-state scale layer;
+          if None, a constant (bias-only) scale is estimated per state.
+        compile: if True, compiles pypsps model with the appropriate pypsps causal loss functions.
+        alpha: propensity score penalty (by default alpha = 1., which corresponds to equal weight).
+        df_penalty_l1: L1 penalty for the DegreesOfFreedom predictive-states regularizer.
+        learning_rate: learning rate of the optimizer.
+        dropout_rate: dropout rate applied after each hidden layer.
+
+    Returns:
+        A tf.keras Model with the pypsps architecture (compiled model if `compile=True`).
+ """ + + assert n_states >= 1, f"Got n_states={n_states}" + assert n_features >= 1, f"Got n_features={n_features}" + + features = tfk.layers.Input(shape=(n_features,)) + treat = tfk.layers.Input(shape=(1,)) + + features_bn = tfk.layers.BatchNormalization()(features) + feat_treat = tfk.layers.Concatenate(name="features_and_treatment")( + [features_bn, treat] + ) + + ps_hidden = tf.keras.layers.Dense( + predictive_state_hidden_layers[0][0], predictive_state_hidden_layers[0][1] + )(features_bn) + ps_hidden = tf.keras.layers.Dropout(dropout_rate)(ps_hidden) + ps_hidden = tf.keras.layers.BatchNormalization()(ps_hidden) + + for units, act in predictive_state_hidden_layers[1:]: + ps_hidden = tf.keras.layers.Dense(units, act)(ps_hidden) + ps_hidden = tf.keras.layers.Dropout(dropout_rate)(ps_hidden) + ps_hidden = tf.keras.layers.BatchNormalization()(ps_hidden) + + ps_hidden = tf.keras.layers.Concatenate()([ps_hidden, features_bn]) + pss = pypress.keras.layers.PredictiveStateSimplex( + n_states=n_states, input_dim=n_features + ) + pred_states = pss(ps_hidden) + + # Propensity score for binary treatment (--> "sigmoid" activation). + prop_score = pypress.keras.layers.PredictiveStateMeans( + units=1, activation="sigmoid", name="propensity_score" + )(pred_states) + + outcome_hidden = tf.keras.layers.Dense( + outcome_hidden_layers[0][0], outcome_hidden_layers[0][1] + )(feat_treat) + outcome_hidden = tf.keras.layers.Dropout(dropout_rate)(outcome_hidden) + outcome_hidden = tf.keras.layers.BatchNormalization()(outcome_hidden) + + for units, act in outcome_hidden_layers[1:]: + outcome_hidden = tf.keras.layers.Dense(units, act)(outcome_hidden) + outcome_hidden = tf.keras.layers.Dropout(dropout_rate)(outcome_hidden) + outcome_hidden = tf.keras.layers.BatchNormalization()(outcome_hidden) + + outcome_hidden = tf.keras.layers.Concatenate()([outcome_hidden, feat_treat]) + + loc_preds = [] + scale_preds = [] + # One outcome model per state. + for state_id in range(n_states): + loc_preds.append( + tfk.layers.Dense(1, name="loc_pred_state_" + str(state_id))( + tfk.layers.Dense( + loc_layer[0], + loc_layer[1], + name="loc_feat_eng_state_" + str(state_id), + )(outcome_hidden) + ) + ) + + if scale_layer is None: + # In this toy model use a constant scale estimate (BiasOnly); if needed + # change this to a scale parameter that changes as a function of inputs / hidden layers. 
+            scale_preds.append(
+                tfk.activations.softplus(
+                    layers.BiasOnly(name="scale_logit_" + str(state_id))(feat_treat)
+                )
+            )
+        else:
+            scale_preds.append(
+                tfk.layers.Dense(
+                    1, activation="softplus", name="scale_pred_state_" + str(state_id)
+                )(
+                    tfk.layers.Dense(
+                        scale_layer[0],
+                        scale_layer[1],
+                        name="scale_feat_eng_state_" + str(state_id),
+                    )(outcome_hidden)
+                )
+            )
+
+    loc_comb = tfk.layers.Concatenate(name="loc_pred_combined")(loc_preds)
+    scale_comb = tfk.layers.Concatenate(name="scale_pred_combined")(scale_preds)
+
+    outputs_concat = tfk.layers.Concatenate(name="output_tensor")(
+        [loc_comb, scale_comb, pred_states, prop_score]
+    )
+
+    model = tfk.models.Model(inputs=[features, treat], outputs=outputs_concat)
+
+    if compile:
+        psps_causal_loss = _build_binary_continuous_causal_loss(
+            n_states=n_states,
+            alpha=alpha,
+            df_penalty_l1=df_penalty_l1,
         )
         model.compile(
             loss=psps_causal_loss,
-            optimizer=tfk.optimizers.Nadam(learning_rate=0.01),
+            optimizer=tfk.optimizers.Nadam(learning_rate=learning_rate),
             metrics=[
                 metrics.PropensityScoreBinaryCrossentropy(),
                 metrics.PropensityScoreAUC(curve="PR"),
diff --git a/pypsps/tests/test_losses.py b/pypsps/tests/test_losses.py
index 977a596..9e1a7d9 100644
--- a/pypsps/tests/test_losses.py
+++ b/pypsps/tests/test_losses.py
@@ -1,6 +1,5 @@
 """Test module for loss functions."""
-
 import numpy as np
 import pytest
 import tensorflow as tf
 
@@ -81,6 +80,5 @@ def test_end_to_end_dataset_model_fit():
 
     assert preds.shape[0] == ks_data.n_samples
 
-    outcome_pred, scale_pred, weights, prop_score = utils.split_y_pred(preds)
     ate = inference.predict_ate(model, inputs[0])
     assert ate > 0
diff --git a/pypsps/tests/test_models.py b/pypsps/tests/test_models.py
new file mode 100644
index 0000000..f1cf588
--- /dev/null
+++ b/pypsps/tests/test_models.py
@@ -0,0 +1,42 @@
+"""Test module for model functions."""
+
+import numpy as np
+import tensorflow as tf
+
+from .. import datasets
+from ..keras import models
+
+
+tfk = tf.keras
+
+
+def test_build_toy_model():
+    np.random.seed(10)
+    ks_data = datasets.KangSchafer(true_ate=10).sample(n_samples=1000)
+
+    inputs, outputs = ks_data.to_keras_inputs_outputs()
+    tf.random.set_seed(10)
+    model = models.build_toy_model(
+        n_states=3, n_features=ks_data.n_features, compile=True
+    )
+    preds = model.predict(inputs)
+    assert not np.isnan(preds.sum())
+
+
+def test_build_model_binary_normal():
+    np.random.seed(10)
+    ks_data = datasets.KangSchafer(true_ate=10).sample(n_samples=1000)
+
+    inputs, outputs = ks_data.to_keras_inputs_outputs()
+    tf.random.set_seed(10)
+    model = models.build_model_binary_normal(
+        n_states=3,
+        n_features=ks_data.n_features,
+        compile=True,
+        predictive_state_hidden_layers=[(10, "selu"), (20, "relu")],
+        outcome_hidden_layers=[(30, "tanh"), (20, "selu")],
+        loc_layer=(20, "selu"),
+        scale_layer=(10, "tanh"),
+    )
+    preds = model.predict(inputs)
+    assert not np.isnan(preds.sum())
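
Usage sketch (illustrative, not part of the patch): the snippet below wires the new `build_model_binary_normal` builder into the same KangSchafer workflow that the tests above use. The layer configuration and the `fit()` hyperparameters are placeholder assumptions, not values taken from this diff.

import numpy as np
from pypsps import datasets
from pypsps.keras import models

np.random.seed(10)
ks_data = datasets.KangSchafer(true_ate=10).sample(n_samples=1000)
inputs, outputs = ks_data.to_keras_inputs_outputs()

model = models.build_model_binary_normal(
    n_states=3,
    n_features=ks_data.n_features,
    predictive_state_hidden_layers=[(10, "selu"), (20, "relu")],
    outcome_hidden_layers=[(30, "tanh"), (20, "selu")],
    loc_layer=(20, "selu"),
    scale_layer=None,  # None -> constant (bias-only) scale per state
    df_penalty_l1=1.0,
    learning_rate=0.01,
)
# Hypothetical training call; epochs/batch_size are placeholders.
model.fit(inputs, outputs, epochs=5, batch_size=64, verbose=0)
preds = model.predict(inputs)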