b95a3579fc
在帧位置写 all 表示该段对所有帧生效: all vx=0.1 ry=0.5 # 全程缓慢平移 + 旋转 等同于 start=0, end=INF,与普通区间段一样支持时间交叠。
531 lines
25 KiB
Python
531 lines
25 KiB
Python
"""
|
||
dynamics.py
|
||
------
|
||
统一入口:读取 YAML 配置文件 → 运行模拟 → 抽帧 → 绘图(可选)
|
||
|
||
用法:
|
||
python dynamics.py # 使用 input/input.txt
|
||
python dynamics.py input/input.txt # 指定配置文件(YAML 格式)
|
||
python dynamics.py --config input/input.txt --no-plot
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import subprocess
|
||
import time
|
||
import argparse
|
||
from contextlib import contextmanager
|
||
from pathlib import Path
|
||
|
||
import yaml
|
||
import numpy as np
|
||
|
||
# 导入同目录下的模块
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
import compute
|
||
|
||
|
||
def _fmt_alpha(v):
|
||
"""将 alpha 值格式化为逗号分隔字符串,兼容 numpy 数组/list/标量。"""
|
||
if isinstance(v, (list, tuple, np.ndarray)):
|
||
return ",".join(str(float(x)) for x in v)
|
||
return str(float(v))
|
||
|
||
|
||
def _load_camera_kf(config, runtime_base):
|
||
"""加载 move_camera.txt(速度段格式)→ JSON 字符串。"""
|
||
import re, json
|
||
if not int(config.get("move_camera", 0)):
|
||
return ""
|
||
cam_file = str(config.get("move_camera_file",
|
||
os.path.join("input", "move_camera.txt")))
|
||
cam_path = cam_file
|
||
if not os.path.isabs(cam_file):
|
||
cam_path = os.path.join(runtime_base, cam_file)
|
||
if not os.path.exists(cam_path):
|
||
return ""
|
||
segments = []
|
||
with open(cam_path, "r", encoding="utf-8") as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if not line or line.startswith("#"):
|
||
continue
|
||
# 解析帧范围:支持 "all"(全程)或 "N-M"(区间)
|
||
if line.lower().startswith("all") or re.match(r'^\s*all\s', line, re.IGNORECASE):
|
||
start, end = 0, 10**9
|
||
else:
|
||
m = re.match(r'(\d+)\s*-\s*(\d+)', line)
|
||
if not m:
|
||
continue
|
||
start, end = int(m.group(1)), int(m.group(2))
|
||
v, r = [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]
|
||
for i, axis in enumerate(['x', 'y', 'z']):
|
||
m2 = re.search(r'v' + axis + r'\s*=\s*([-\d.]+)', line)
|
||
if m2: v[i] = float(m2.group(1))
|
||
m2 = re.search(r'r' + axis + r'\s*=\s*([-\d.]+)', line)
|
||
if m2: r[i] = float(m2.group(1))
|
||
if any(v) or any(r):
|
||
segments.append({"start": start, "end": end, "v": v, "r": r})
|
||
return json.dumps(segments) if segments else ""
|
||
|
||
|
||
def read_optional_index(data, key, default_value):
|
||
"""Read an optional integer index from structured txt metadata."""
|
||
if key not in data:
|
||
return default_value
|
||
value = data[key]
|
||
if value is None or int(value) < 0:
|
||
return default_value
|
||
return int(value)
|
||
|
||
|
||
def load_yaml_config(config_path):
|
||
"""从 YAML 文件加载配置字典。"""
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
config = yaml.safe_load(f)
|
||
return config
|
||
|
||
|
||
def resolve_path(base_dir, path_value):
|
||
"""Resolve a path relative to the runtime base directory."""
|
||
path = Path(path_value)
|
||
if path.is_absolute():
|
||
return path
|
||
return (Path(base_dir) / path).resolve()
|
||
|
||
|
||
@contextmanager
|
||
def runtime_dir_overrides(input_dir=None, output_dir=None):
|
||
"""Temporarily override runtime input/output directories for compute helpers."""
|
||
old_input = os.environ.get("DYNAMICS_INPUT_DIR")
|
||
old_output = os.environ.get("DYNAMICS_OUTPUT_DIR")
|
||
try:
|
||
if input_dir is not None:
|
||
os.environ["DYNAMICS_INPUT_DIR"] = str(Path(input_dir).resolve())
|
||
if output_dir is not None:
|
||
os.environ["DYNAMICS_OUTPUT_DIR"] = str(Path(output_dir).resolve())
|
||
yield
|
||
finally:
|
||
if old_input is None:
|
||
os.environ.pop("DYNAMICS_INPUT_DIR", None)
|
||
else:
|
||
os.environ["DYNAMICS_INPUT_DIR"] = old_input
|
||
if old_output is None:
|
||
os.environ.pop("DYNAMICS_OUTPUT_DIR", None)
|
||
else:
|
||
os.environ["DYNAMICS_OUTPUT_DIR"] = old_output
|
||
|
||
|
||
def build_sample_indices(total_steps, sample_step, sample_start, sample_end):
|
||
"""Validate sampling settings and build frame indices."""
|
||
if sample_step <= 0:
|
||
raise ValueError(f"NSTEP 必须为正整数,实际为 {sample_step}")
|
||
if sample_start < 0:
|
||
raise ValueError(f"sample_start 不能小于 0,实际为 {sample_start}")
|
||
if sample_end > total_steps:
|
||
raise ValueError(
|
||
f"sample_end 不能大于记录步数 {total_steps},实际为 {sample_end}")
|
||
if sample_start >= sample_end:
|
||
raise ValueError(
|
||
f"sample_start 必须小于 sample_end,实际为 [{sample_start}, {sample_end})")
|
||
|
||
n_frames = (sample_end - sample_start) // sample_step
|
||
if n_frames <= 0:
|
||
raise ValueError(
|
||
f"抽帧范围 [{sample_start}, {sample_end}) 过短,按 NSTEP={sample_step} 无法抽出任何帧")
|
||
|
||
indices = np.arange(n_frames, dtype=np.int64) * sample_step + sample_start
|
||
return indices
|
||
|
||
|
||
def run_case(config_path, runtime_base, input_dir="input", output_dir="output", no_plot=False):
|
||
"""Run one case with explicit program path, input path, and output path."""
|
||
runtime_base = Path(runtime_base).resolve()
|
||
input_dir_path = resolve_path(runtime_base, input_dir)
|
||
output_dir_path = resolve_path(runtime_base, output_dir)
|
||
config_path = resolve_path(runtime_base, config_path)
|
||
|
||
with runtime_dir_overrides(input_dir_path, output_dir_path):
|
||
# 1. 加载 YAML 配置
|
||
config = load_yaml_config(config_path)
|
||
print(f"[run] 已加载配置: {config_path}")
|
||
|
||
# ── T_total → NT 自动转换 ──────────────────────
|
||
if "T_total" in config and "NT" not in config:
|
||
dt = float(config["DT"])
|
||
config["NT"] = int(float(config["T_total"]) / dt)
|
||
print(f"[run] T_total={config['T_total']} → NT={config['NT']} (DT={dt})")
|
||
elif "T_total" in config and "NT" in config:
|
||
print(f"[run] 同时指定了 T_total 和 NT,使用 NT={config['NT']}")
|
||
|
||
# 显示步骤控制信息
|
||
steps_info = {k: config.get(k, 1) for k in ["step_simulate", "step_sample", "step_plot", "step_plot_wave", "step_animation"]}
|
||
step_flags = ", ".join(f"{k}={v}" for k, v in steps_info.items())
|
||
print(f"[run] 步骤开关: {step_flags}")
|
||
|
||
warmup = config.get("warmup_steps", 0)
|
||
ss = config.get("sample_start", "从头")
|
||
se = config.get("sample_end", "到尾")
|
||
method = config.get("method", "explicit_euler")
|
||
coord_file = config.get("coord_file", os.path.join("input", "coord.txt"))
|
||
plot_atom = config.get("plot_atom", "第一个原子")
|
||
print(f"[run] 算法: {method}")
|
||
print(f"[run] 坐标文件: {coord_file}")
|
||
print(f"[run] 绘图/动画原子序号: {plot_atom}")
|
||
print(f"[run] 步骤控制: 预热={warmup}步, 抽帧范围=[{ss}, {se})")
|
||
|
||
output_dir_abs = compute.get_output_dir(str(runtime_base))
|
||
traj_path = os.path.join(output_dir_abs, "trajectory.txt")
|
||
disp_path = os.path.join(output_dir_abs, "display.txt")
|
||
|
||
# ── 自动缓存检测 ───────────────────────────────────────
|
||
# force_calc=1: 强制重新计算,忽略缓存
|
||
# force_calc=0: 尊重 step_simulate 设置,不自动覆盖
|
||
force_calc = int(config.get("force_calc", 0))
|
||
if force_calc:
|
||
print(f"[run] force_calc=1,跳过缓存,强制重新计算")
|
||
config["step_simulate"] = 1
|
||
config["step_sample"] = 1
|
||
elif config.get("step_simulate", 1):
|
||
# step_simulate=1 且 force_calc=0 → 按用户要求执行计算
|
||
# 但检测一下参数是否已变更(NT),如果变了则自动更新 step_sample
|
||
if os.path.isdir(output_dir_abs) and os.path.exists(traj_path) and os.path.exists(disp_path):
|
||
cached_nt = None
|
||
try:
|
||
with open(traj_path, 'rb') as _f:
|
||
_f.seek(-4096, 2)
|
||
_tail = _f.read().decode('utf-8', errors='replace')
|
||
import re as _re
|
||
_m = _re.search(r'"NT":\s*(\d+)', _tail)
|
||
if _m:
|
||
cached_nt = int(_m.group(1))
|
||
except Exception:
|
||
pass
|
||
config_nt = int(config.get("NT", 0))
|
||
if cached_nt is not None and cached_nt != config_nt:
|
||
print(f"[run] 参数已变更(缓存 NT={cached_nt},配置 NT={config_nt}),"
|
||
f"将重新计算")
|
||
config["step_sample"] = 1
|
||
else:
|
||
# 参数一致,按 step_simulate=1 执行,step_sample 由用户设置决定
|
||
pass
|
||
else:
|
||
# step_simulate=0 → 检测缓存是否存在并提示
|
||
if os.path.isdir(output_dir_abs) and os.path.exists(traj_path) and os.path.exists(disp_path):
|
||
print(f"[run] 已有输出,步骤已被跳过")
|
||
else:
|
||
print(f"[run] 没有可用的缓存输出,但 step_simulate=0,将跳过模拟")
|
||
|
||
# 2. 运行物理模拟 → output/trajectory.txt
|
||
if config.get("step_simulate", 1):
|
||
engine = config.get("engine", "python")
|
||
total_steps = config["NT"]
|
||
record_steps = total_steps - (config.get("warmup_steps") or 0)
|
||
print(f"[run] 开始计算 总步数={total_steps} 记录步数={record_steps} DT={config['DT']}")
|
||
|
||
import time as _time
|
||
_t0 = _time.time()
|
||
|
||
if engine == "python":
|
||
compute.run_from_config(config, str(runtime_base))
|
||
else:
|
||
# 外部引擎:先加载配置到全局变量,再运行引擎
|
||
config["_skip_run"] = True
|
||
compute.run_from_config(config, str(runtime_base))
|
||
config.pop("_skip_run", None)
|
||
input_dir_abs = str(input_dir_path.resolve())
|
||
output_dir_abs = str(output_dir_path.resolve())
|
||
# 外部引擎写完整 trajectory.txt,后续抽帧
|
||
traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz = compute.run_engine(
|
||
engine, input_dir_abs, output_dir_abs, config)
|
||
compute.save_trajectory_txt(traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz, str(runtime_base))
|
||
|
||
_elapsed = _time.time() - _t0
|
||
print(f"[run] 引擎: {engine} 计算完成: {record_steps} 步 {_elapsed:.3f} s")
|
||
else:
|
||
print("[run] 步骤 [模拟] 已跳过")
|
||
|
||
# 3. 检查 display.txt(新格式),不存在则从 trajectory.txt 抽帧生成
|
||
disp_path_new = os.path.join(output_dir_abs, "display.txt")
|
||
if os.path.exists(disp_path_new):
|
||
print(f"[run] 发现已有 display.txt(引擎直接抽帧)")
|
||
else:
|
||
# 从 trajectory.txt 抽帧(外部引擎)
|
||
traj_path = os.path.join(output_dir_abs, "trajectory.txt")
|
||
if not os.path.exists(traj_path):
|
||
print(f"[run] 错误: 找不到 trajectory.txt 或 display.txt")
|
||
sys.exit(1)
|
||
data = compute.load_text_data(traj_path)
|
||
NT = int(data["NT"]); DT = float(data["DT"]); NSTEP = int(data.get("NSTEP", 1))
|
||
record_steps = NT - int(data.get("warmup_steps", 0))
|
||
n_atoms = len(data["atom_ids"])
|
||
sample_start = 0
|
||
sample_end = NT
|
||
indices = np.arange(0, record_steps, NSTEP, dtype=np.int64)
|
||
if len(indices) == 0:
|
||
indices = np.array([0])
|
||
|
||
traj_x = data["traj_x"]; traj_y = data["traj_y"]; traj_z = data["traj_z"]
|
||
traj_vx = data["traj_vx"]; traj_vy = data["traj_vy"]; traj_vz = data["traj_vz"]
|
||
|
||
# 构建 header_fields
|
||
hf = {"DT": str(DT), "NSTEP": str(NSTEP), "method": str(data.get("method", "")),
|
||
"warmup_steps": str(data.get("warmup_steps", 0)),
|
||
"dynamic_steps": str(record_steps),
|
||
"T_total": str(NT * DT),
|
||
"X_MAX": str(data.get("X_MAX", 10)), "X_MIN": str(data.get("X_MIN", -10)),
|
||
"Y_MAX": str(data.get("Y_MAX", 10)), "Y_MIN": str(data.get("Y_MIN", -10)),
|
||
"Z_MAX": str(data.get("Z_MAX", 10)), "Z_MIN": str(data.get("Z_MIN", -10)),
|
||
"ball_radius": str(data.get("ball_radius", 0.5)),
|
||
"ball_color_r": str(data.get("ball_color_r", 0.9)),
|
||
"ball_color_g": str(data.get("ball_color_g", 0.2)),
|
||
"ball_color_b": str(data.get("ball_color_b", 0.2)),
|
||
"box_color_r": str(data.get("box_color_r", 0.8)),
|
||
"box_color_g": str(data.get("box_color_g", 0.8)),
|
||
"box_color_b": str(data.get("box_color_b", 0.85)),
|
||
"gravity_field": str(data.get("gravity_field", 1)),
|
||
"gravity_interaction": str(data.get("gravity_interaction", 0)),
|
||
"elastic_force": str(data.get("elastic_force", 1)),
|
||
"damping_force": str(data.get("damping_force", 0)),
|
||
"gravity_strength": str(data.get("gravity_strength", 1.0)),
|
||
"driving_force": str(data.get("driving_force", 0)),
|
||
"use_marker": str(config.get("use_marker", 0)),
|
||
"alpha": _fmt_alpha(data.get("alpha", 0.2)),
|
||
"atom_radii": _fmt_alpha(data.get("atom_radii", [])),
|
||
"camera_distance": str(config.get("camera_distance", 40.0)),
|
||
"camera_elevation": str(config.get("camera_elevation", 0)),
|
||
"camera_azimuth": str(config.get("camera_azimuth", 0)),
|
||
"camera_keyframes": _load_camera_kf(config, str(runtime_base))}
|
||
|
||
n_frames = len(indices)
|
||
compute.save_display_txt(
|
||
disp_path_new,
|
||
traj_x[indices], traj_y[indices], traj_z[indices],
|
||
traj_vx[indices], traj_vy[indices], traj_vz[indices],
|
||
np.array(data["atom_ids"]), n_frames, n_atoms,
|
||
header_fields=hf)
|
||
print(f"[run] 从 trajectory.txt 抽帧生成 display.txt ({n_frames} 帧)")
|
||
|
||
# 4. 绘图(可选)
|
||
if not no_plot and config.get("step_plot", 1):
|
||
try:
|
||
import matplotlib.pyplot as plt
|
||
|
||
# 配置中文字体支持
|
||
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'WenQuanYi Micro Hei', 'DejaVu Sans']
|
||
plt.rcParams['axes.unicode_minus'] = False
|
||
|
||
time_arr = np.arange(NT) * DT
|
||
n_atoms = all_x.shape[1]
|
||
atom_ids_list = data.get("atom_ids", np.arange(n_atoms) + 1)
|
||
|
||
fig, axes = plt.subplots(3, 3, figsize=(15, 13))
|
||
fig.suptitle("轨迹与能量分析", fontsize=16)
|
||
|
||
# ── 位置 / 速度 6 子图(前 2 行,每行 3 列) ──
|
||
plot_configs = [
|
||
(axes[0, 0], all_x, "x - 时间"),
|
||
(axes[0, 1], all_y, "y - 时间"),
|
||
(axes[0, 2], all_z, "z - 时间"),
|
||
(axes[1, 0], all_vx, "vx - 时间"),
|
||
(axes[1, 1], all_vy, "vy - 时间"),
|
||
(axes[1, 2], all_vz, "vz - 时间"),
|
||
]
|
||
|
||
colors = plt.cm.tab10(np.linspace(0, 1, n_atoms))
|
||
|
||
for ax, data_arr, title in plot_configs:
|
||
for i in range(n_atoms):
|
||
atom_id = int(atom_ids_list[i])
|
||
ax.plot(time_arr, data_arr[:, i], color=colors[i], linewidth=1.5, label=f"原子 {atom_id}")
|
||
ax.set_title(title)
|
||
ax.set_xlabel("时间 (s)")
|
||
ax.grid(True, alpha=0.3)
|
||
ax.legend()
|
||
|
||
# ── 能量计算 ─────────────────────────────────────
|
||
masses = np.array(data["atom_masses"]) # (n_atoms,)
|
||
G_vec = np.array(data.get("G", [0.0, 0.0, -9.8])) # [gx, gy, gz]
|
||
gravity_field_enabled = int(data.get("gravity_field", 1))
|
||
gravity_interaction_enabled = int(data.get("gravity_interaction", 0))
|
||
gravity_strength = float(data.get("gravity_strength", 1.0))
|
||
elastic_force_enabled = int(data.get("elastic_force", 1))
|
||
damping_force_enabled = int(data.get("damping_force", 0))
|
||
|
||
# 动能 Ek = ½ m v²
|
||
ek = 0.5 * masses[np.newaxis, :] * (all_vx**2 + all_vy**2 + all_vz**2)
|
||
|
||
# 均匀重力场势能 Ug = -m G·r
|
||
ug = np.zeros_like(ek)
|
||
if gravity_field_enabled:
|
||
ug = -masses[np.newaxis, :] * (
|
||
G_vec[0] * all_x + G_vec[1] * all_y + G_vec[2] * all_z
|
||
)
|
||
|
||
# 弹性势能 Us = ½ k (d - d₀)²
|
||
us = np.zeros_like(ek)
|
||
bond_pairs = data.get("bond_pairs")
|
||
bond_stiffness = data.get("bond_stiffness")
|
||
bond_rest_lengths = data.get("bond_rest_lengths")
|
||
if elastic_force_enabled and bond_pairs is not None and len(bond_pairs) > 0:
|
||
for b_idx in range(len(bond_pairs)):
|
||
i, j = bond_pairs[b_idx]
|
||
dx = all_x[:, j] - all_x[:, i]
|
||
dy = all_y[:, j] - all_y[:, i]
|
||
dz = all_z[:, j] - all_z[:, i]
|
||
dist = np.sqrt(dx**2 + dy**2 + dz**2)
|
||
stretch = dist - bond_rest_lengths[b_idx]
|
||
us_each = 0.5 * bond_stiffness[b_idx] * stretch**2
|
||
us[:, i] += us_each # 将整根键的势能记给 i
|
||
|
||
# 万有引力势能 Ug_grav = -G_grav * m_i * m_j / r
|
||
ug_grav = np.zeros_like(ek)
|
||
if gravity_interaction_enabled:
|
||
n_atoms_en = len(masses)
|
||
for i in range(n_atoms_en):
|
||
for j in range(i + 1, n_atoms_en):
|
||
dx = all_x[:, j] - all_x[:, i]
|
||
dy = all_y[:, j] - all_y[:, i]
|
||
dz = all_z[:, j] - all_z[:, i]
|
||
dist = np.sqrt(dx**2 + dy**2 + dz**2)
|
||
dist = np.maximum(dist, 1e-12)
|
||
pair_pe = -gravity_strength * masses[i] * masses[j] / dist
|
||
ug_grav[:, i] += 0.5 * pair_pe
|
||
ug_grav[:, j] += 0.5 * pair_pe
|
||
|
||
# 各原子总能量
|
||
e_total = ek + ug + us + ug_grav # (NT, n_atoms)
|
||
|
||
# 系统能量分量
|
||
ek_sys = np.sum(ek, axis=1)
|
||
ug_sys = np.sum(ug, axis=1)
|
||
us_sys = np.sum(us, axis=1)
|
||
ug_grav_sys = np.sum(ug_grav, axis=1)
|
||
e_sys = ek_sys + ug_sys + us_sys + ug_grav_sys
|
||
|
||
# ── 第 3 行左:各原子总能量 ──
|
||
ax_e = axes[2, 0]
|
||
for i in range(n_atoms):
|
||
aid = int(atom_ids_list[i])
|
||
ax_e.plot(time_arr, e_total[:, i], color=colors[i], linewidth=1.5, label=f"原子 {aid}")
|
||
ax_e.set_title("各原子总能量")
|
||
ax_e.set_xlabel("时间 (s)")
|
||
ax_e.set_ylabel("能量")
|
||
ax_e.grid(True, alpha=0.3)
|
||
ax_e.legend(loc="upper right")
|
||
|
||
# ── 第 3 行右:系统总能量 ──
|
||
ax_sys = axes[2, 1]
|
||
ax_sys.plot(time_arr, ek_sys, 'b-', linewidth=1.5, label="系统动能")
|
||
ax_sys.plot(time_arr, ug_sys, 'g-', linewidth=1.5, label="均匀重力势能")
|
||
if elastic_force_enabled and bond_pairs is not None and len(bond_pairs) > 0:
|
||
ax_sys.plot(time_arr, us_sys, color='orange', linewidth=1.5, label="系统弹性势能")
|
||
if gravity_interaction_enabled:
|
||
ax_sys.plot(time_arr, ug_grav_sys, color='purple', linewidth=1.5, label="万有引力势能")
|
||
ax_sys.plot(time_arr, e_sys, 'r--', linewidth=1.5, label="系统总能量")
|
||
ax_sys.set_title("系统总能量")
|
||
ax_sys.set_xlabel("时间 (s)")
|
||
ax_sys.set_ylabel("能量")
|
||
ax_sys.grid(True, alpha=0.3)
|
||
ax_sys.legend(loc="upper right")
|
||
|
||
# 隐藏第 3 行第 3 列空子图
|
||
axes[2, 2].set_visible(False)
|
||
|
||
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
|
||
plot_path = os.path.join(output_dir_abs, "trajectory_plots.png")
|
||
plt.savefig(plot_path, dpi=300, bbox_inches="tight")
|
||
print(f"[run] 轨迹图表已保存至: {plot_path}")
|
||
plt.show()
|
||
except ImportError:
|
||
print("[run] 警告: 未安装 matplotlib,跳过绘图")
|
||
|
||
print(f"[run] 完成!输出目录: {output_dir_abs}")
|
||
|
||
# 5. 自动播放动画(可选)
|
||
if config.get("step_animation", 0):
|
||
draw_script = os.path.join(os.path.dirname(os.path.abspath(__file__)), "draw.py")
|
||
if not os.path.exists(draw_script):
|
||
print(f"[run] 未找到动画脚本: {draw_script}")
|
||
else:
|
||
# 检查 display.txt 是否存在(step_sample=0 时可能没有)
|
||
disp_path = os.path.join(output_dir_abs, "display.txt")
|
||
if not os.path.exists(disp_path):
|
||
print(f"[run] 错误: 找不到 {disp_path}")
|
||
print(f"[run] 启动动画需要先运行抽帧(step_sample: 1),或手动保留 output/display.txt")
|
||
else:
|
||
try:
|
||
print("[run] 正在启动 VisPy 3D 动画窗口…")
|
||
ansi_log = os.path.join(output_dir_abs, "animation.log")
|
||
if sys.platform == "win32":
|
||
# Windows 上给子进程独立控制台,避免父进程退出时连带关闭
|
||
creation_flags = subprocess.CREATE_NEW_CONSOLE
|
||
else:
|
||
creation_flags = 0
|
||
proc = subprocess.Popen(
|
||
[sys.executable, draw_script, output_dir_abs],
|
||
cwd=runtime_base,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=open(ansi_log, "w", encoding="utf-8"),
|
||
creationflags=creation_flags,
|
||
)
|
||
# 等待半秒检查子进程是否成功启动(未立即崩溃)
|
||
time.sleep(0.5)
|
||
if proc.poll() is not None:
|
||
print(f"[run] ⚠ 动画进程已退出,返回码={proc.returncode}")
|
||
print(f"[run] 请查看错误日志: {ansi_log}")
|
||
else:
|
||
print(f"[run] VisPy 动画窗口已启动(PID={proc.pid})")
|
||
except Exception as e:
|
||
print(f"[run] 启动动画失败: {e}")
|
||
else:
|
||
print("[run] 运行 python draw.py 查看动画。")
|
||
|
||
# 6. 波形能量动画(可选)
|
||
if config.get("step_plot_wave", 0):
|
||
try:
|
||
import plot_wave as pw
|
||
print("[run] 正在绘制波形与能量图…")
|
||
pw.plot_wave(
|
||
str(output_dir_abs),
|
||
save_gif=int(config.get("plot_wave_save_gif", 0)),
|
||
save_mp4=int(config.get("plot_wave_save_mp4", 0)),
|
||
)
|
||
except Exception as e:
|
||
print(f"[run] 绘制波形图失败: {e}")
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="物理模拟统一入口")
|
||
parser.add_argument("config_file", nargs="?", default=os.path.join("input", "input.txt"),
|
||
help="YAML 配置文件路径(默认: input/input.txt,虽然是 .txt 后缀但使用 YAML 格式)")
|
||
parser.add_argument("--config", dest="config_override",
|
||
help="YAML 配置文件路径(可选,优先于位置参数)")
|
||
parser.add_argument("--input-dir", default="input",
|
||
help="输入目录路径(默认: input)")
|
||
parser.add_argument("--output-dir", default="output",
|
||
help="输出目录路径(默认: output)")
|
||
parser.add_argument("--runtime-base", default=".",
|
||
help="案例运行根目录(默认: 当前目录)")
|
||
parser.add_argument("--no-plot", action="store_true",
|
||
help="跳过 matplotlib 绘图")
|
||
args = parser.parse_args()
|
||
|
||
config_path = args.config_override or args.config_file
|
||
runtime_base = resolve_path(os.getcwd(), args.runtime_base)
|
||
config_path_abs = resolve_path(runtime_base, config_path)
|
||
if not os.path.exists(config_path_abs):
|
||
print(f"错误: 配置文件不存在: {config_path_abs}")
|
||
sys.exit(1)
|
||
run_case(
|
||
config_path=config_path_abs,
|
||
runtime_base=runtime_base,
|
||
input_dir=args.input_dir,
|
||
output_dir=args.output_dir,
|
||
no_plot=args.no_plot,
|
||
)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|