Extract converged MSD values for all(mean of unique sites), max and mean @ 300 and 600K

Extract converged MSD values for all(mean of unique sites), max and mean @ 300 and 600K#

import pickle
import json
import pandas as pd
import numpy as np
from pymatgen.core import Structure
from pymatgen.analysis.structure_matcher import StructureMatcher
from pymatgen.symmetry.analyzer import SpacegroupAnalyzer
def mapp_phonon_pr_to_lob_pr(mpid, lobster_calc_path="", phononpy_calc_path="", **kwargs):
    """
    Mapps sites of phonopy primitive cell to primitive cell used for LOBSTER calculation

    Args:
        mpid: name of the calculation directory
        phononpy_calc_path: parent directory where phonopy calculation files are stored for all mpids
        lobster_calc_path: parent directory where LOBSTER calculation files are stored for all mpids
        **kwargs: kwargs for StructureMatcher class of pymatgen 

    Returns:
        Dict with phonopy primitive bonds mapped to icoxxs for equivalent bonds of lobster primitive and a bool indicating if mapping was successfull
    """


    lob_st = Structure.from_file(f"{lobster_calc_path}/{mpid}/CONTCAR.gz")
    phonopy_primitive = Structure.from_file(f"{phononpy_calc_path}/{mpid}/POSCAR")


    # wrap coordinates to unit cell of phonon primitive structure
    phonopy_primitive = Structure(lattice=phonopy_primitive.lattice,
         species=phonopy_primitive.species,
         coords=phonopy_primitive.frac_coords,
         to_unit_cell=True)
    
    # initialize structure matcher
    default_kwargs = {"primitive_cell": False, "scale": False, "attempt_supercell": False, "ltol": 0.2, "stol": 0.3, "angle_tol": 5}
    if kwargs:
        default_kwargs.update(kwargs) 
    sm = StructureMatcher(**default_kwargs)
    
    # get sc matrix and translation
    sc_mat, shift , mapping = sm.get_transformation(phonopy_primitive, lob_st)
    M_inv = np.linalg.inv(sc_mat)

    # check if values in shift are close to zero, if yes then set it explicitly to zero
    shift = np.array([i if not np.isclose(i, 0, atol=1e-5) else 0 for i in shift])

    
    # set a mapping dict of phonopy primitive atoms as keys and equivalent atoms in lobster primitive atoms as values
    mapping_dict = {ix: i for ix, i in enumerate(mapping)}


    site_matches = []
    for ix, i in enumerate(mapping):
        new_basis_coord = np.mod(np.dot(M_inv.T, lob_st.frac_coords[i]) + shift, 1)
        
        site_match = np.allclose(phonopy_primitive.frac_coords[ix], new_basis_coord, atol=0.02)

        site_matches.append(site_match)

        
    return all(site_matches), mapping_dict
def map_msd_data(
    mpid: str,
    lobster_path: str,
    phonon_path: str,
    msd_data: dict,
):
    """
    Maps phonon sites to LOBSTER sites and extracts mean squared displacement data.

    Parameters
    ----------
    mpid : str
        Materials Project ID.
    lobster_path : str
        Path to the LOBSTER calculation directory.
    phonon_path : str
        Path to the phonon calculation directory.
    msd_data : dict
        Dictionary containing phonon/thermal displacement data.

    Returns
    -------
    mapped_data : dict
        Dictionary with mapped msd data for inequivalent sites at 300 and 600K.
    """

    # Map phonon sites to lobster sites
    all_mapped, mapping_dict = mapp_phonon_pr_to_lob_pr(
        mpid=mpid,
        lobster_calc_path=lobster_path,
        phononpy_calc_path=phonon_path,
    )

    # Load structures
    ph_struct = Structure.from_file(f"{phonon_path}/{mpid}/POSCAR")
    lob_struct = Structure.from_file(f"{lobster_path}/{mpid}/CONTCAR.gz")

    # Symmetry analysis
    sga_lob = SpacegroupAnalyzer(structure=lob_struct)
    inequivalent_sites = {
        ix: site for ix, site in enumerate(sga_lob.get_symmetry_dataset().equivalent_atoms)
    }

    sg_ph_struct = ph_struct.get_space_group_info()[-1]
    sg_lob_struct = lob_struct.get_space_group_info()[-1]

    mapped_data={mpid : {}}
    
    if all_mapped and sg_lob_struct == sg_ph_struct:
        converged_at_mesh = tdisp_data[mpid][0.1]["converged_at"]
        index_mesh = tdisp_data[mpid][0.1]["mesh"].index(converged_at_mesh)

        for temp in (300, 600):
            mapped_data[mpid].update({temp : {}})
            tdisp_temp  = tdisp_data[mpid][0.1]["tdisp_sites_mean"][temp]

            for ph_site, lob_site in mapping_dict.items():
                equ_site = inequivalent_sites.get(lob_site)
                tdip_val = tdisp_temp[ph_struct.species[ph_site].symbol + str(ph_site + 1)][index_mesh]
                
                if equ_site not in mapped_data[mpid][temp]:
                    mapped_data[mpid][temp][equ_site] = [tdip_val]
                else:
                    mapped_data[mpid][temp][equ_site].append(tdip_val)

    return mapped_data

Load the save convergence_data.pkl obtained after running the msd_convergence.ipynb script

with open("convergence_data.pkl", "rb") as f:
    msd_data = pickle.load(f)
with open("msd_stable_mpids.txt", "r", encoding="utf-8") as f:
    stable_mpids = [line.rstrip("\n") for line in f]

Adjust the lobster_path and phonon_path as per locations where you have stored the files

lobster_path = "parent/path/to/all/lobster/calcs/"
phonon_path = "example_phonon_db_files/phonopy_fc"
# Initialize empty dict to store converged msd data
all_data = {}
for mpid in stable_mpids:
    mapped_data = map_msd_data(mpid=mpid, lobster_path=lobster_path, phonon_path=phonon_path, tdisp_data=msd_data)
    all_data.update(mapped_data)
with open("all_data.json", "w") as f:
    json.dump(all_data, f)
# Initialize empty dicts to store converged msd data seperately i.e all sites, mean and max for each temperature
msd_300_all = {}
msd_600_all = {}
msd_300_mean = {}
msd_600_mean = {}
msd_300_max = {}
msd_600_max = {}
for mpid in all_data:
    if all_data[mpid]:
        for temp in (300, 600):
            if temp == 300:
                site_msd_300 = []
                for site in all_data[mpid][temp]:
                    msd_300_all[f"{mpid}_{site}"] = np.log10(np.mean(all_data[mpid][temp][site])) # mean of msd at unique site
                    site_msd_300.extend(all_data[mpid][temp][site])
                msd_300_mean[mpid] = np.log10(np.mean(site_msd_300)) # mean of msd from all sites in a structure
                msd_300_max[mpid] = np.log10(np.max(site_msd_300)) # max of msd from all sites in a structure
            else:
                site_msd_600 = []
                for site in all_data[mpid][temp]:
                    msd_600_all[f"{mpid}_{site}"] = np.log10(np.mean(all_data[mpid][temp][site])) # mean of msd at unique site
                    site_msd_600.extend(all_data[mpid][temp][site])
                msd_600_mean[mpid] = np.log10(np.mean(site_msd_600)) # mean of msd from all sites in a structure
                msd_600_max[mpid] = np.log10(np.max(site_msd_600)) # max of msd from all sites in a structure
                    
df_log_msd_300_all = pd.DataFrame.from_dict(data=msd_300_all, columns=["log_msd_all_300"], orient="index")
df_log_msd_600_all = pd.DataFrame.from_dict(data=msd_600_all, columns=["log_msd_all_600"], orient="index")
df_log_msd_300_mean = pd.DataFrame.from_dict(data=msd_300_mean, columns=["log_msd_mean_300"], orient="index")
df_log_msd_600_mean = pd.DataFrame.from_dict(data=msd_600_mean, columns=["log_msd_mean_600"], orient="index")
df_log_msd_300_max = pd.DataFrame.from_dict(data=msd_300_max, columns=["log_msd_max_300"], orient="index")
df_log_msd_600_max = pd.DataFrame.from_dict(data=msd_600_max, columns=["log_msd_max_600"], orient="index")
# Save msd datasets
df_log_msd_300_all.to_json("log_msd_all_300.json")
df_log_msd_600_all.to_json("log_msd_all_600.json")
df_log_msd_300_mean.to_json("log_msd_mean_300.json")
df_log_msd_600_mean.to_json("log_msd_mean_600.json")
df_log_msd_300_max.to_json("log_msd_max_300.json")
df_log_msd_600_max.to_json("log_msd_max_600.json")