Files
dynamics/dynamics.py
T
admin 41790a782a fix: save_trajectory=0 时删除 trajectory.txt,所有引擎保持一致
Python 引擎:run_simulation 已正确支持 save_trajectory 
外部引擎(C/C++/Fortran):save_trajectory_txt 仅在
  save_trajectory=1 时调用;display.txt 生成后删除 trajectory.txt
补充:移除 compute.py 中重复的 'global use_marker' 和占位符
2026-06-12 08:18:07 +08:00

545 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
dynamics.py
------
统一入口:读取 YAML 配置文件 → 运行模拟 → 抽帧 → 绘图(可选)
用法:
python dynamics.py # 使用 input/input.txt
python dynamics.py input/input.txt # 指定配置文件(YAML 格式)
python dynamics.py --config input/input.txt --no-plot
"""
import os
import sys
import subprocess
import time
import argparse
from contextlib import contextmanager
from pathlib import Path
import yaml
import numpy as np
# 导入同目录下的模块
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import compute
def _fmt_alpha(v):
"""将 alpha 值格式化为逗号分隔字符串,兼容 numpy 数组/list/标量。"""
if isinstance(v, (list, tuple, np.ndarray)):
return ",".join(str(float(x)) for x in v)
return str(float(v))
def _load_camera_kf(config, runtime_base):
"""加载 move_camera.txt(速度段格式)→ JSON 字符串。"""
import re, json
if not int(config.get("move_camera", 0)):
return ""
cam_file = str(config.get("move_camera_file",
os.path.join("input", "move_camera.txt")))
cam_path = cam_file
if not os.path.isabs(cam_file):
cam_path = os.path.join(runtime_base, cam_file)
if not os.path.exists(cam_path):
return ""
segments = []
with open(cam_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
# 解析帧范围:支持 "all"(全程)或 "N-M"(区间)
if line.lower().startswith("all") or re.match(r'^\s*all\s', line, re.IGNORECASE):
start, end = 0, 10**9
else:
m = re.match(r'(\d+)\s*-\s*(\d+)', line)
if not m:
continue
start, end = int(m.group(1)), int(m.group(2))
v, r = [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]
for i, axis in enumerate(['x', 'y', 'z']):
m2 = re.search(r'v' + axis + r'\s*=\s*([-\d.]+)', line)
if m2: v[i] = float(m2.group(1))
m2 = re.search(r'r' + axis + r'\s*=\s*([-\d.]+)', line)
if m2: r[i] = float(m2.group(1))
if any(v) or any(r):
segments.append({"start": start, "end": end, "v": v, "r": r})
return json.dumps(segments) if segments else ""
def read_optional_index(data, key, default_value):
"""Read an optional integer index from structured txt metadata."""
if key not in data:
return default_value
value = data[key]
if value is None or int(value) < 0:
return default_value
return int(value)
def load_yaml_config(config_path):
"""从 YAML 文件加载配置字典。"""
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
return config
def resolve_path(base_dir, path_value):
"""Resolve a path relative to the runtime base directory."""
path = Path(path_value)
if path.is_absolute():
return path
return (Path(base_dir) / path).resolve()
@contextmanager
def runtime_dir_overrides(input_dir=None, output_dir=None):
"""Temporarily override runtime input/output directories for compute helpers."""
old_input = os.environ.get("DYNAMICS_INPUT_DIR")
old_output = os.environ.get("DYNAMICS_OUTPUT_DIR")
try:
if input_dir is not None:
os.environ["DYNAMICS_INPUT_DIR"] = str(Path(input_dir).resolve())
if output_dir is not None:
os.environ["DYNAMICS_OUTPUT_DIR"] = str(Path(output_dir).resolve())
yield
finally:
if old_input is None:
os.environ.pop("DYNAMICS_INPUT_DIR", None)
else:
os.environ["DYNAMICS_INPUT_DIR"] = old_input
if old_output is None:
os.environ.pop("DYNAMICS_OUTPUT_DIR", None)
else:
os.environ["DYNAMICS_OUTPUT_DIR"] = old_output
def build_sample_indices(total_steps, sample_step, sample_start, sample_end):
"""Validate sampling settings and build frame indices."""
if sample_step <= 0:
raise ValueError(f"NSTEP 必须为正整数,实际为 {sample_step}")
if sample_start < 0:
raise ValueError(f"sample_start 不能小于 0,实际为 {sample_start}")
if sample_end > total_steps:
raise ValueError(
f"sample_end 不能大于记录步数 {total_steps},实际为 {sample_end}")
if sample_start >= sample_end:
raise ValueError(
f"sample_start 必须小于 sample_end,实际为 [{sample_start}, {sample_end})")
n_frames = (sample_end - sample_start) // sample_step
if n_frames <= 0:
raise ValueError(
f"抽帧范围 [{sample_start}, {sample_end}) 过短,按 NSTEP={sample_step} 无法抽出任何帧")
indices = np.arange(n_frames, dtype=np.int64) * sample_step + sample_start
return indices
def run_case(config_path, runtime_base, input_dir="input", output_dir="output", no_plot=False):
"""Run one case with explicit program path, input path, and output path."""
runtime_base = Path(runtime_base).resolve()
input_dir_path = resolve_path(runtime_base, input_dir)
output_dir_path = resolve_path(runtime_base, output_dir)
config_path = resolve_path(runtime_base, config_path)
with runtime_dir_overrides(input_dir_path, output_dir_path):
# 1. 加载 YAML 配置
config = load_yaml_config(config_path)
print(f"[run] 已加载配置: {config_path}")
# ── T_total → NT 自动转换 ──────────────────────
if "T_total" in config and "NT" not in config:
dt = float(config["DT"])
config["NT"] = int(float(config["T_total"]) / dt)
print(f"[run] T_total={config['T_total']} → NT={config['NT']} (DT={dt})")
elif "T_total" in config and "NT" in config:
print(f"[run] 同时指定了 T_total 和 NT,使用 NT={config['NT']}")
# 显示步骤控制信息
steps_info = {k: config.get(k, 1) for k in ["step_simulate", "step_sample", "step_plot", "step_plot_wave", "step_animation"]}
step_flags = ", ".join(f"{k}={v}" for k, v in steps_info.items())
print(f"[run] 步骤开关: {step_flags}")
warmup = config.get("warmup_steps", 0)
ss = config.get("sample_start", "从头")
se = config.get("sample_end", "到尾")
method = config.get("method", "explicit_euler")
coord_file = config.get("coord_file", os.path.join("input", "coord.txt"))
plot_atom = config.get("plot_atom", "第一个原子")
print(f"[run] 算法: {method}")
print(f"[run] 坐标文件: {coord_file}")
print(f"[run] 绘图/动画原子序号: {plot_atom}")
print(f"[run] 步骤控制: 预热={warmup}步, 抽帧范围=[{ss}, {se})")
output_dir_abs = compute.get_output_dir(str(runtime_base))
traj_path = os.path.join(output_dir_abs, "trajectory.txt")
disp_path = os.path.join(output_dir_abs, "display.txt")
# ── 自动缓存检测 ───────────────────────────────────────
# force_calc=1: 强制重新计算,忽略缓存
# force_calc=0: 尊重 step_simulate 设置,不自动覆盖
force_calc = int(config.get("force_calc", 0))
if force_calc:
print(f"[run] force_calc=1,跳过缓存,强制重新计算")
config["step_simulate"] = 1
config["step_sample"] = 1
elif config.get("step_simulate", 1):
# step_simulate=1 且 force_calc=0 → 按用户要求执行计算
# 但检测一下参数是否已变更(NT),如果变了则自动更新 step_sample
if os.path.isdir(output_dir_abs) and os.path.exists(traj_path) and os.path.exists(disp_path):
cached_nt = None
try:
with open(traj_path, 'rb') as _f:
_f.seek(-4096, 2)
_tail = _f.read().decode('utf-8', errors='replace')
import re as _re
_m = _re.search(r'"NT":\s*(\d+)', _tail)
if _m:
cached_nt = int(_m.group(1))
except Exception:
pass
config_nt = int(config.get("NT", 0))
if cached_nt is not None and cached_nt != config_nt:
print(f"[run] 参数已变更(缓存 NT={cached_nt},配置 NT={config_nt}),"
f"将重新计算")
config["step_sample"] = 1
else:
# 参数一致,按 step_simulate=1 执行,step_sample 由用户设置决定
pass
else:
# step_simulate=0 → 检测缓存是否存在并提示
if os.path.isdir(output_dir_abs) and os.path.exists(traj_path) and os.path.exists(disp_path):
print(f"[run] 已有输出,步骤已被跳过")
else:
print(f"[run] 没有可用的缓存输出,但 step_simulate=0,将跳过模拟")
# 2. 运行物理模拟 → output/trajectory.txt
if config.get("step_simulate", 1):
engine = config.get("engine", "python")
total_steps = config["NT"]
record_steps = total_steps - (config.get("warmup_steps") or 0)
print(f"[run] 开始计算 总步数={total_steps} 记录步数={record_steps} DT={config['DT']}")
import time as _time
_t0 = _time.time()
if engine == "python":
compute.run_from_config(config, str(runtime_base))
else:
# 外部引擎:先加载配置到全局变量,再运行引擎
config["_skip_run"] = True
compute.run_from_config(config, str(runtime_base))
config.pop("_skip_run", None)
input_dir_abs = str(input_dir_path.resolve())
output_dir_abs = str(output_dir_path.resolve())
# 外部引擎写完整 trajectory.txt,后续抽帧
traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz = compute.run_engine(
engine, input_dir_abs, output_dir_abs, config)
if int(config.get("save_trajectory", 0)):
compute.save_trajectory_txt(traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz, str(runtime_base))
_elapsed = _time.time() - _t0
print(f"[run] 引擎: {engine} 计算完成: {record_steps}{_elapsed:.3f} s")
else:
print("[run] 步骤 [模拟] 已跳过")
# 3. 生成 display.txt:模拟完成后总是刷新,避免参数变更后缓存过时
disp_path_new = os.path.join(output_dir_abs, "display.txt")
if engine == "python":
# Python 引擎在 run_simulation 内部已写入 display.txt
if os.path.exists(disp_path_new):
print(f"[run] 发现已有 display.txt(引擎直接抽帧)")
else:
# 外部引擎:从 trajectory.txt 重新抽帧(覆盖旧的 display.txt
traj_path = os.path.join(output_dir_abs, "trajectory.txt")
if not os.path.exists(traj_path):
print(f"[run] 错误: 找不到 trajectory.txt 或 display.txt")
sys.exit(1)
data = compute.load_text_data(traj_path)
NT = int(data["NT"]); DT = float(data["DT"]); NSTEP = int(data.get("NSTEP", 1))
record_steps = NT - int(data.get("warmup_steps", 0))
n_atoms = len(data["atom_ids"])
sample_start = 0
sample_end = NT
indices = np.arange(0, record_steps, NSTEP, dtype=np.int64)
if len(indices) == 0:
indices = np.array([0])
traj_x = data["traj_x"]; traj_y = data["traj_y"]; traj_z = data["traj_z"]
traj_vx = data["traj_vx"]; traj_vy = data["traj_vy"]; traj_vz = data["traj_vz"]
# 构建 header_fields
hf = {"DT": str(DT), "NSTEP": str(NSTEP), "method": str(data.get("method", "")),
"warmup_steps": str(data.get("warmup_steps", 0)),
"dynamic_steps": str(record_steps),
"T_total": str(NT * DT),
"X_MAX": str(data.get("X_MAX", 10)), "X_MIN": str(data.get("X_MIN", -10)),
"Y_MAX": str(data.get("Y_MAX", 10)), "Y_MIN": str(data.get("Y_MIN", -10)),
"Z_MAX": str(data.get("Z_MAX", 10)), "Z_MIN": str(data.get("Z_MIN", -10)),
"ball_radius": str(data.get("ball_radius", 0.5)),
"ball_color_r": str(data.get("ball_color_r", 0.9)),
"ball_color_g": str(data.get("ball_color_g", 0.2)),
"ball_color_b": str(data.get("ball_color_b", 0.2)),
"box_color_r": str(data.get("box_color_r", 0.8)),
"box_color_g": str(data.get("box_color_g", 0.8)),
"box_color_b": str(data.get("box_color_b", 0.85)),
"gravity_field": str(data.get("gravity_field", 1)),
"gravity_interaction": str(data.get("gravity_interaction", 0)),
"elastic_force": str(data.get("elastic_force", 1)),
"damping_force": str(data.get("damping_force", 0)),
"gravity_strength": str(data.get("gravity_strength", 1.0)),
"driving_force": str(data.get("driving_force", 0)),
"use_marker": str(config.get("use_marker", 0)),
"alpha": _fmt_alpha(data.get("alpha", 0.2)),
"atom_radii": _fmt_alpha(data.get("atom_radii", [])),
"camera_distance": str(config.get("camera_distance", 40.0)),
"camera_elevation": str(config.get("camera_elevation", 0)),
"camera_azimuth": str(config.get("camera_azimuth", 0)),
"camera_keyframes": _load_camera_kf(config, str(runtime_base))}
n_frames = len(indices)
compute.save_display_txt(
disp_path_new,
traj_x[indices], traj_y[indices], traj_z[indices],
traj_vx[indices], traj_vy[indices], traj_vz[indices],
np.array(data["atom_ids"]), n_frames, n_atoms,
header_fields=hf)
print(f"[run] 从 trajectory.txt 抽帧生成 display.txt ({n_frames} 帧)")
# 外部引擎:save_trajectory=0 时清理 trajectory.txt
save_traj = int(config.get("save_trajectory", 0))
if engine != "python" and not save_traj:
traj_path = os.path.join(output_dir_abs, "trajectory.txt")
try:
os.remove(traj_path)
print(f"[run] save_trajectory=0,已删除 {traj_path}")
except OSError:
pass
# 4. 绘图(可选)
if not no_plot and config.get("step_plot", 1):
try:
import matplotlib.pyplot as plt
# 配置中文字体支持
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'WenQuanYi Micro Hei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
time_arr = np.arange(NT) * DT
n_atoms = all_x.shape[1]
atom_ids_list = data.get("atom_ids", np.arange(n_atoms) + 1)
fig, axes = plt.subplots(3, 3, figsize=(15, 13))
fig.suptitle("轨迹与能量分析", fontsize=16)
# ── 位置 / 速度 6 子图(前 2 行,每行 3 列) ──
plot_configs = [
(axes[0, 0], all_x, "x - 时间"),
(axes[0, 1], all_y, "y - 时间"),
(axes[0, 2], all_z, "z - 时间"),
(axes[1, 0], all_vx, "vx - 时间"),
(axes[1, 1], all_vy, "vy - 时间"),
(axes[1, 2], all_vz, "vz - 时间"),
]
colors = plt.cm.tab10(np.linspace(0, 1, n_atoms))
for ax, data_arr, title in plot_configs:
for i in range(n_atoms):
atom_id = int(atom_ids_list[i])
ax.plot(time_arr, data_arr[:, i], color=colors[i], linewidth=1.5, label=f"原子 {atom_id}")
ax.set_title(title)
ax.set_xlabel("时间 (s)")
ax.grid(True, alpha=0.3)
ax.legend()
# ── 能量计算 ─────────────────────────────────────
masses = np.array(data["atom_masses"]) # (n_atoms,)
G_vec = np.array(data.get("G", [0.0, 0.0, -9.8])) # [gx, gy, gz]
gravity_field_enabled = int(data.get("gravity_field", 1))
gravity_interaction_enabled = int(data.get("gravity_interaction", 0))
gravity_strength = float(data.get("gravity_strength", 1.0))
elastic_force_enabled = int(data.get("elastic_force", 1))
damping_force_enabled = int(data.get("damping_force", 0))
# 动能 Ek = ½ m v²
ek = 0.5 * masses[np.newaxis, :] * (all_vx**2 + all_vy**2 + all_vz**2)
# 均匀重力场势能 Ug = -m G·r
ug = np.zeros_like(ek)
if gravity_field_enabled:
ug = -masses[np.newaxis, :] * (
G_vec[0] * all_x + G_vec[1] * all_y + G_vec[2] * all_z
)
# 弹性势能 Us = ½ k (d - d₀)²
us = np.zeros_like(ek)
bond_pairs = data.get("bond_pairs")
bond_stiffness = data.get("bond_stiffness")
bond_rest_lengths = data.get("bond_rest_lengths")
if elastic_force_enabled and bond_pairs is not None and len(bond_pairs) > 0:
for b_idx in range(len(bond_pairs)):
i, j = bond_pairs[b_idx]
dx = all_x[:, j] - all_x[:, i]
dy = all_y[:, j] - all_y[:, i]
dz = all_z[:, j] - all_z[:, i]
dist = np.sqrt(dx**2 + dy**2 + dz**2)
stretch = dist - bond_rest_lengths[b_idx]
us_each = 0.5 * bond_stiffness[b_idx] * stretch**2
us[:, i] += us_each # 将整根键的势能记给 i
# 万有引力势能 Ug_grav = -G_grav * m_i * m_j / r
ug_grav = np.zeros_like(ek)
if gravity_interaction_enabled:
n_atoms_en = len(masses)
for i in range(n_atoms_en):
for j in range(i + 1, n_atoms_en):
dx = all_x[:, j] - all_x[:, i]
dy = all_y[:, j] - all_y[:, i]
dz = all_z[:, j] - all_z[:, i]
dist = np.sqrt(dx**2 + dy**2 + dz**2)
dist = np.maximum(dist, 1e-12)
pair_pe = -gravity_strength * masses[i] * masses[j] / dist
ug_grav[:, i] += 0.5 * pair_pe
ug_grav[:, j] += 0.5 * pair_pe
# 各原子总能量
e_total = ek + ug + us + ug_grav # (NT, n_atoms)
# 系统能量分量
ek_sys = np.sum(ek, axis=1)
ug_sys = np.sum(ug, axis=1)
us_sys = np.sum(us, axis=1)
ug_grav_sys = np.sum(ug_grav, axis=1)
e_sys = ek_sys + ug_sys + us_sys + ug_grav_sys
# ── 第 3 行左:各原子总能量 ──
ax_e = axes[2, 0]
for i in range(n_atoms):
aid = int(atom_ids_list[i])
ax_e.plot(time_arr, e_total[:, i], color=colors[i], linewidth=1.5, label=f"原子 {aid}")
ax_e.set_title("各原子总能量")
ax_e.set_xlabel("时间 (s)")
ax_e.set_ylabel("能量")
ax_e.grid(True, alpha=0.3)
ax_e.legend(loc="upper right")
# ── 第 3 行右:系统总能量 ──
ax_sys = axes[2, 1]
ax_sys.plot(time_arr, ek_sys, 'b-', linewidth=1.5, label="系统动能")
ax_sys.plot(time_arr, ug_sys, 'g-', linewidth=1.5, label="均匀重力势能")
if elastic_force_enabled and bond_pairs is not None and len(bond_pairs) > 0:
ax_sys.plot(time_arr, us_sys, color='orange', linewidth=1.5, label="系统弹性势能")
if gravity_interaction_enabled:
ax_sys.plot(time_arr, ug_grav_sys, color='purple', linewidth=1.5, label="万有引力势能")
ax_sys.plot(time_arr, e_sys, 'r--', linewidth=1.5, label="系统总能量")
ax_sys.set_title("系统总能量")
ax_sys.set_xlabel("时间 (s)")
ax_sys.set_ylabel("能量")
ax_sys.grid(True, alpha=0.3)
ax_sys.legend(loc="upper right")
# 隐藏第 3 行第 3 列空子图
axes[2, 2].set_visible(False)
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plot_path = os.path.join(output_dir_abs, "trajectory_plots.png")
plt.savefig(plot_path, dpi=300, bbox_inches="tight")
print(f"[run] 轨迹图表已保存至: {plot_path}")
plt.show()
except ImportError:
print("[run] 警告: 未安装 matplotlib,跳过绘图")
print(f"[run] 完成!输出目录: {output_dir_abs}")
# 5. 自动播放动画(可选)
if config.get("step_animation", 0):
draw_script = os.path.join(os.path.dirname(os.path.abspath(__file__)), "draw.py")
if not os.path.exists(draw_script):
print(f"[run] 未找到动画脚本: {draw_script}")
else:
# 检查 display.txt 是否存在(step_sample=0 时可能没有)
disp_path = os.path.join(output_dir_abs, "display.txt")
if not os.path.exists(disp_path):
print(f"[run] 错误: 找不到 {disp_path}")
print(f"[run] 启动动画需要先运行抽帧(step_sample: 1),或手动保留 output/display.txt")
else:
try:
print("[run] 正在启动 VisPy 3D 动画窗口…")
ansi_log = os.path.join(output_dir_abs, "animation.log")
if sys.platform == "win32":
# Windows 上给子进程独立控制台,避免父进程退出时连带关闭
creation_flags = subprocess.CREATE_NEW_CONSOLE
else:
creation_flags = 0
proc = subprocess.Popen(
[sys.executable, draw_script, output_dir_abs],
cwd=runtime_base,
stdout=subprocess.DEVNULL,
stderr=open(ansi_log, "w", encoding="utf-8"),
creationflags=creation_flags,
)
# 等待半秒检查子进程是否成功启动(未立即崩溃)
time.sleep(0.5)
if proc.poll() is not None:
print(f"[run] ⚠ 动画进程已退出,返回码={proc.returncode}")
print(f"[run] 请查看错误日志: {ansi_log}")
else:
print(f"[run] VisPy 动画窗口已启动(PID={proc.pid}")
except Exception as e:
print(f"[run] 启动动画失败: {e}")
else:
print("[run] 运行 python draw.py 查看动画。")
# 6. 波形能量动画(可选)
if config.get("step_plot_wave", 0):
try:
import plot_wave as pw
print("[run] 正在绘制波形与能量图…")
pw.plot_wave(
str(output_dir_abs),
save_gif=int(config.get("plot_wave_save_gif", 0)),
save_mp4=int(config.get("plot_wave_save_mp4", 0)),
)
except Exception as e:
print(f"[run] 绘制波形图失败: {e}")
def main():
parser = argparse.ArgumentParser(description="物理模拟统一入口")
parser.add_argument("config_file", nargs="?", default=os.path.join("input", "input.txt"),
help="YAML 配置文件路径(默认: input/input.txt,虽然是 .txt 后缀但使用 YAML 格式)")
parser.add_argument("--config", dest="config_override",
help="YAML 配置文件路径(可选,优先于位置参数)")
parser.add_argument("--input-dir", default="input",
help="输入目录路径(默认: input")
parser.add_argument("--output-dir", default="output",
help="输出目录路径(默认: output")
parser.add_argument("--runtime-base", default=".",
help="案例运行根目录(默认: 当前目录)")
parser.add_argument("--no-plot", action="store_true",
help="跳过 matplotlib 绘图")
args = parser.parse_args()
config_path = args.config_override or args.config_file
runtime_base = resolve_path(os.getcwd(), args.runtime_base)
config_path_abs = resolve_path(runtime_base, config_path)
if not os.path.exists(config_path_abs):
print(f"错误: 配置文件不存在: {config_path_abs}")
sys.exit(1)
run_case(
config_path=config_path_abs,
runtime_base=runtime_base,
input_dir=args.input_dir,
output_dir=args.output_dir,
no_plot=args.no_plot,
)
if __name__ == "__main__":
main()