Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
gmatteo committed Oct 24, 2023
1 parent 6865204 commit ff62bc4
Showing 1 changed file with 75 additions and 34 deletions.
109 changes: 75 additions & 34 deletions abipy/dynamics/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ def to_string(self, verbose: int = 0) -> str:
app(pd.Series(self.get_params_dict()).to_string())
if verbose:
app(self.structure.spget_summary(verbose=verbose))
app("\n")

return "\n".join(lines)

Expand Down Expand Up @@ -491,15 +492,15 @@ def get_sqdt_symbol(self, symbol: str, it0: int = 0, atom_inds=None) -> np.array

return sqdt / (count + 1)

def get_msq_tt0_symbol(self, symbol: str, tmax: float, atom_inds=None, nprocs=None) -> Msdtt0:
def get_msdtt0_symbol(self, symbol: str, tmax: float, atom_inds=None, nprocs=None) -> Msdtt0:
"""
"""
index_tmax, _ = self.get_it_ts(tmax)
iatoms = self.iatoms_with_symbol(symbol, atom_inds=atom_inds)
tac = self.pos_atc[iatoms].transpose(1, 0, 2).copy()
msd_tt0 = msd_tt0_from_tac(tac, index_tmax, nprocs=nprocs)
arr_tt0 = msd_tt0_from_tac(tac, index_tmax, nprocs=nprocs)

return Msdtt0(msd_tt0=msd_tt0, mda=self, index_tmax=index_tmax, symbol=symbol)
return Msdtt0(arr_tt0=arr_tt0, mda=self, index_tmax=index_tmax, symbol=symbol)

#def export_msdt(self, filename: str) -> None:
# """
Expand Down Expand Up @@ -698,7 +699,7 @@ class Msdtt0:
"""
index_tmax: int
symbol: str
msd_tt0: np.ndarray
arr_tt0: np.ndarray
mda: MdAnalyzer

@property
Expand All @@ -709,6 +710,37 @@ def times(self):
def temperature(self):
return self.mda.temperature

@lazy_property
def msd_t(self) -> np.ndarray:
return np.mean(self.arr_tt0, axis=1)

def __str__(self) -> str:
return self.to_string()

def to_string(self, verbose=0) -> str:
"""String representation with verbosity level verbose."""
lines = []
app = lines.append
r = self.get_linfit_results()
app(r.label)
return "\n".join(lines)

def get_linfit_results(self):
"""
Perform linear fit. Return namedtuple with results.
"""
t_start = self.mda.nt - self.index_tmax
ts = self.times[t_start:] - self.times[t_start]
from scipy.stats import linregress
fit = linregress(ts, self.msd_t)
naive_d = fit.slope * Ang2PsTocm2S / 6

# TODO: Improve format.
dw = f"D = {naive_d:.2E} cm$^2$/s"
#label = r"Linear fit $\alpha={:.2f}$, $r^2$={:.2f}".format(fit.slope, fit.rvalue**2)
label = r"Linear fit a la Waroquieres: {}, $r^2$={:.2f}".format(dw, fit.rvalue**2)
return dict2namedtuple(naive_d=naive_d, fit=fit, label=label)

@add_fig_kwargs
def plot(self, ax=None, xy_log=None, fontsize=30, xlims=None, **kwargs) -> Figure:
"""
Expand All @@ -720,23 +752,19 @@ def plot(self, ax=None, xy_log=None, fontsize=30, xlims=None, **kwargs) -> Figur
xlims: Set the data limits for the x-axis. Accept tuple e.g. ``(left, right)``
or scalar e.g. ``left``. If left (right) is None, default values are used
"""
msd_t = np.mean(self.msd_tt0, axis=1)
index_tmax = self.index_tmax
t_start = self.mda.nt - index_tmax
ts = self.times[t_start:] - self.times[t_start]

ax, fig, plt = get_ax_fig_plt(ax=ax)

ax.plot(ts, msd_t,
ax.plot(ts, self.msd_t,
label=self.symbol + ' <msd($t, t_0$)>$\{$t_0$\}$, t = [0, ' + str(int(self.times[index_tmax]))+ ' ps]',
color=self.mda.color_symbol[self.symbol],
)

from scipy.stats import linregress
fit = linregress(ts, msd_t)
naive_d = fit.slope * Ang2PsTocm2S / 6
label = r"Linear fit $\alpha={:.2f}$, $r^2$={:.2f}".format(fit.slope, fit.rvalue**2)
ax.plot(ts, fit.slope*ts + fit.intercept, label=label)
# Linear fit.
r = self.get_linfit_results()
ax.plot(ts, r.fit.slope*ts + r.fit.intercept, label=r.label)

set_axlims(ax, xlims, "x")
ax.legend(fontsize=16, loc=2)
Expand All @@ -754,7 +782,7 @@ def get_sigma_berend(self, t1, t2, nblock_step=1, tot_block=1000) -> SigmaBerend
# choose the time elapsed
it1, _ = sfind_ind_val(self.times, t1)
it2, _ = sfind_ind_val(self.times, t2)
size_t = self.msd_tt0.shape[0]
size_t = self.arr_tt0.shape[0]

if it1 >= it2:
raise ValueError(f"For input {t1=} and {t2=}, got {it1=} >= {it2=}")
Expand All @@ -763,10 +791,10 @@ def get_sigma_berend(self, t1, t2, nblock_step=1, tot_block=1000) -> SigmaBerend
if it2 >= size_t:
raise ValueError(f"For input {t2=}, got {it2=} >= {size_t=}")

block_sizes1, sigmas1, delta_sigmas1 = sigma_berend(nblock_step, tot_block, self.msd_tt0[it1,:])
block_sizes2, sigmas2, delta_sigmas2 = sigma_berend(nblock_step, tot_block, self.msd_tt0[it2,:])
block_sizes1, sigmas1, delta_sigmas1 = sigma_berend(nblock_step, tot_block, self.arr_tt0[it1,:])
block_sizes2, sigmas2, delta_sigmas2 = sigma_berend(nblock_step, tot_block, self.arr_tt0[it2,:])

# Build SigmaBerend instance from locals dict.
# Build instance from locals dict.
mda = self.mda
latex_formula = mda.latex_formula
temperature = mda.temperature
Expand Down Expand Up @@ -809,7 +837,7 @@ def get_diffusion_with_sigma(self,
qDataInBlock = size1 - mDataInBlock*it1

# and find error for anytime
msd_tt0 = self.msd_tt0
msd_tt0 = self.arr_tt0
err_msd = np.zeros(msd_tt0.shape[0], dtype=float)
for t in range(msd_tt0.shape[0]):
err_msd[t] = abs(mSigma * t + qSigma)
Expand Down Expand Up @@ -852,31 +880,30 @@ def get_diffusion_with_sigma(self,
err_conductivity = e2s / kbs * ncarriers / avg_volume / temperature * err_diffusion_coeff * 1.e09

print(f"{ncarriers=} for {symbol=}")
print('{:.2E}'.format(best_angcoeff*Ang2PsTocm2S/6,2))
print('{:.2E}'.format(best_angcoeff*Ang2PsTocm2S/6, 2))
print(best_angcoeff)
print(max_angcoeff)
print(min_angcoeff)

# Build DiffusionData instance from locals dict.
# Build instance from locals dict.
data = locals()
return DiffusionData(**{k: data[k] for k in [field.name for field in dataclasses.fields(DiffusionData)]})

"""
outMSD_file.write('timeStepJump = ' + str(timeStepJump) + '\n')
outMSD_file.write('time, cel and pos were cut before ' + str(timeArray[0]+tInitial)+ 'ps' + '\n')
outMSD_file.write('t elapsed max is ' + str(estimatedTMax)+ 'ps' + '\n')
outMSD_file.write('trajectory length is ' + str(timeArrayTmp[timeArrayTmp.shape[0]-1])+ 'ps' + '\n')
outMSD_file.write('error on msd(t) evaluated at ' + str(estimatedFirstTElapsed) + 'ps' + ' and ' + str(estimatedSecondTElapsed) + 'ps' + '\n')
outMSD_file.write('evaluated n. of blocks at ' + str(estimatedFirstTElapsed) + 'ps' + ' is ' + str(block_size1) + '\n')
outMSD_file.write('evaluated n. of blocks at ' + str(estimatedSecondTElapsed) + 'ps' + ' is ' + str(block_size2) + '\n')
outMSD_file.write('number of decorrelated msd(t) data that we fit: ' + str(timeArrScorrelated.shape[0]) + '\n')
outDiff_file.write(str(temperature) + ' ' + str(diffusion_coeff) + ' ' + str(err_diffusion_coeff) + ' ' +str(volumeArrayTmp[0])+ '\n')
('timeStepJump = ' + str(timeStepJump) + '\n')
('time, cel and pos were cut before ' + str(timeArray[0]+tInitial)+ 'ps' + '\n')
('t elapsed max is ' + str(estimatedTMax)+ 'ps' + '\n')
('trajectory length is ' + str(timeArrayTmp[timeArrayTmp.shape[0]-1])+ 'ps' + '\n')
('error on msd(t) evaluated at ' + str(estimatedFirstTElapsed) + 'ps' + ' and ' + str(estimatedSecondTElapsed) + 'ps' + '\n')
('evaluated n. of blocks at ' + str(estimatedFirstTElapsed) + 'ps' + ' is ' + str(block_size1) + '\n')
('evaluated n. of blocks at ' + str(estimatedSecondTElapsed) + 'ps' + ' is ' + str(block_size2) + '\n')
('number of decorrelated msd(t) data that we fit: ' + str(timeArrScorrelated.shape[0]) + '\n')
"""

@add_fig_kwargs
def plot_mat(self, cmap="jet", fontsize=8, ax=None, **kwargs) -> Figure:
ax, fig, plt = get_ax_fig_plt(ax=ax)
im = ax.matshow(self.msd_tt0, cmap=cmap)
im = ax.matshow(self.arr_tt0, cmap=cmap)
fig.colorbar(im, ax=ax)
ax.set_xlabel('t0 (ps)', fontsize=fontsize)
ax.set_ylabel('t (ps)', fontsize=fontsize)
Expand All @@ -886,7 +913,20 @@ def plot_mat(self, cmap="jet", fontsize=8, ax=None, **kwargs) -> Figure:


class Msdtt0List(list):
"""A list of Msdtt0 objects."""
"""
A list of Msdtt0 objects.
"""

def __str__(self) -> str:
return self.to_string()

def to_string(self, verbose=0) -> str:
"""String representation with verbosity level verbose."""
lines = []
app = lines.append
for i, obj in enumerate(self):
app(obj.to_string(verbose=verbose))
return "\n".join(lines)

@add_fig_kwargs
def plot(self, sharex=True, sharey=True, **kwargs) -> Figure:
Expand All @@ -897,7 +937,8 @@ def plot(self, sharex=True, sharey=True, **kwargs) -> Figure:
if len(self) % ncols != 0: ax_list[-1].axis("off")

for ix, (obj, ax) in enumerate(zip(self, ax_list)):
obj.plot(ax=ax, show=False)
obj.plot(ax=ax, show=False, **kwargs)

return fig


Expand Down Expand Up @@ -1221,10 +1262,10 @@ def iter_mdatc(self):
for itemp, mda in enumerate(self):
yield mda, mda.temperature, self.temp_cmap(float(itemp) / len(self))

def get_msq_tt0_symbol(self, symbol: str, tmax: float, atom_inds=None, nprocs=None) -> Msdtt0List:
def get_msdtt0_symbol(self, symbol: str, tmax: float, atom_inds=None, nprocs=None) -> Msdtt0List:
msdtt0_list = Msdtt0List()
for mda in self:
obj = mda.get_msq_tt0_symbol(symbol, tmax, atom_inds=atom_inds, nprocs=nprocs)
obj = mda.get_msdtt0_symbol(symbol, tmax, atom_inds=atom_inds, nprocs=nprocs)
msdtt0_list.append(obj)

return msdtt0_list
Expand Down Expand Up @@ -1252,7 +1293,7 @@ def _nrows_ncols_nplots(self, size=None):
# data_list = DiffusionDataList()
# for it, ((mda, temp), params) in enumerate(zip(self.iter_mdat(), params_list)):
# p = AttrDict(**params)
# msq_tt0 = mda.get_msq_tt0_symbol(symbol, p.tmax, nprocs=nprocs)
# msq_tt0 = mda.get_msdtt0_symbol(symbol, p.tmax, nprocs=nprocs)
# sigma = msq_tt0.get_sigma_berend(t1=p.t1, t2=p.t2)
# data_t = msq_tt0.get_diffusion_with_sigma(p.fit_time_start, p.fit_time_stop, p.block_size1, p.block_size2, sigma)
# data_list.append(data_t)
Expand Down

0 comments on commit ff62bc4

Please sign in to comment.