Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/abacusagent/modules/cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from abacusagent.modules.submodules.cube import abacus_cal_elf as _abacus_cal_elf
from abacusagent.modules.submodules.cube import abacus_cal_charge_density_difference as _abacus_cal_charge_density_difference
from abacusagent.modules.submodules.cube import abacus_cal_spin_density as _abacus_cal_spin_density

@mcp.tool()
def abacus_cal_elf(abacus_inputs_dir: Path):
"""
Expand Down
78 changes: 11 additions & 67 deletions src/abacusagent/modules/submodules/cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
"""
import os
from pathlib import Path
from typing import Literal, Optional, TypedDict, Dict, Any, List, Tuple, Union
from itertools import groupby
from typing import Optional, Dict, Any, List

from ase.data import chemical_symbols
from abacustest.lib_prepare.abacus import AbacusStru, ReadInput, WriteInput
from abacustest import AbacusSTRU
from abacustest.lib_prepare.abacus import ReadInput, WriteInput
from abacustest.lib_model.comm import check_abacus_inputs

from abacusagent.modules.util.comm import run_abacus, generate_work_path, link_abacusjob, collect_metrics
from abacusagent.modules.util.cube_manipulator import read_gaussian_cube, axpy, write_gaussian_cube, profile1d
from abacusagent.modules.util.comm import run_abacus, generate_work_path, link_abacusjob
from abacusagent.modules.util.cube_manipulator import read_gaussian_cube, axpy, write_gaussian_cube

def abacus_cal_elf(abacus_inputs_dir: Path):
"""
Expand Down Expand Up @@ -61,37 +60,6 @@ def abacus_cal_elf(abacus_inputs_dir: Path):
except Exception as e:
return {'message': f"Calculating electron localization function failed: {e}"}

def get_subsys_pp_orb(stru: AbacusStru,
subsys_atom_index: List[int]
) -> Tuple[str, str]:
"""
Get the pseudopotential and orbital files for a subsystem.

Args:
stru (AbacusStru): The structure of the full system.
subsys_atom_index (List[int]): Atom indices of the subsystem.

Returns:
Tuple[str, str, str]: Paths to the pseudopotential and orbital files, and labels of different kinds for the subsystem.
"""
pp_list, orb_list = stru.get_pp(), stru.get_orb()
element_indices = [key for key, _ in groupby(stru.get_element())]
elements = [chemical_symbols[i] for i in element_indices]
pp_dict, orb_dict = dict(zip(elements, pp_list)), dict(zip(elements, orb_list))

subsys_elements = [chemical_symbols[stru.get_element()[i]] for i in subsys_atom_index]
subsys_pp, subsys_orb = [], []
for element in subsys_elements:
if pp_dict[element] not in subsys_pp:
subsys_pp.append(pp_dict[element])
if orb_dict[element] not in subsys_orb:
subsys_orb.append(orb_dict[element])

label_list = stru.get_label()
subsys_label = [label_list[idx] for idx in subsys_atom_index]

return subsys_pp, subsys_orb, subsys_label

def get_total_charge_density(abacus_inputs_dir: Path):
"""
Get charge density for non spin-polarized case and total charge density for spin-polarized case.
Expand Down Expand Up @@ -150,46 +118,20 @@ def abacus_cal_charge_density_difference(

input_params = ReadInput(os.path.join(full_system_jobpath, "INPUT"))
full_system_stru_file = os.path.join(full_system_jobpath, input_params.get('stru_file', 'STRU'))
full_system_stru = AbacusStru.ReadStru(full_system_stru_file)
full_system_stru = AbacusSTRU.read(full_system_stru_file)

# Prepare labels, coordinates, pp and orbital settings needed to generate STRU file for every subsystems
subsys2_atom_index = [i for i in range(full_system_stru.get_natoms()) if i not in subsys1_atom_index]
subsys2_atom_index = [i for i in range(full_system_stru.natoms) if i not in subsys1_atom_index]
if len(subsys1_atom_index) is None:
raise ValueError("Subsystem 1 have no atoms! Aborting calculating charge density difference")
if len(subsys2_atom_index) is None:
raise ValueError("Subsystem 2 have no atoms! Aborting calculating charge density difference")

subsys1_stru_file = os.path.join(work_path, f"subsys1/{input_params.get('stru_file', 'STRU')}")
subsys2_stru_file = os.path.join(work_path, f"subsys2/{input_params.get('stru_file', 'STRU')}")
subsys1_pp, subsys1_orb, subsys1_label = get_subsys_pp_orb(full_system_stru, subsys1_atom_index)
subsys2_pp, subsys2_orb, subsys2_label = get_subsys_pp_orb(full_system_stru, subsys2_atom_index)

subsys1_coord, subsys2_coord = [], []
full_system_stru_coord = full_system_stru.get_coord()
for i in range(full_system_stru.get_natoms()):
if i in subsys1_atom_index:
subsys1_coord.append(full_system_stru_coord[i])
elif i in subsys2_atom_index:
subsys2_coord.append(full_system_stru_coord[i])
else:
raise ValueError(f"Atom {i} does not belong to neither subsystem1 nor subsystem2")

subsys1_stru = AbacusStru(label=subsys1_label,
cell=full_system_stru.get_cell(),
coord=subsys1_coord,
lattice_constant=full_system_stru.get_stru()['lat'],
pp=subsys1_pp,
orb=subsys1_orb,
cartesian=True)
subsys1_stru = full_system_stru.create_subset(subsys1_atom_index)
subsys2_stru = full_system_stru.create_subset(subsys2_atom_index)
subsys1_stru.write(subsys1_stru_file)

subsys2_stru = AbacusStru(label=subsys2_label,
cell=full_system_stru.get_cell(),
coord=subsys2_coord,
lattice_constant=full_system_stru.get_stru()['lat'],
pp=subsys2_pp,
orb=subsys2_orb,
cartesian=True)
subsys2_stru.write(subsys2_stru_file)

# Modify INPUT file to output cube file needed for calculating charge density difference
Expand Down Expand Up @@ -219,6 +161,8 @@ def abacus_cal_charge_density_difference(
return {'charge_density_diff_work_path': Path(work_path).absolute(),
'charge_density_difference_cube_file': chg_dens_diff_cube_file}
except Exception as e:
import traceback
traceback.print_exc()
return {'message': f'Calculaing charge density difference failed: {e}'}

def abacus_cal_spin_density(
Expand Down
95 changes: 95 additions & 0 deletions tests/integrate_test/test_cube.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import os
import shutil
from pathlib import Path
import unittest
import tempfile
import pytest
import inspect
from abacusagent.modules.cube import (
abacus_cal_elf,
abacus_cal_charge_density_difference,
)
from utils import initilize_test_env, load_test_ref_result

initilize_test_env()


class TestCubeCalculation(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.TemporaryDirectory()
self.addCleanup(self.test_dir.cleanup)
self.test_path = Path(self.test_dir.name)
self.abacus_inputs_dir_si_prim = (
Path(__file__).parent / "abacus_inputs_dirs/Si-prim/"
)
self.abacus_inputs_dir_h2 = Path(__file__).parent / "abacus_inputs_dirs/H2/"
self.stru_scf = self.abacus_inputs_dir_si_prim / "STRU_scf"

self.original_cwd = os.getcwd()
os.chdir(self.test_path)

def tearDown(self):
os.chdir(self.original_cwd)

@pytest.mark.smoke
def test_abacus_cal_elf_si_prim(self):
"""
Test the abacus_cal_elf function with Si primitive cell.
"""
test_func_name = inspect.currentframe().f_code.co_name

test_work_dir = self.test_path / test_func_name
shutil.copytree(self.abacus_inputs_dir_si_prim, test_work_dir)
shutil.copy2(self.abacus_inputs_dir_si_prim / "STRU_scf", test_work_dir / "STRU")

outputs = abacus_cal_elf(test_work_dir)

elf_file = outputs["elf_file"]
self.assertIsInstance(elf_file, Path)
self.assertTrue(elf_file.exists())
self.assertTrue(outputs["elf_work_path"].exists())

# Test that the ELF calculation ran successfully
self.assertTrue(outputs["elf_work_path"].joinpath("OUT.ABACUS", "ELF.cube").exists())

def test_abacus_cal_charge_density_difference_si_prim(self):
"""
Test the abacus_cal_charge_density_difference function with Si primitive cell.
"""
test_func_name = inspect.currentframe().f_code.co_name

test_work_dir = self.test_path / test_func_name
shutil.copytree(self.abacus_inputs_dir_si_prim, test_work_dir)
shutil.copy2(self.abacus_inputs_dir_si_prim / "STRU_scf", test_work_dir / "STRU")

outputs = abacus_cal_charge_density_difference(test_work_dir, subsys1_atom_index=[1])
print(outputs)

charge_density_diff_file = outputs["charge_density_difference_cube_file"]
self.assertIsInstance(charge_density_diff_file, Path)
self.assertTrue(charge_density_diff_file.exists())
self.assertTrue(outputs["charge_density_diff_work_path"].exists())

# Test that the charge density difference calculation ran successfully
self.assertTrue(outputs["charge_density_diff_work_path"].joinpath("chg_density_diff.cube").exists())

def test_abacus_cal_charge_density_difference_h2(self):
"""
Test the abacus_cal_charge_density_difference function with H2 molecule.
"""
test_func_name = inspect.currentframe().f_code.co_name

test_work_dir = self.test_path / test_func_name
shutil.copytree(self.abacus_inputs_dir_h2, test_work_dir)
shutil.copy2(self.abacus_inputs_dir_h2 / "STRU_relaxed", test_work_dir / "STRU")

outputs = abacus_cal_charge_density_difference(test_work_dir, subsys1_atom_index=[1])
print(outputs)

charge_density_diff_file = outputs["charge_density_difference_cube_file"]
self.assertIsInstance(charge_density_diff_file, Path)
self.assertTrue(charge_density_diff_file.exists())
self.assertTrue(outputs["charge_density_diff_work_path"].exists())

# Test that the charge density difference calculation ran successfully
self.assertTrue(outputs["charge_density_diff_work_path"].joinpath("chg_density_diff.cube").exists())