Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Permanent Shocks #21

Open
ValterNobrega opened this issue Apr 29, 2024 · 2 comments
Open

Permanent Shocks #21

ValterNobrega opened this issue Apr 29, 2024 · 2 comments

Comments

@ValterNobrega
Copy link

ValterNobrega commented Apr 29, 2024

Hi everyone.

I'm attempting to apply a permanent shock in a Neoclassical Model with heterogeneous labor supply and variation across discount factors and permanent abilities.

The model functions perfectly for a transitory fiscal shock. However, when it comes to the permanent fiscal shock, I encounter an issue. After calculating the new steady state following the shock, which I've termed "ss_terminal," I use the following code:

# transition
unknowns = ['K', 'L']
targets = ['asset_mkt', 'labor_mkt']
dG = 0.01 * ss_initial['Y'] * rho ** (np.arange(T)) # 1%
td_nonlin = neoclassic.solve_impulse_nonlinear(ss_terminal, unknowns, targets, {"G": dG},
                                               ss_initial = ss_initial)

I consistently receive the following error: ValueError: max() arg is an empty sequence. It seems that the code fails to recognize any inputs when applying the ss_initial option in the CombinedBlock.impulse_nonlinear function:

     72     if input_args or ss_initial is not None:
     73         # If this block is actually perturbed, or we start from different initial ss
     74         # TODO: be more selective about ss_initial here - did any inputs change that matter for this one block?
---> 75         impulses.update(block.impulse_nonlinear(ss, input_args, outputs & block.outputs, internals, Js, options, ss_initial))

To investigate whether this issue is related to changes in ss_terminal, I attempted to use the same code but with the same steady state:

td_nonlin = neoclassic.solve_impulse_nonlinear(ss_terminal, unknowns, targets, {"G": dG},
                                               ss_initial = ss_terminal)

However, the error persists.

I verified this code with the option 'ss_initial' in a model lacking heterogeneity across discount factors and permanent abilities, and it worked flawlessly. To incorporate these factors, I created nb (number of betas) * na (number of abilities) het_blocks, then aggregated them all with respect to their weights (that I know).

What puzzles me is that everything has functioned smoothly thus far for transitory shocks. When I apply the code without the option 'ss_initial', it works fine. The problem only arises when I use this option, which is necessary for the permanent shock.

Do you have an idea where the problem might be?

Thank you
Valter Nóbrega

@ludwigstraub
Copy link
Collaborator

Dear Valter,

Thank you for raising your issue. Could you send a minimal code snippet that I can run that gets you the error? Say two blocks that you aggregate and compute a nonlinear transition for? And then maybe also show the full error message?

Hopefully we can figure it out then!

Ludwig

@ValterNobrega
Copy link
Author

ValterNobrega commented May 21, 2024

Dear Ludwig

Thank you for your reply.

The model that I am trying to do is a neoclassical model with heterogeneous labor supply and permanent heterogeneity across discount factors and permanent ability.

To do so, I imported the household block with labor choice:
hh = hetblocks.hh_labor.hh

and then added the corresponding hetoutputs and hetinputs:

def utility(c, eis, vphi, n, frisch):
    u =  (c ** (1 - 1/eis) / (1 - 1/eis) ) - vphi * ((n ** (1 + frisch)) / (1 + frisch))
    return u

def make_grids(rho_s, sigma_s, nS, amax, amin, nA):
    e_grid, Pi, pi_e = utils.tauchen_hans(sigma_s, rho_s, nS)
    a_grid = utils.a_grid_fortran(nx=nA, convex_param=4.0, ubound=amax, lbound=amin) # Function that mimics the grids from the Fortran Code
    return e_grid, pi_e, Pi, a_grid

def wages(w, e_grid, tau_l, gamma0, ability):
    we = (1 - tau_l) * w * np.exp(gamma0 + ability + e_grid)
    return we

def labor_supply(n, e_grid, gamma0, w, ability):
    ne = (np.exp(gamma0 + ability + e_grid))[:, np.newaxis] * n
    av_e = (w * np.exp(gamma0 + ability + e_grid))[:, np.newaxis] * n
    return ne, av_e

def transfers(Tax, e_grid):
    T = -Tax / np.ones(len(e_grid))
    return T

household = hh.add_hetinputs([transfers, wages, make_grids])
household = household.add_hetoutputs([labor_supply, utility])

Then, I add firms and government, and the corresponding market clearing conditions:

@simple
def firm(K, L, alpha, delta):
    r = alpha * (K(-1) / L) ** (alpha-1) - delta
    w = (1 - alpha) * (K(-1) / L) ** alpha
    Y = K(-1) ** alpha * L ** (1 - alpha)
    return r, w, Y

@simple
def mkt_clearing(A, NE, G, C, L, Y, B, delta, K):
    asset_mkt = A - B - K
    labor_mkt = NE - L
    I = K - (1 - delta) * K(-1)
    goods_mkt = Y - C - G - I
    return asset_mkt, labor_mkt, goods_mkt

@simple
def fiscal(r, B, G, tau_l, w, L):
    Tax = (1 + r) * B(-1) + G - B - tau_l * w * L
    return Tax

Since TFP is normalized to 1, and we need to calibrate after K/Y, I created a new block for those ratios:

@simple
def macro_ratios(Y, G, B, Tax, K):
    G_Y = G / Y
    B_Y = B / Y
    T_Y = Tax / Y
    K_Y = K / Y
    return G_Y, B_Y, T_Y, K_Y

After all this, I added the ingredients for the heterogeneity. Since I am trying to replicate a model with 3 betas and 2 different permanenent abilities, what I did was to aggregate them in the same way as in the KS example.

ability_grid = utils.tauchen_hans(0.712, 0, 2)[0]
hh_list = []
to_map = ['beta', 'ability', *household.outputs]
for i in range(1, 4):  # Loop for beta variations
    for j in range(1, len(ability_grid) + 1):  # Loop for ability variations
        beta_suffix = f'_beta{i}'
        ability_suffix = f'_ability{j}'
        remapped_keys = {k: k + beta_suffix + ability_suffix for k in to_map}
        renamed_key = f'hh_beta{i}_ability{j}'
        new_hh = household.remap(remapped_keys).rename(renamed_key)
        hh_list.append(new_hh)

@simple
def aggregate(A_beta1_ability1, A_beta1_ability2,
                        A_beta2_ability1, A_beta2_ability2,
                        A_beta3_ability1, A_beta3_ability2,

                        C_beta1_ability1, C_beta1_ability2,
                        C_beta2_ability1, C_beta2_ability2,
                        C_beta3_ability1, C_beta3_ability2,

                        NE_beta1_ability1, NE_beta1_ability2,
                        NE_beta2_ability1, NE_beta2_ability2,
                        NE_beta3_ability1, NE_beta3_ability2,

                        N_beta1_ability1, N_beta1_ability2,
                        N_beta2_ability1, N_beta2_ability2,
                        N_beta3_ability1, N_beta3_ability2,

                        sigma_a, nAB):

    # This has to be done by hand
    mass_ability = utils.tauchen_hans(sigma_a, 0, nAB)[2]

    # For A
    A_beta1 = A_beta1_ability1 * mass_ability[0] + \
              A_beta1_ability2 * mass_ability[1]

    A_beta2 = A_beta2_ability1 * mass_ability[0] + \
              A_beta2_ability2 * mass_ability[1]

    A_beta3 = A_beta3_ability1 * mass_ability[0] + \
              A_beta3_ability2 * mass_ability[1]

    # For C
    C_beta1 = C_beta1_ability1 * mass_ability[0] + \
              C_beta1_ability2 * mass_ability[1]

    C_beta2 = C_beta2_ability1 * mass_ability[0] + \
              C_beta2_ability2 * mass_ability[1]

    C_beta3 = C_beta3_ability1 * mass_ability[0] + \
              C_beta3_ability2 * mass_ability[1]

    # For NE
    NE_beta1 = NE_beta1_ability1 * mass_ability[0] + \
              NE_beta1_ability2 * mass_ability[1]

    NE_beta2 = NE_beta2_ability1 * mass_ability[0] + \
              NE_beta2_ability2 * mass_ability[1]

    NE_beta3 = NE_beta3_ability1 * mass_ability[0] + \
              NE_beta3_ability2 * mass_ability[1]

    # For N
    N_beta1 = N_beta1_ability1 * mass_ability[0] + \
              N_beta1_ability2 * mass_ability[1]

    N_beta2 = N_beta2_ability1 * mass_ability[0] + \
              N_beta2_ability2 * mass_ability[1]

    N_beta3 = N_beta3_ability1 * mass_ability[0] + \
              N_beta3_ability2 * mass_ability[1]

    A = (A_beta1 + A_beta2 + A_beta3) / 3
    C = (C_beta1 + C_beta2 + C_beta3) / 3
    NE = (NE_beta1 + NE_beta2 + NE_beta3) / 3
    N = (N_beta1 + N_beta2 + N_beta3) / 3

    return C, A, NE, N

Then, since the betas parameters that I use for calibration, in order to avoid having to write "beta_beta1_ability_1", etc..., I added a simple block that maps the values from the calibration dictionary to the household block:

@simple
def betas(beta1, beta2, beta3):
    # Beta 1
    beta_beta1_ability1 = beta1
    beta_beta1_ability2 = beta1

    # Beta 2
    beta_beta2_ability1 = beta2
    beta_beta2_ability2 = beta2

    # Beta 3
    beta_beta3_ability1 = beta3
    beta_beta3_ability2 = beta3

    return beta_beta1_ability1, beta_beta1_ability2, beta_beta2_ability1, beta_beta2_ability2, beta_beta3_ability1, beta_beta3_ability2

and to help to build the calibration dictionary, I created a function to add each of the new values abilities and betas (flexible to the number of points in the ability grid):

def parameters(beta_values, sigma_a, nAb):
    ability_grid = utils.tauchen_hans(sigma_a, 0, nAb)[0]
    parameter_values = {}

    # Loop over beta indices
    for i, beta in enumerate(beta_values, start=1):
        # Loop over ability indices
        for j in range(1, nAb + 1):
            # Construct variable names
            beta_variable_name = f'beta_beta{i}_ability{j}'
            ability_variable_name = f'ability_beta{i}_ability{j}'

            # Assign value to variables
            parameter_values[beta_variable_name] = beta
            parameter_values[ability_variable_name] = ability_grid[j - 1]

    # Return variable names and values as a dictionary
    return parameter_values

# Steady state
calibration.update(parameters([calibration['beta1'], calibration['beta2'], calibration['beta3']],calibration['sigma_a'], calibration['nAB']))

So the model is created as:
neoclassic = create_model([*hh_list, firm, fiscal, aggregate, betas, mkt_clearing, macro_ratios], name="Neoclassical")

For this calibration, we get a steady state:

# Steady state
calibration = {'eis': 1.2**(-1),
               'frisch': 1.0,
               'delta': 0.015,
               'alpha': 0.33,
               'rho_s': 0.761,
               'sigma_s': 0.211,
               'nS': 5,
               'nA': 200,
               'amax': 400,
               'amin': -1.7,
               'gamma0': 0.,
               'phi_tau': 0.1,
               'beta1': 0.9858655,
               'beta2': 0.9872655,
               'beta3': 0.9882,
               'sigma_a': 0.712,
               'nAB': 2}

calibration.update(parameters([calibration['beta1'], calibration['beta2'], calibration['beta3']],calibration['sigma_a'], calibration['nAB']))

unknowns_ss = {'K': 18.34,
               'L': 0.4337,
               'vphi': 10,
               'G': 0.22,
               'B': 2.56,
               'tau_l': 0.35,
               'beta3': 0.9882}

targets_ss = {'asset_mkt': 0.,
              'labor_mkt': 0.,
              'N': 0.248,
              'T_Y': -0.07,
              'B_Y': 1.714,
              'G_Y': 0.15,
              'K_Y': 12.292}

ss_cal = neoclassic.solve_steady_state(calibration, unknowns_ss, targets_ss, solver='broyden_custom',
                                      ttol=1e-5, solver_kwargs={'maxcount': 10000})

Then, if I apply a nonlinear MIT shock on G, the model also converges:

# setup
T = 300
exogenous = ['G']
unknowns = ['K', 'L']
targets = ['asset_mkt', 'labor_mkt']

rho = 0.975
dG = 0.01 * ss_cal['Y'] * rho ** (np.arange(T))

td_nonlin = neoclassic.solve_impulse_nonlinear(ss_cal, unknowns, targets, {"G": dG})

However, if I try to add the line "ss_initial", it does not run.
If I give the model the same steady state:
td_nonlin = neoclassic.solve_impulse_nonlinear(ss_cal, unknowns, targets, {"G": dG}, ss_initial=ss_cal)

I get the error message: ValueError: max() arg is an empty sequence

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[20], line 11
      8 dG = 0.01 * ss_cal['Y'] * rho ** (np.arange(T))
     10 # general equilibrium jacobians
---> 11 td_nonlin = neoclassic.solve_impulse_nonlinear(ss_cal, unknowns, targets, {"G": dG}, ss_initial=ss_cal)

File D:\Projects\venv\lib\site-packages\sequence_jacobian\blocks\block.py:197, in Block.solve_impulse_nonlinear(self, ss, unknowns, targets, inputs, outputs, internals, Js, options, H_U_factored, ss_initial, **kwargs)
    195     print(f'Solving {self.name} for {unknowns} to hit {targets}')
    196 for it in range(opts['maxit']):
--> 197     results = self.impulse_nonlinear(ss, inputs | U, actual_outputs | targets, internals, Js, options, ss_initial, **kwargs)
    198     errors = {k: np.max(np.abs(results[k])) for k in targets}
    199     if opts['verbose']:

File D:\Projects\venv\lib\site-packages\sequence_jacobian\blocks\block.py:66, in Block.impulse_nonlinear(self, ss, inputs, outputs, internals, Js, options, ss_initial, **kwargs)
     61 actual_outputs, inputs_as_outputs = self.process_outputs(ss,
     62     self.make_ordered_set(inputs), self.make_ordered_set(outputs))
     64 if isinstance(self, Parent):
     65     # SolvedBlocks may use Js and may be nested in a CombinedBlock, so we need to pass them down to any parent
---> 66     out = self.M @ self._impulse_nonlinear(self.M.inv @ ss, self.M.inv @ inputs, self.M.inv @ actual_outputs, internals, Js, options, self.M.inv @ ss_initial, **own_options)
     67 elif hasattr(self, 'internals'):
     68     out = self.M @ self._impulse_nonlinear(self.M.inv @ ss, self.M.inv @ inputs, self.M.inv @ actual_outputs, self.internals_to_report(internals), self.M.inv @ ss_initial, **own_options)

File D:\Projects\venv\lib\site-packages\sequence_jacobian\blocks\combined_block.py:75, in CombinedBlock._impulse_nonlinear(self, ss, inputs, outputs, internals, Js, options, ss_initial)
     70     input_args = {k: v for k, v in impulses.items() if k in block.inputs}
     72     if input_args or ss_initial is not None:
     73         # If this block is actually perturbed, or we start from different initial ss
     74         # TODO: be more selective about ss_initial here - did any inputs change that matter for this one block?
---> 75         impulses.update(block.impulse_nonlinear(ss, input_args, outputs & block.outputs, internals, Js, options, ss_initial))
     77 return ImpulseDict({k: impulses.toplevel[k] for k in original_outputs if k in impulses.toplevel}, impulses.internals, impulses.T)

File D:\Projects\venv\lib\site-packages\sequence_jacobian\blocks\block.py:60, in Block.impulse_nonlinear(self, ss, inputs, outputs, internals, Js, options, ss_initial, **kwargs)
     57 """Calculate a partial equilibrium, non-linear impulse response of `outputs` to a set of shocks in `inputs`
     58 around a steady state `ss`."""
     59 own_options = self.get_options(options, kwargs, 'impulse_nonlinear')
---> 60 inputs = ImpulseDict(inputs)
     61 actual_outputs, inputs_as_outputs = self.process_outputs(ss,
     62     self.make_ordered_set(inputs), self.make_ordered_set(outputs))
     64 if isinstance(self, Parent):
     65     # SolvedBlocks may use Js and may be nested in a CombinedBlock, so we need to pass them down to any parent

File D:\Projects\venv\lib\site-packages\sequence_jacobian\classes\impulse_dict.py:22, in ImpulseDict.__init__(self, data, internals, T)
     20     raise ValueError('ImpulseDicts are initialized with a `dict` of top-level impulse responses.')
     21 super().__init__(data, internals)
---> 22 self.T = (T if T is not None else self.infer_length())

File D:\Projects\venv\lib\site-packages\sequence_jacobian\classes\impulse_dict.py:100, in ImpulseDict.infer_length(self)
     98 def infer_length(self):
     99     lengths = [len(v) for v in self.toplevel.values()]
--> 100     length = max(lengths)
    101     if length != min(lengths):
    102         raise ValueError(f'Building ImpulseDict with inconsistent lengths {max(lengths)} and {min(lengths)}')

ValueError: max() arg is an empty sequence

I believe the problem is precisely in one of the simple blocks I added, for example the "betas" block.
What I did to overcome this problem was to create a "steady state model" with the "betas" block, and then run the "neoclassical" model without it, similar to what is on the "two asset" notebook, and it worked.

neoclassic_ss = create_model([*hh_list, firm, fiscal, aggregate, betas, mkt_clearing, macro_ratios], name="Neoclassical SS")
neoclassic = create_model([*hh_list, firm, fiscal, aggregate, mkt_clearing, macro_ratios], name="Neoclassical")

ss_1 = neoclassic_ss.solve_steady_state(calibration, unknowns_ss, targets_ss, solver='broyden_custom',
                                      ttol=1e-5, solver_kwargs={'maxcount': 10000})
ss_cal = neoclassic.steady_state(ss_1)

However, for the purpose of my learning process, I would like to know and understand why the nonlinear impulse responses work without the 'ss_initial' argument when I run the code above, and with that, it doesn't.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants