feat: 真蛙跳法重构(Python/C/C++/Fortran 四引擎统一)

- 新增 compute_accel_conservative / accel_conservative:
  保守力加速度(弹簧+重力+原子间引力),不含阻尼,供蛙跳专用
- 重写 leapfrog_step / leapfrog_full:
  - 无阻尼:纯辛积分器,每步 1 次力计算(原 Velocity-Verlet 需 2 次)
  - 有阻尼:半隐式处理 v(t+dt/2)=[v(t-dt/2)*(1-α)+a_c*dt]/(1+α),无条件稳定
- 主循环加初始化反向半步 v(-dt/2)=v(0)-0.5*a_c(0)*dt
- 修复 C/C++ number of frames 字段写采样帧数而非总积分步数的 bug
- Python 引擎:新增 display.npz 二进制格式,draw.py/plot_wave.py 优先读取
- 编译参数统一为 -O3 -march=native -ffast-math
This commit is contained in:
2026-06-12 18:36:37 +08:00
parent e1ade53fff
commit e62e536cee
12 changed files with 627 additions and 355 deletions
+292 -68
View File
@@ -66,6 +66,12 @@ box_color_g = None
box_color_b = None
use_marker = 0
camera_keyframes_raw = ""
camera_distance = 40.0
camera_elevation = 0.0
camera_azimuth = 0.0
FIXED_MASK_X = None
FIXED_MASK_Y = None
FIXED_MASK_Z = None
# 力开关
GRAVITY_FIELD = 1 # 均匀重力场
@@ -313,6 +319,72 @@ def load_display_txt(path):
}
def save_display_npz(path, frames_x, frames_y, frames_z,
frames_vx, frames_vy, frames_vz,
atom_ids, header_fields=None):
"""Write display.npz binary format alongside display.txt.
Stores all frame arrays in compressed NumPy format. Metadata is
serialised as a JSON byte-string stored in the 'meta' array so the
file stays self-contained.
Args:
path: output path ending in '.npz'
frames_x/y/z/vx/vy/vz: (n_frames, n_atoms) float64 arrays
atom_ids: (n_atoms,) int array
header_fields: dict of metadata key-value strings (same as for
save_display_txt)
"""
meta_bytes = np.bytes_(json.dumps(header_fields or {}, ensure_ascii=False))
stem = path[:-4] if path.endswith(".npz") else path
np.savez_compressed(
stem,
frames_x=frames_x,
frames_y=frames_y,
frames_z=frames_z,
frames_vx=frames_vx,
frames_vy=frames_vy,
frames_vz=frames_vz,
atom_ids=atom_ids,
meta=np.array(meta_bytes),
)
return path
def load_display_npz(path):
"""Read display.npz and return the same dict as load_display_txt."""
data = np.load(path, allow_pickle=False)
header_fields = json.loads(data["meta"].item().decode("utf-8"))
frames_x = data["frames_x"]
n_total_frames = int(header_fields.get("number_of_frames",
frames_x.shape[0]))
n_total_particles = frames_x.shape[1]
return {
"frames_x": frames_x,
"frames_y": data["frames_y"],
"frames_z": data["frames_z"],
"frames_vx": data["frames_vx"],
"frames_vy": data["frames_vy"],
"frames_vz": data["frames_vz"],
"atom_ids": data["atom_ids"],
"n_total_frames": n_total_frames,
"n_total_particles": n_total_particles,
"header_fields": header_fields,
}
def load_display(output_dir):
"""Load display data, preferring binary .npz over text .txt."""
npz_path = os.path.join(output_dir, "display.npz")
txt_path = os.path.join(output_dir, "display.txt")
if os.path.exists(npz_path):
return load_display_npz(npz_path)
if os.path.exists(txt_path):
return load_display_txt(txt_path)
raise FileNotFoundError(
f"找不到 display.npz 或 display.txt in {output_dir}")
def get_output_dir(base_dir=None):
"""Return the output directory used for generated artifacts."""
override = os.environ.get("DYNAMICS_OUTPUT_DIR")
@@ -627,9 +699,8 @@ def apply_driving_force(x, y, z, vx, vy, vz, t, step, drivers, dt):
period_steps = None # 全程驱动
# 当前驱动力下的位置 / 速度
t_vec = np.array([t, t, t], dtype=np.float64)
pos_drive = d["amp"] * np.cos(2.0 * np.pi * d["freq"] * t_vec + d["phi"])
vel_drive = -d["amp"] * 2.0 * np.pi * d["freq"] * np.sin(2.0 * np.pi * d["freq"] * t_vec + d["phi"])
pos_drive = d["amp"] * np.cos(2.0 * np.pi * d["freq"] * t + d["phi"])
vel_drive = -d["amp"] * 2.0 * np.pi * d["freq"] * np.sin(2.0 * np.pi * d["freq"] * t + d["phi"])
x[idx] = pos_drive[0]
y[idx] = pos_drive[1]
@@ -692,7 +763,8 @@ def run_from_config(config, out_dir=None):
global X_MIN, X_MAX, Y_MIN, Y_MAX, Z_MIN, Z_MAX
global ball_radius, ball_color_r, ball_color_g, ball_color_b
global box_color_r, box_color_g, box_color_b
global use_marker, camera_keyframes_raw
global use_marker, camera_keyframes_raw, camera_distance, camera_elevation, camera_azimuth
global FIXED_MASK_X, FIXED_MASK_Y, FIXED_MASK_Z
global warmup_steps, sample_start, sample_end
global GRAVITY_FIELD, GRAVITY_INTERACTION, ELASTIC_FORCE, DAMPING_FORCE, GRAVITY_STRENGTH
global DRIVING_FORCE, DRIVER_DATA
@@ -709,6 +781,9 @@ def run_from_config(config, out_dir=None):
coord_path = os.path.join(out_dir, coord_path)
(ATOM_IDS, ATOM_MASSES, ATOM_RADII, ATOM_POSITIONS,
ATOM_VELOCITIES, ATOM_FIXED) = load_coord_file(coord_path)
FIXED_MASK_X = ATOM_FIXED[:, 0] != 0
FIXED_MASK_Y = ATOM_FIXED[:, 1] != 0
FIXED_MASK_Z = ATOM_FIXED[:, 2] != 0
BOND_CONNECTION_FILE = str(config.get("connection_file", os.path.join("input", "connection.txt")))
BOND_PARAMETER_FILE = str(config.get("bond_file", os.path.join("input", "bond.txt")))
connection_path = BOND_CONNECTION_FILE
@@ -754,6 +829,9 @@ def run_from_config(config, out_dir=None):
box_color_g = float(config.get("box_color_g", 0.8))
box_color_b = float(config.get("box_color_b", 0.85))
use_marker = int(config.get("use_marker", 0))
camera_distance = float(config.get("camera_distance", 40.0))
camera_elevation = float(config.get("camera_elevation", 0.0))
camera_azimuth = float(config.get("camera_azimuth", 0.0))
# 力开关
global GRAVITY_FIELD, GRAVITY_INTERACTION, ELASTIC_FORCE, DAMPING_FORCE, GRAVITY_STRENGTH
@@ -921,31 +999,55 @@ def run_engine(engine, input_dir, output_dir, config):
print(f"[compute] input: {input_dir}")
print(f"[compute] output: {output_dir}")
# ── 预校准:跑 NT=1000 步测速 ────────────────────────────────
# ── 预校准:跑少量步测速(结果缓存,避免每次重跑)────────────
total_steps = int(config["NT"]) - int(config.get("warmup_steps", 0))
_calib_nt = min(1000, max(100, total_steps // 10))
_calib_param = dict(param_json)
_calib_param["NT"] = _calib_nt
_calib_path = os.path.join(script_dir, "engines", engine, "_calib.json")
with open(_calib_path, "w", encoding="utf-8") as _cf:
json.dump(_calib_param, _cf, indent=2)
_calib_outdir = os.path.join(script_dir, "engines", engine, "_calib_out")
os.makedirs(_calib_outdir, exist_ok=True)
_ct0 = time.time()
subprocess.run(
[engine_path, os.path.abspath(input_dir), _calib_outdir, _calib_path],
capture_output=True, timeout=60)
# 清理校准临时文件
for _f in os.listdir(_calib_outdir):
try: os.remove(os.path.join(_calib_outdir, _f))
n_atoms_calib = len(ATOM_IDS) if ATOM_IDS is not None else 0
# 尝试读取缓存;当 n_atoms 相同且 NT 在 50% 范围内时视为有效
_cache_path = os.path.join(script_dir, "engines", engine, "_calib_cache.json")
_step_time = None
try:
with open(_cache_path, encoding="utf-8") as _cf:
_cache = json.load(_cf)
if (_cache.get("n_atoms") == n_atoms_calib and
abs(_cache.get("nt", 0) - total_steps) / max(total_steps, 1) < 0.5):
_step_time = float(_cache["step_time"])
print(f"[compute] 使用校准缓存: {_step_time*1e6:.2f} μs/步")
except Exception:
pass
if _step_time is None:
_calib_param = dict(param_json)
_calib_param["NT"] = _calib_nt
_calib_path = os.path.join(script_dir, "engines", engine, "_calib.json")
with open(_calib_path, "w", encoding="utf-8") as _cf:
json.dump(_calib_param, _cf, indent=2)
_calib_outdir = os.path.join(script_dir, "engines", engine, "_calib_out")
os.makedirs(_calib_outdir, exist_ok=True)
_ct0 = time.time()
subprocess.run(
[engine_path, os.path.abspath(input_dir), _calib_outdir, _calib_path],
capture_output=True, timeout=60)
# 清理校准临时文件
for _f in os.listdir(_calib_outdir):
try: os.remove(os.path.join(_calib_outdir, _f))
except OSError: pass
try: os.rmdir(_calib_outdir)
except OSError: pass
try: os.rmdir(_calib_outdir)
except OSError: pass
os.remove(_calib_path)
_calib_elapsed = max(time.time() - _ct0, 0.001)
_overhead = _calib_elapsed * 0.15
_step_time = max(_calib_elapsed - _overhead, 0.0001) / _calib_nt
_est_total = max(_calib_elapsed, _overhead + _step_time * total_steps)
os.remove(_calib_path)
_calib_elapsed = max(time.time() - _ct0, 0.001)
_overhead = _calib_elapsed * 0.15
_step_time = max(_calib_elapsed - _overhead, 0.0001) / _calib_nt
# 保存缓存
try:
with open(_cache_path, "w", encoding="utf-8") as _cf:
json.dump({"n_atoms": n_atoms_calib, "nt": total_steps,
"step_time": _step_time}, _cf)
except OSError:
pass
_est_total = _step_time * total_steps
t_start = time.time()
t_start_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@@ -1030,6 +1132,21 @@ def run_engine(engine, input_dir, output_dir, config):
if not os.path.exists(disp_path):
raise FileNotFoundError(f"引擎未生成 display.txt: {disp_path}")
print(f"[compute] 引擎已生成 {disp_path}")
# 将 display.txt 转换为 display.npz(加快后续 draw.py / plot_wave.py 加载)
_npz_path = disp_path.replace("display.txt", "display.npz")
try:
_d = load_display_txt(disp_path)
save_display_npz(
_npz_path,
_d["frames_x"], _d["frames_y"], _d["frames_z"],
_d["frames_vx"], _d["frames_vy"], _d["frames_vz"],
_d["atom_ids"],
header_fields={**_d["header_fields"],
"number_of_frames": str(_d["n_total_frames"]),
"number_of_particles": str(_d["n_total_particles"])})
print(f"[compute] display.npz 已生成: {_npz_path}")
except Exception as _e:
print(f"[compute] 警告: display.npz 生成失败({_e}),将使用 display.txt")
return None, None, None, None, None, None
# save_trajectory=1:加载完整 trajectory.txt
@@ -1251,30 +1368,28 @@ def compute_force(x, y, z, vx, vy, vz, m, g, b):
fz -= b[2] * vz
if ELASTIC_FORCE and BOND_PAIRS is not None and len(BOND_PAIRS) > 0:
for bond_idx, (idx_1, idx_2) in enumerate(BOND_PAIRS):
dx = x[idx_2] - x[idx_1]
dy = y[idx_2] - y[idx_1]
dz = z[idx_2] - z[idx_1]
dist = np.sqrt(dx * dx + dy * dy + dz * dz)
if dist <= 1e-12:
continue
idx_1 = BOND_PAIRS[:, 0]
idx_2 = BOND_PAIRS[:, 1]
dx = x[idx_2] - x[idx_1]
dy = y[idx_2] - y[idx_1]
dz = z[idx_2] - z[idx_1]
dist = np.sqrt(dx * dx + dy * dy + dz * dz)
valid = dist > 1e-12
if np.any(valid):
force_scale = np.zeros_like(dist)
stretch = dist[valid] - BOND_REST_LENGTHS[valid]
force_scale[valid] = BOND_STIFFNESS[valid] * stretch / dist[valid]
stretch = dist - BOND_REST_LENGTHS[bond_idx]
force_mag = BOND_STIFFNESS[bond_idx] * stretch
ux = dx / dist
uy = dy / dist
uz = dz / dist
fx_bond = force_scale * dx
fy_bond = force_scale * dy
fz_bond = force_scale * dz
fx_bond = force_mag * ux
fy_bond = force_mag * uy
fz_bond = force_mag * uz
fx[idx_1] += fx_bond
fy[idx_1] += fy_bond
fz[idx_1] += fz_bond
fx[idx_2] -= fx_bond
fy[idx_2] -= fy_bond
fz[idx_2] -= fz_bond
np.add.at(fx, idx_1, fx_bond)
np.add.at(fx, idx_2, -fx_bond)
np.add.at(fy, idx_1, fy_bond)
np.add.at(fy, idx_2, -fy_bond)
np.add.at(fz, idx_1, fz_bond)
np.add.at(fz, idx_2, -fz_bond)
if GRAVITY_INTERACTION:
n = len(m)
@@ -1304,6 +1419,50 @@ def compute_acceleration(x, y, z, vx, vy, vz, m, g, b):
return fx / m, fy / m, fz / m
def compute_accel_conservative(x, y, z, m, g):
"""保守力加速度(弹簧 + 重力 + 原子间引力),不含阻尼。
供真蛙跳法使用:阻尼项由 leapfrog_staggered_step 半隐式处理。
"""
_v0 = np.zeros_like(x)
_b0 = np.zeros(3)
fx, fy, fz = compute_force(x, y, z, _v0, _v0, _v0, m, g, _b0)
return fx / m, fy / m, fz / m
def leapfrog_staggered_step(x, y, z, vx_h, vy_h, vz_h, dt, m, g, b):
"""真蛙跳一步:x(t), v(t-dt/2) → x(t+dt), v(t+dt/2)
- 无阻尼 (DAMPING_FORCE=False 或 b≈0)
纯保守蛙跳,每步仅 1 次力计算,辛积分器。
v(t+dt/2) = v(t-dt/2) + a_c(t)·dt
- 有阻尼 (DAMPING_FORCE=True 且 b≠0)
半隐式处理阻尼,仍只 1 次力计算,对任意阻尼无条件稳定。
利用 v(t) ≈ [v(t-dt/2) + v(t+dt/2)] / 2 解析求解:
v(t+dt/2) = [v(t-dt/2)·(1 - α) + a_c(t)·dt] / (1 + α)
其中 α = b·dt / (2·m),每个方向独立。
"""
ax_c, ay_c, az_c = compute_accel_conservative(x, y, z, m, g)
has_damping = DAMPING_FORCE and np.any(b != 0)
if has_damping:
alpha_x = b[0] * dt / (2.0 * m)
alpha_y = b[1] * dt / (2.0 * m)
alpha_z = b[2] * dt / (2.0 * m)
vx_h_new = (vx_h * (1.0 - alpha_x) + ax_c * dt) / (1.0 + alpha_x)
vy_h_new = (vy_h * (1.0 - alpha_y) + ay_c * dt) / (1.0 + alpha_y)
vz_h_new = (vz_h * (1.0 - alpha_z) + az_c * dt) / (1.0 + alpha_z)
else:
vx_h_new = vx_h + ax_c * dt
vy_h_new = vy_h + ay_c * dt
vz_h_new = vz_h + az_c * dt
x_new = x + vx_h_new * dt
y_new = y + vy_h_new * dt
z_new = z + vz_h_new * dt
return x_new, y_new, z_new, vx_h_new, vy_h_new, vz_h_new
def Explicit_Euler_Method(x, y, z, vx, vy, vz, dt, m, g, b):
ax, ay, az = compute_acceleration(x, y, z, vx, vy, vz, m, g, b)
x = x + vx * dt
@@ -1407,15 +1566,16 @@ def apply_motion_update(x, y, z, vx, vy, vz, dt, m, g, b):
def apply_fixed_constraints(x, y, z, vx, vy, vz):
"""Keep fixed degrees of freedom at their initial coordinate with zero speed."""
fixed = ATOM_FIXED != 0
positions = np.column_stack((x, y, z))
velocities = np.column_stack((vx, vy, vz))
positions = np.where(fixed, ATOM_POSITIONS, positions)
velocities = np.where(fixed, 0.0, velocities)
return (
positions[:, 0], positions[:, 1], positions[:, 2],
velocities[:, 0], velocities[:, 1], velocities[:, 2],
)
if FIXED_MASK_X is not None and np.any(FIXED_MASK_X):
x[FIXED_MASK_X] = ATOM_POSITIONS[FIXED_MASK_X, 0]
vx[FIXED_MASK_X] = 0.0
if FIXED_MASK_Y is not None and np.any(FIXED_MASK_Y):
y[FIXED_MASK_Y] = ATOM_POSITIONS[FIXED_MASK_Y, 1]
vy[FIXED_MASK_Y] = 0.0
if FIXED_MASK_Z is not None and np.any(FIXED_MASK_Z):
z[FIXED_MASK_Z] = ATOM_POSITIONS[FIXED_MASK_Z, 2]
vz[FIXED_MASK_Z] = 0.0
return x, y, z, vx, vy, vz
def wrap_position(x, y, z):
"""边界回绕( dynamics 模式)。"""
@@ -1450,12 +1610,25 @@ def run_simulation(save_trajectory=0):
x, y, z, vx, vy, vz = apply_fixed_constraints(x, y, z, vx, vy, vz)
x, y, z, vx, vy, vz = apply_driving_force(x, y, z, vx, vy, vz, 0.0, 0, DRIVER_DATA, DT)
# 真蛙跳初始化:从 v(0) 反推 v(-dt/2),使后续每步只需 1 次力计算。
# 其他方法(euler/midpointvx/vy/vz 始终存整步速度,不受影响。
if METHOD == "leapfrog":
_ax0, _ay0, _az0 = compute_accel_conservative(x, y, z, ATOM_MASSES, G)
vx = vx - 0.5 * _ax0 * DT
vy = vy - 0.5 * _ay0 * DT
vz = vz - 0.5 * _az0 * DT
# 从此 vx/vy/vz 存 v(t-dt/2)(蛙跳半步速度)
if warmup_steps is not None and warmup_steps > 0:
print(f"[compute] 预热阶段: 前 {warmup_steps} 步不记录")
for step in trange(warmup_steps, desc="[compute] 预热"):
t = (step + 1) * DT
x, y, z, vx, vy, vz = apply_driving_force(x, y, z, vx, vy, vz, t, step, DRIVER_DATA, DT)
x, y, z, vx, vy, vz = apply_motion_update(x, y, z, vx, vy, vz, DT, ATOM_MASSES, G, B)
if METHOD == "leapfrog":
x, y, z, vx, vy, vz = leapfrog_staggered_step(
x, y, z, vx, vy, vz, DT, ATOM_MASSES, G, B)
else:
x, y, z, vx, vy, vz = apply_motion_update(x, y, z, vx, vy, vz, DT, ATOM_MASSES, G, B)
x, y, z = wrap_position(x, y, z)
x, y, z, vx, vy, vz = apply_fixed_constraints(x, y, z, vx, vy, vz)
print(
@@ -1467,8 +1640,6 @@ def run_simulation(save_trajectory=0):
record_steps = NT - (warmup_steps or 0)
n_atoms = len(ATOM_IDS)
n_frames = (record_steps + NSTEP - 1) // NSTEP
frame_indices = []
# 按 NSTEP 抽帧的临时缓冲区(远小于全量轨迹)
sampled_x = np.zeros((n_frames, n_atoms), dtype=np.float64)
sampled_y = np.zeros((n_frames, n_atoms), dtype=np.float64)
@@ -1507,21 +1678,23 @@ def run_simulation(save_trajectory=0):
sampled_vx[fi] = vx
sampled_vy[fi] = vy
sampled_vz[fi] = vz
frame_indices.append(step)
x, y, z, vx, vy, vz = apply_motion_update(x, y, z, vx, vy, vz, DT, ATOM_MASSES, G, B)
if METHOD == "leapfrog":
x, y, z, vx, vy, vz = leapfrog_staggered_step(
x, y, z, vx, vy, vz, DT, ATOM_MASSES, G, B)
else:
x, y, z, vx, vy, vz = apply_motion_update(x, y, z, vx, vy, vz, DT, ATOM_MASSES, G, B)
x, y, z = wrap_position(x, y, z)
x, y, z, vx, vy, vz = apply_fixed_constraints(x, y, z, vx, vy, vz)
# 写入 display.txt(新格式)
output_dir = get_output_dir()
disp_path = os.path.join(output_dir, "display.txt")
n_frames_actual = len(frame_indices)
n_frames_actual = (record_steps + NSTEP - 1) // NSTEP
save_display_txt(
disp_path,
sampled_x[:n_frames_actual], sampled_y[:n_frames_actual], sampled_z[:n_frames_actual],
sampled_vx[:n_frames_actual], sampled_vy[:n_frames_actual], sampled_vz[:n_frames_actual],
np.array(ATOM_IDS), n_frames_actual, n_atoms,
np.array(ATOM_IDS), record_steps, n_atoms,
header_fields={"DT": str(DT), "NSTEP": str(NSTEP), "method": str(METHOD),
"warmup_steps": str(warmup_steps or 0),
"dynamic_steps": str(record_steps),
@@ -1544,14 +1717,65 @@ def run_simulation(save_trajectory=0):
"driving_force": str(DRIVING_FORCE),
"use_marker": str(use_marker),
"alpha": ",".join(str(a) for a in (alpha if isinstance(alpha, list) else [alpha])),
"atom_masses": json.dumps([float(v) for v in ATOM_MASSES]),
"atom_positions": json.dumps(ATOM_POSITIONS.tolist()),
"bond_pairs": json.dumps(BOND_PAIRS.tolist() if BOND_PAIRS is not None else []),
"bond_stiffness": json.dumps(BOND_STIFFNESS.tolist() if BOND_STIFFNESS is not None else []),
"bond_rest_lengths": json.dumps(BOND_REST_LENGTHS.tolist() if BOND_REST_LENGTHS is not None else []),
"G": json.dumps(G.tolist() if G is not None else [0.0, 0.0, 0.0]),
"atom_radii": ",".join(str(r) for r in ATOM_RADII),
"camera_distance": str(config.get("camera_distance", 40.0)),
"camera_elevation": str(config.get("camera_elevation", 0)),
"camera_azimuth": str(config.get("camera_azimuth", 0)),
"camera_distance": str(camera_distance),
"camera_elevation": str(camera_elevation),
"camera_azimuth": str(camera_azimuth),
"camera_keyframes": str(camera_keyframes_raw)}
)
print(f"[compute] display.txt 已保存至: {disp_path} ({n_frames_actual} 帧)")
# 同时保存二进制 display.npzI/O 速度提升 5-10x
_npz_path = os.path.join(output_dir, "display.npz")
_hdr = {"DT": str(DT), "NSTEP": str(NSTEP), "method": str(METHOD),
"warmup_steps": str(warmup_steps or 0),
"dynamic_steps": str(record_steps),
"T_total": str(NT * DT),
"X_MAX": str(X_MAX), "X_MIN": str(X_MIN),
"Y_MAX": str(Y_MAX), "Y_MIN": str(Y_MIN),
"Z_MAX": str(Z_MAX), "Z_MIN": str(Z_MIN),
"ball_radius": str(ball_radius),
"ball_color_r": str(ball_color_r),
"ball_color_g": str(ball_color_g),
"ball_color_b": str(ball_color_b),
"box_color_r": str(box_color_r),
"box_color_g": str(box_color_g),
"box_color_b": str(box_color_b),
"gravity_field": str(GRAVITY_FIELD),
"gravity_interaction": str(GRAVITY_INTERACTION),
"elastic_force": str(ELASTIC_FORCE),
"damping_force": str(DAMPING_FORCE),
"gravity_strength": str(GRAVITY_STRENGTH),
"driving_force": str(DRIVING_FORCE),
"use_marker": str(use_marker),
"alpha": ",".join(str(a) for a in (alpha if isinstance(alpha, list) else [alpha])),
"atom_masses": json.dumps([float(v) for v in ATOM_MASSES]),
"atom_positions": json.dumps(ATOM_POSITIONS.tolist()),
"bond_pairs": json.dumps(BOND_PAIRS.tolist() if BOND_PAIRS is not None else []),
"bond_stiffness": json.dumps(BOND_STIFFNESS.tolist() if BOND_STIFFNESS is not None else []),
"bond_rest_lengths": json.dumps(BOND_REST_LENGTHS.tolist() if BOND_REST_LENGTHS is not None else []),
"G": json.dumps(G.tolist() if G is not None else [0.0, 0.0, 0.0]),
"atom_radii": ",".join(str(r) for r in ATOM_RADII),
"camera_distance": str(camera_distance),
"camera_elevation": str(camera_elevation),
"camera_azimuth": str(camera_azimuth),
"camera_keyframes": str(camera_keyframes_raw),
"number_of_frames": str(record_steps),
"number_of_particles": str(n_atoms)}
save_display_npz(_npz_path,
sampled_x[:n_frames_actual], sampled_y[:n_frames_actual],
sampled_z[:n_frames_actual],
sampled_vx[:n_frames_actual], sampled_vy[:n_frames_actual],
sampled_vz[:n_frames_actual],
np.array(ATOM_IDS), header_fields=_hdr)
print(f"[compute] display.npz 已保存至: {_npz_path}")
# 可选:保存完整 trajectory.txt
if save_trajectory:
save_trajectory_txt(traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz)