Appendix C: Real business cycle model code
This appendix presents Python code implementing the full real business cycle model, including the solution of the linear rational expectations model, as described in Representation in Python. It also presents code for estimating the parameters by classical (see Maximum Likelihood Estimation) and Bayesian (see Posterior Simulation) methods.
The following code implements the real business cycle model in Python as a state space model.
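As a reminder of the notation referenced in the comments below (this is a sketch of the solution method described in the main text, not a new derivation): with $k_t$ the capital stock (the predetermined state), $c_t$ consumption (the control), and $z_t$ the technology shock, the log-linearized system takes the form

$$
\begin{bmatrix} k_{t+1} \\ E_t c_{t+1} \end{bmatrix}
= B \begin{bmatrix} k_t \\ c_t \end{bmatrix} + C z_t,
\qquad z_{t+1} = \rho z_t + \varepsilon_{t+1},
$$

and the Blanchard-Kahn solution delivers the policy function $c_t = \phi_{ck} k_t + \phi_{cz} z_t$ and the transition $k_{t+1} = T_{kk} k_t + T_{kz} z_t$; these are the quantities constructed in the solve method below.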
import numpy as np
import statsmodels.api as sm
from collections import OrderedDict

class SimpleRBC(sm.tsa.statespace.MLEModel):

    parameters = OrderedDict([
        ('discount_rate', 0.95),
        ('disutility_labor', 3.),
        ('depreciation_rate', 0.025),
        ('capital_share', 0.36),
        ('technology_shock_persistence', 0.85),
        ('technology_shock_var', 0.04**2)
    ])

    def __init__(self, endog, calibrated=None):
        super(SimpleRBC, self).__init__(
            endog, k_states=2, k_posdef=1, initialization='stationary')
        self.k_predetermined = 1

        # Save the calibrated vs. estimated parameters
        parameters = list(self.parameters.keys())
        calibrated = calibrated or {}
        self.calibrated = OrderedDict([
            (param, calibrated[param]) for param in parameters
            if param in calibrated
        ])
        self.idx_calibrated = np.array([
            param in self.calibrated for param in parameters])
        self.idx_estimated = ~self.idx_calibrated

        self.k_params = len(self.parameters)
        self.k_calibrated = len(self.calibrated)
        self.k_estimated = self.k_params - self.k_calibrated

        self.idx_cap_share = parameters.index('capital_share')
        self.idx_tech_pers = parameters.index('technology_shock_persistence')
        self.idx_tech_var = parameters.index('technology_shock_var')

        # Setup fixed elements of system matrices
        self['selection', 1, 0] = 1

    @property
    def start_params(self):
        structural_params = np.array(
            list(self.parameters.values()))[self.idx_estimated]
        measurement_variances = [0.1] * 3
        return np.r_[structural_params, measurement_variances]

    @property
    def param_names(self):
        structural_params = np.array(
            list(self.parameters.keys()))[self.idx_estimated]
        measurement_variances = ['%s.var' % name for name in self.endog_names]
        return structural_params.tolist() + measurement_variances

    def log_linearize(self, params):
        # Extract the parameters
        (discount_rate, disutility_labor, depreciation_rate, capital_share,
         technology_shock_persistence, technology_shock_var) = params

        # Temporary values
        tmp = (1. / discount_rate - (1. - depreciation_rate))
        theta = (capital_share / tmp)**(1. / (1. - capital_share))
        gamma = 1. - depreciation_rate * theta**(1. - capital_share)
        zeta = capital_share * discount_rate * theta**(capital_share - 1)

        # Coefficient matrices from linearization
        A = np.eye(2)

        B11 = 1 + depreciation_rate * (gamma / (1 - gamma))
        B12 = (-depreciation_rate *
               (1 - capital_share + gamma * capital_share) /
               (capital_share * (1 - gamma)))
        B21 = 0
        B22 = capital_share / (zeta + capital_share * (1 - zeta))
        B = np.array([[B11, B12], [B21, B22]])

        C1 = depreciation_rate / (capital_share * (1 - gamma))
        C2 = (zeta * technology_shock_persistence /
              (zeta + capital_share * (1 - zeta)))
        C = np.array([[C1], [C2]])

        return A, B, C

    def solve(self, params):
        capital_share = params[self.idx_cap_share]
        technology_shock_persistence = params[self.idx_tech_pers]

        # Get the coefficient matrices from linearization
        A, B, C = self.log_linearize(params)

        # Jordan decomposition of B (left eigenvectors via the transpose)
        eigvals, right_eigvecs = np.linalg.eig(np.transpose(B))
        left_eigvecs = np.transpose(right_eigvecs)

        # Re-order, ascending
        idx = np.argsort(eigvals)
        eigvals = np.diag(eigvals[idx])
        left_eigvecs = left_eigvecs[idx, :]

        # Blanchard-Kahn conditions
        k_nonpredetermined = self.k_states - self.k_predetermined
        k_stable = len(np.where(eigvals.diagonal() < 1)[0])
        k_unstable = self.k_states - k_stable
        if not k_stable == self.k_predetermined:
            raise RuntimeError('Blanchard-Kahn condition not met.'
                               ' Unique solution does not exist.')

        # Create partition indices
        k = self.k_predetermined
        p1 = np.s_[:k]
        p2 = np.s_[k:]
        p11 = np.s_[:k, :k]
        p12 = np.s_[:k, k:]
        p21 = np.s_[k:, :k]
        p22 = np.s_[k:, k:]

        # Decouple the system
        decoupled_C = np.dot(left_eigvecs, C)

        # Solve the explosive component (controls) in terms of the
        # non-explosive component (states) and shocks
        tmp = np.linalg.inv(left_eigvecs[p22])

        # This is \phi_{ck}, above
        policy_state = - np.dot(tmp, left_eigvecs[p21]).squeeze()
        # This is \phi_{cz}, above
        policy_shock = -(
            np.dot(tmp, 1. / eigvals[p22]).dot(
                np.linalg.inv(
                    np.eye(k_nonpredetermined) -
                    technology_shock_persistence / eigvals[p22]
                )
            ).dot(decoupled_C[p2])
        ).squeeze()

        # Solve for the non-explosive transition
        # This is T_{kk}, above
        transition_state = np.squeeze(B[p11] + np.dot(B[p12], policy_state))
        # This is T_{kz}, above
        transition_shock = np.squeeze(np.dot(B[p12], policy_shock) + C[p1])

        # Create the full design matrix
        tmp = (1 - capital_share) / capital_share
        tmp1 = 1. / capital_share
        design = np.array([[1 - tmp * policy_state, tmp1 - tmp * policy_shock],
                           [1 - tmp1 * policy_state, tmp1 * (1 - policy_shock)],
                           [policy_state, policy_shock]])

        # Create the transition matrix
        transition = (
            np.array([[transition_state, transition_shock],
                      [0, technology_shock_persistence]]))

        return design, transition

    def transform_discount_rate(self, param, untransform=False):
        # Discount rate must be between 0 and 1
        epsilon = 1e-4  # bound it slightly away from exactly 0 or 1
        if not untransform:
            return np.abs(1 / (1 + np.exp(param)) - epsilon)
        else:
            return np.log((1 - param + epsilon) / (param + epsilon))

    def transform_disutility_labor(self, param, untransform=False):
        # Disutility of labor must be positive
        return param**2 if not untransform else param**0.5

    def transform_depreciation_rate(self, param, untransform=False):
        # Depreciation rate must be positive
        return param**2 if not untransform else param**0.5

    def transform_capital_share(self, param, untransform=False):
        # Capital share must be between 0 and 1
        epsilon = 1e-4  # bound it slightly away from exactly 0 or 1
        if not untransform:
            return np.abs(1 / (1 + np.exp(param)) - epsilon)
        else:
            return np.log((1 - param + epsilon) / (param + epsilon))

    def transform_technology_shock_persistence(self, param, untransform=False):
        # Persistence parameter must be between -1 and 1
        if not untransform:
            return param / (1 + np.abs(param))
        else:
            # Inverse of x / (1 + |x|); the absolute value is required for
            # the inverse to be correct on the negative half of the interval
            return param / (1 - np.abs(param))

    def transform_technology_shock_var(self, unconstrained, untransform=False):
        # Variances must be positive
        return unconstrained**2 if not untransform else unconstrained**0.5

    def transform_params(self, unconstrained):
        constrained = np.zeros(unconstrained.shape, unconstrained.dtype)

        i = 0
        for param in self.parameters.keys():
            if param not in self.calibrated:
                method = getattr(self, 'transform_%s' % param)
                constrained[i] = method(unconstrained[i])
                i += 1

        # Measurement error variances must be positive
        constrained[self.k_estimated:] = unconstrained[self.k_estimated:]**2

        return constrained

    def untransform_params(self, constrained):
        unconstrained = np.zeros(constrained.shape, constrained.dtype)

        i = 0
        for param in self.parameters.keys():
            if param not in self.calibrated:
                method = getattr(self, 'transform_%s' % param)
                unconstrained[i] = method(constrained[i], untransform=True)
                i += 1

        # Measurement error variances must be positive
        unconstrained[self.k_estimated:] = constrained[self.k_estimated:]**0.5

        return unconstrained

    def update(self, params, **kwargs):
        params = super(SimpleRBC, self).update(params, **kwargs)

        # Reconstruct the full parameter vector from the
        # estimated and calibrated parameters
        structural_params = np.zeros(self.k_params, dtype=params.dtype)
        structural_params[self.idx_calibrated] = list(self.calibrated.values())
        structural_params[self.idx_estimated] = params[:self.k_estimated]
        measurement_variances = params[self.k_estimated:]

        # Solve the model
        design, transition = self.solve(structural_params)

        # Update the statespace representation
        self['design'] = design
        self['obs_cov', 0, 0] = measurement_variances[0]
        self['obs_cov', 1, 1] = measurement_variances[1]
        self['obs_cov', 2, 2] = measurement_variances[2]
        self['transition'] = transition
        self['state_cov', 0, 0] = structural_params[self.idx_tech_var]
The following code estimates the three measurement variances as well as the two technology shock parameters via maximum likelihood estimation:
# Estimate the technology shock parameters and the measurement error
# variances, calibrating the remaining structural parameters
partially_calibrated = {
    'discount_rate': 0.95,
    'disutility_labor': 3.0,
    'capital_share': 0.36,
    'depreciation_rate': 0.025,
}
mod = SimpleRBC(rbc_data, calibrated=partially_calibrated)
res = mod.fit(maxiter=1000)
# Refine the estimates with a second, Nelder-Mead, optimization stage
res = mod.fit(res.params, method='nm', maxiter=1000, disp=False)
print(res.summary())

# Compute orthogonalized impulse responses, scaled to percent deviations
estimated_irfs = res.impulse_responses(40, orthogonalized=True) * 100
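The estimated impulse responses can then be plotted. The following is a minimal sketch; the use of matplotlib and the ordering of the observed series (output, labor, consumption, matching the rows of the design matrix above) are assumptions, not part of the original code:

import matplotlib.pyplot as plt

# Minimal sketch: plot the estimated orthogonalized impulse responses to a
# technology shock (series ordering is an assumption, see above)
irfs = np.asarray(estimated_irfs)
fig, axes = plt.subplots(1, 3, figsize=(12, 3))
for i, name in enumerate(['Output', 'Labor', 'Consumption']):
    axes[i].plot(irfs[:, i])
    axes[i].set_title(name)
    axes[i].set_xlabel('Quarters')
fig.tight_layout()
plt.show()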
Finally, the following code estimates all parameters except the disutility of labor and the depreciation rate via the Metropolis-within-Gibbs algorithm:
from scipy.stats import truncnorm, norm, invgamma

def draw_posterior_rho(model, states, sigma2, truncate=False):
    Z = states[1:2, 1:]
    X = states[1:2, :-1]

    tmp = 1 / (sigma2 + np.sum(X**2))
    post_mean = tmp * np.squeeze(np.dot(X, Z.T))
    post_var = tmp * sigma2

    if truncate:
        lower = (-1 - post_mean) / post_var**0.5
        upper = (1 - post_mean) / post_var**0.5
        rvs = truncnorm(lower, upper, loc=post_mean, scale=post_var**0.5).rvs()
    else:
        rvs = norm(post_mean, post_var**0.5).rvs()

    return rvs

def draw_posterior_sigma2(model, states, rho):
    resid = states[1, 1:] - rho * states[1, :-1]
    post_shape = 2.00005 + model.nobs
    post_scale = 0.0100005 + np.sum(resid**2)

    return invgamma(post_shape, scale=post_scale).rvs()
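Reading these helpers directly, and writing $T$ for model.nobs, $x_t$ for the lagged technology state and $z_t$ for its lead, the conditional posterior draws are

$$
\rho \mid \cdot \sim N\!\left( \frac{\sum_t x_t z_t}{\sigma^2 + \sum_t x_t^2},\; \frac{\sigma^2}{\sigma^2 + \sum_t x_t^2} \right),
\qquad
\sigma^2 \mid \cdot \sim \text{IG}\!\left( 2.00005 + T,\; 0.0100005 + \sum_t e_t^2 \right),
$$

where $e_t = z_t - \rho x_t$ is the AR(1) residual; in the sampler below, the draw for $\rho$ is kept in the stationary region $(-1, 1)$ by rejection sampling (or, in rare cases, truncation).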
np.random.seed(SEED)
from statsmodels.tsa.statespace.tools import is_invertible
from scipy.stats import multivariate_normal, gamma, invgamma, beta, uniform
# Create the model for likelihood evaluation
calibrated = {
'disutility_labor': 3.0,
'depreciation_rate': 0.025,
}
model = SimpleRBC(rbc_data, calibrated=calibrated)
sim_smoother = model.simulation_smoother()
# Specify priors
prior_discount = gamma(6.25, scale=0.04)
prior_cap_share = norm(0.3, scale=0.01)
prior_meas_err = invgamma(2.0025, scale=0.10025)
# Proposals
rw_discount = norm(scale=0.3)
rw_cap_share = norm(scale=0.01)
rw_meas_err = norm(scale=0.003)
# Create storage arrays for the traces
n_iterations = 10000
trace = np.zeros((n_iterations + 1, 7))
trace_accepts = np.zeros((n_iterations, 5))
trace[0] = model.start_params
# Store the discount rate in the trace as 100 * ((1 / discount_rate) - 1)
trace[0, 0] = 100 * ((1 / trace[0, 0]) - 1)
loglike = None
# Iterations
for s in range(1, n_iterations + 1):
    if s % 10000 == 0:
        print(s)

    # Get the parameters from the trace
    discount_rate = 1 / (1 + (trace[s-1, 0] / 100))
    capital_share = trace[s-1, 1]
    rho = trace[s-1, 2]
    sigma2 = trace[s-1, 3]
    meas_vars = trace[s-1, 4:]**2

    # 1. Gibbs step: draw the states using the simulation smoother
    model.update(np.r_[discount_rate, capital_share, rho, sigma2, meas_vars])
    sim_smoother.simulate()
    states = sim_smoother.simulated_state[:, :-1]
    # 2. Gibbs step: draw the autoregressive parameter, applying rejection
    # sampling to ensure an invertible lag polynomial. In rare cases, due to
    # the combination of other parameters, the mean of the normal posterior
    # will be greater than one, making it difficult to draw from the normal
    # distribution even with rejection sampling; in those cases we draw from
    # a truncated normal instead.
    rho = draw_posterior_rho(model, states, sigma2)
    i = 0
    while rho < -1 or rho > 1:
        if i < 1e2:
            rho = draw_posterior_rho(model, states, sigma2)
        else:
            rho = draw_posterior_rho(model, states, sigma2, truncate=True)
        i += 1
    trace[s, 2] = rho

    # 3. Gibbs step: draw the variance parameter
    sigma2 = draw_posterior_sigma2(model, states, rho)
    trace[s, 3] = sigma2
    # Calculate the loglikelihood
    loglike = model.loglike(np.r_[discount_rate, capital_share, rho,
                                  sigma2, meas_vars])

    # 4. Metropolis step for the discount rate
    discount_param = trace[s-1, 0]
    proposal_param = discount_param + rw_discount.rvs()
    proposal_rate = 1 / (1 + (proposal_param / 100))
    if proposal_rate < 1:
        proposal_loglike = model.loglike(np.r_[proposal_rate, capital_share,
                                               rho, sigma2, meas_vars])
        acceptance_probability = np.exp(
            proposal_loglike - loglike +
            prior_discount.logpdf(proposal_param) -
            prior_discount.logpdf(discount_param))

        if acceptance_probability > uniform.rvs():
            discount_param = proposal_param
            discount_rate = proposal_rate
            loglike = proposal_loglike
            trace_accepts[s-1, 0] = 1
    trace[s, 0] = discount_param
    # 5. Metropolis step for the capital share
    proposal = capital_share + rw_cap_share.rvs()
    if 0 < proposal < 1:
        proposal_loglike = model.loglike(np.r_[discount_rate, proposal, rho,
                                               sigma2, meas_vars])
        acceptance_probability = np.exp(
            proposal_loglike - loglike +
            prior_cap_share.logpdf(proposal) -
            prior_cap_share.logpdf(capital_share))

        if acceptance_probability > uniform.rvs():
            capital_share = proposal
            trace_accepts[s-1, 1] = 1
            loglike = proposal_loglike
    trace[s, 1] = capital_share
    # 6. Metropolis step for the measurement errors (one at a time,
    # proposing on the standard deviation scale)
    for i in range(3):
        meas_std = meas_vars[i]**0.5
        proposal = meas_std + rw_meas_err.rvs()
        proposal_vars = meas_vars.copy()
        proposal_vars[i] = proposal**2
        if proposal > 0:
            proposal_loglike = model.loglike(np.r_[discount_rate, capital_share,
                                                   rho, sigma2, proposal_vars])
            acceptance_probability = np.exp(
                proposal_loglike - loglike +
                prior_meas_err.logpdf(proposal) -
                prior_meas_err.logpdf(meas_std))

            if acceptance_probability > uniform.rvs():
                meas_std = proposal
                trace_accepts[s-1, 2+i] = 1
                loglike = proposal_loglike
                meas_vars[i] = proposal_vars[i]
        trace[s, 4+i] = meas_std
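Once the sampler has run, the trace can be summarized in the usual way. The following is a minimal sketch; the burn-in length and the column labels are assumptions for illustration, not part of the original code:

# Minimal sketch: drop a burn-in period and report posterior means and
# standard deviations (the burn-in length here is an assumed value)
burn = 1000
names = ['discount (100*(1/beta - 1))', 'capital share', 'rho', 'sigma2',
         'meas. std 1', 'meas. std 2', 'meas. std 3']
posterior = trace[burn + 1:]
for name, draws in zip(names, posterior.T):
    print('%-28s mean: %8.4f  std: %8.4f' % (name, draws.mean(), draws.std()))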