refactor: 引擎直接抽帧 + 新 display.txt 纯文本格式 + save_trajectory 开关

核心变更:
1. compute.py: run_simulation 直接按 NSTEP 抽帧写 display.txt(新格式)
   - 新格式:纯文本,帧 1→n 分块,每行: n x y z vx vy vz
   - 新函数 save_display_txt() / load_display_txt()
   - save_trajectory 参数(默认0=不保留 trajectory.txt)
2. dynamics.py: 移除旧 JSON 采样逻辑,自动检测 display.txt
   - Python 引擎直接读取引擎输出的 display.txt
   - 外部引擎仍写 trajectory.txt,自动抽帧转 display.txt
3. draw.py: 适配 load_display_txt() 新格式
4. case06/input.txt: 添加 save_trajectory: 0, step_sample: 0

TODO: 外部引擎(C/C++/Fortran)内部抽帧写 display.txt
TODO: plot_wave.py 适配新格式
TODO: 其他案例 input.txt 更新默认值
This commit is contained in:
2026-06-12 06:36:50 +08:00
parent c158c74609
commit 9d1f84d2bf
4 changed files with 334 additions and 204 deletions
+228 -26
View File
@@ -5,8 +5,8 @@ compute.py
功能:
1. 运行 NT 步物理模拟( kinematics / dynamics 等运动模式)
2. 将每一步的 (x, y, z, vx, vy, vz) 保存到 output/trajectory.txt
3. 同时保存所有模拟参数元数据
2. 按 NSTEP 抽帧,输出 output/display.txt(新文本格式)
3. 可选(save_trajectory=1)保留完整轨迹 output/trajectory.txtJSON
"""
import json
@@ -118,7 +118,7 @@ def _from_text_value(value):
def save_text_data(path, data):
"""Save structured simulation data as formatted JSON text."""
"""Save structured simulation data as formatted JSON text (旧格式,保留兼容)."""
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(_to_text_value(data), f, ensure_ascii=False, indent=2)
@@ -126,12 +126,150 @@ def save_text_data(path, data):
def load_text_data(path):
"""Load structured simulation data from JSON text."""
"""Load structured simulation data from JSON text (旧格式)."""
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
return _from_text_value(data)
# ========================================================================
# 新 display.txt 格式:纯文本,按帧分块
# 第1行: number of frames: N
# 第2行: number of particles: M
# 第3行: frame: 1
# 第4行: n x y z vx vy vz
# 第5+行: 数据行(每个原子一行)
# 重复第3-5行直到所有帧
# ========================================================================
def save_display_txt(path, frames_x, frames_y, frames_z,
frames_vx, frames_vy, frames_vz,
atom_ids, n_total_frames, n_total_particles,
header_fields=None):
"""Write display.txt in new text format.
Args:
path: 输出文件路径
frames_x/y/z/vx/vy/vz: (n_frames, n_atoms) 数组
atom_ids: (n_atoms,) 原子编号数组
n_total_frames: 总帧数(含未采样)
n_total_particles: 总粒子数
header_fields: 可选的额外元数据字典(写入文件头之后)
"""
os.makedirs(os.path.dirname(path), exist_ok=True)
n_frames = frames_x.shape[0]
n_atoms = frames_x.shape[1]
# 格式化辅助:固定宽度,6位小数
def fmt(v): return f"{v:13.6f}"
with open(path, "w", encoding="utf-8") as f:
f.write(f"number of frames: {n_total_frames}\n")
f.write(f"number of particles: {n_total_particles}\n")
# 写入额外元数据
if header_fields:
for k, v in header_fields.items():
f.write(f"{k}: {v}\n")
for fr in range(n_frames):
f.write(f"\nframe: {fr + 1}\n")
f.write(f"n x y z vx vy vz\n")
for a in range(n_atoms):
f.write(f"{atom_ids[a]:d}"
f"{fmt(frames_x[fr, a])}"
f"{fmt(frames_y[fr, a])}"
f"{fmt(frames_z[fr, a])}"
f"{fmt(frames_vx[fr, a])}"
f"{fmt(frames_vy[fr, a])}"
f"{fmt(frames_vz[fr, a])}\n")
return path
def load_display_txt(path):
"""Read display.txt new text format into numpy arrays.
Returns dict with keys: frames_x/y/z/vx/vy/vz, atom_ids,
n_total_frames, n_total_particles, header_fields
"""
header_fields = {}
frames_x, frames_y, frames_z = [], [], []
frames_vx, frames_vy, frames_vz = [], [], []
atom_ids = None
n_total_frames = 0
n_total_particles = 0
with open(path, "r", encoding="utf-8") as f:
lines = f.readlines()
# Parse header
i = 0
while i < len(lines):
line = lines[i].strip()
if line.startswith("number of frames:"):
n_total_frames = int(line.split(":")[1].strip())
i += 1
elif line.startswith("number of particles:"):
n_total_particles = int(line.split(":")[1].strip())
i += 1
elif line.startswith("frame:"):
break
else:
# Extra header field
if ":" in line:
k, v = line.split(":", 1)
header_fields[k.strip()] = v.strip()
i += 1
# Parse frames
while i < len(lines):
line = lines[i].strip()
if line.startswith("frame:"):
i += 1 # skip column header line
if i < len(lines):
i += 1 # skip "n x y..."
frame_x, frame_y, frame_z = [], [], []
frame_vx, frame_vy, frame_vz = [], [], []
cur_ids = []
while i < len(lines) and not lines[i].strip().startswith("frame:") and lines[i].strip():
parts = lines[i].strip().split()
if len(parts) >= 7:
cur_ids.append(int(parts[0]))
frame_x.append(float(parts[1]))
frame_y.append(float(parts[2]))
frame_z.append(float(parts[3]))
frame_vx.append(float(parts[4]))
frame_vy.append(float(parts[5]))
frame_vz.append(float(parts[6]))
i += 1
if frame_x:
frames_x.append(frame_x)
frames_y.append(frame_y)
frames_z.append(frame_z)
frames_vx.append(frame_vx)
frames_vy.append(frame_vy)
frames_vz.append(frame_vz)
if atom_ids is None:
atom_ids = np.array(cur_ids)
else:
i += 1
if not frames_x:
raise ValueError(f"{path} 中没有有效帧数据")
return {
"frames_x": np.array(frames_x),
"frames_y": np.array(frames_y),
"frames_z": np.array(frames_z),
"frames_vx": np.array(frames_vx),
"frames_vy": np.array(frames_vy),
"frames_vz": np.array(frames_vz),
"atom_ids": atom_ids,
"n_total_frames": n_total_frames,
"n_total_particles": n_total_particles,
"header_fields": header_fields,
}
def get_output_dir(base_dir=None):
"""Return the output directory used for generated artifacts."""
override = os.environ.get("DYNAMICS_OUTPUT_DIR")
@@ -597,7 +735,8 @@ def run_from_config(config, out_dir=None):
return None, None, None, None, None, None
t_start = time.time()
t_start_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz = run_simulation()
traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz = run_simulation(
save_trajectory=int(config.get("save_trajectory", 0)))
t_end = time.time()
t_end_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
elapsed = t_end - t_start
@@ -1213,12 +1352,13 @@ def wrap_position(x, y, z):
# 主计算流程
# ===========================================================================
def run_simulation():
"""计算 NT 步轨迹,返回位置/速度数组
def run_simulation(save_trajectory=0):
"""计算 NT 步轨迹,直接抽帧并保存 display.txt
步骤控制:
- warmup_steps: 预热步数,跳过不记录(用于稳定初始状态)
- 实际记录步数 = NT - warmup_steps
- 按 NSTEP 抽帧保存到 display.txt(新格式)
- save_trajectory=1 时额外保存完整 trajectory.txtJSON 旧格式)
"""
# 预热阶段
x = ATOM_POSITIONS[:, 0].copy()
@@ -1228,7 +1368,6 @@ def run_simulation():
vy = ATOM_VELOCITIES[:, 1].copy()
vz = ATOM_VELOCITIES[:, 2].copy()
x, y, z, vx, vy, vz = apply_fixed_constraints(x, y, z, vx, vy, vz)
# 初始时刻驱动力(t=0 时原子 1 的位置由驱动力决定而非 coord.txt)
x, y, z, vx, vy, vz = apply_driving_force(x, y, z, vx, vy, vz, 0.0, 0, DRIVER_DATA, DT)
if warmup_steps is not None and warmup_steps > 0:
@@ -1244,33 +1383,93 @@ def run_simulation():
f"({x[PLOT_ATOM_ROW]:.4f}, {y[PLOT_ATOM_ROW]:.4f}, {z[PLOT_ATOM_ROW]:.4f})"
)
# 记录阶段
# 记录阶段 - 按 NSTEP 抽帧保存
record_steps = NT - (warmup_steps or 0)
n_atoms = len(ATOM_IDS)
traj_x = np.zeros((record_steps, n_atoms), dtype=np.float64)
traj_y = np.zeros((record_steps, n_atoms), dtype=np.float64)
traj_z = np.zeros((record_steps, n_atoms), dtype=np.float64)
traj_vx = np.zeros((record_steps, n_atoms), dtype=np.float64)
traj_vy = np.zeros((record_steps, n_atoms), dtype=np.float64)
traj_vz = np.zeros((record_steps, n_atoms), dtype=np.float64)
n_frames = (record_steps + NSTEP - 1) // NSTEP
frame_indices = []
# 按 NSTEP 抽帧的临时缓冲区(远小于全量轨迹)
sampled_x = np.zeros((n_frames, n_atoms), dtype=np.float64)
sampled_y = np.zeros((n_frames, n_atoms), dtype=np.float64)
sampled_z = np.zeros((n_frames, n_atoms), dtype=np.float64)
sampled_vx = np.zeros((n_frames, n_atoms), dtype=np.float64)
sampled_vy = np.zeros((n_frames, n_atoms), dtype=np.float64)
sampled_vz = np.zeros((n_frames, n_atoms), dtype=np.float64)
# 如果 save_trajectory,准备完整轨迹缓冲区
if save_trajectory:
traj_x = np.zeros((record_steps, n_atoms), dtype=np.float64)
traj_y = np.zeros((record_steps, n_atoms), dtype=np.float64)
traj_z = np.zeros((record_steps, n_atoms), dtype=np.float64)
traj_vx = np.zeros((record_steps, n_atoms), dtype=np.float64)
traj_vy = np.zeros((record_steps, n_atoms), dtype=np.float64)
traj_vz = np.zeros((record_steps, n_atoms), dtype=np.float64)
for step in trange(record_steps, desc="[compute] 计算中"):
t = (step + (warmup_steps or 0)) * DT
# 先施加驱动力(受驱原子的位置覆盖初始/固定约束,为弹簧力提供正确参考)
x, y, z, vx, vy, vz = apply_driving_force(x, y, z, vx, vy, vz, t, step, DRIVER_DATA, DT)
traj_x[step] = x
traj_y[step] = y
traj_z[step] = z
traj_vx[step] = vx
traj_vy[step] = vy
traj_vz[step] = vz
if save_trajectory:
traj_x[step] = x
traj_y[step] = y
traj_z[step] = z
traj_vx[step] = vx
traj_vy[step] = vy
traj_vz[step] = vz
# 抽帧:NSTEP 间隔保存
if step % NSTEP == 0:
fi = step // NSTEP
sampled_x[fi] = x
sampled_y[fi] = y
sampled_z[fi] = z
sampled_vx[fi] = vx
sampled_vy[fi] = vy
sampled_vz[fi] = vz
frame_indices.append(step)
x, y, z, vx, vy, vz = apply_motion_update(x, y, z, vx, vy, vz, DT, ATOM_MASSES, G, B)
x, y, z = wrap_position(x, y, z)
x, y, z, vx, vy, vz = apply_fixed_constraints(x, y, z, vx, vy, vz)
return traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz
# 写入 display.txt(新格式)
output_dir = get_output_dir()
disp_path = os.path.join(output_dir, "display.txt")
n_frames_actual = len(frame_indices)
save_display_txt(
disp_path,
sampled_x[:n_frames_actual], sampled_y[:n_frames_actual], sampled_z[:n_frames_actual],
sampled_vx[:n_frames_actual], sampled_vy[:n_frames_actual], sampled_vz[:n_frames_actual],
np.array(ATOM_IDS), record_steps, n_atoms,
header_fields={"DT": str(DT), "NSTEP": str(NSTEP), "method": str(METHOD),
"warmup_steps": str(warmup_steps or 0),
"X_MAX": str(X_MAX), "X_MIN": str(X_MIN),
"Y_MAX": str(Y_MAX), "Y_MIN": str(Y_MIN),
"Z_MAX": str(Z_MAX), "Z_MIN": str(Z_MIN),
"ball_radius": str(ball_radius),
"ball_color_r": str(ball_color_r),
"ball_color_g": str(ball_color_g),
"ball_color_b": str(ball_color_b),
"box_color_r": str(box_color_r),
"box_color_g": str(box_color_g),
"box_color_b": str(box_color_b),
"gravity_field": str(GRAVITY_FIELD),
"gravity_interaction": str(GRAVITY_INTERACTION),
"elastic_force": str(ELASTIC_FORCE),
"damping_force": str(DAMPING_FORCE),
"gravity_strength": str(GRAVITY_STRENGTH),
"driving_force": str(DRIVING_FORCE)}
)
print(f"[compute] display.txt 已保存至: {disp_path} ({n_frames_actual} 帧)")
# 可选:保存完整 trajectory.txt
if save_trajectory:
save_trajectory_txt(traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz)
print(f"[compute] trajectory.txt 已保存(完整轨迹)")
return sampled_x[:n_frames_actual], sampled_y[:n_frames_actual], sampled_z[:n_frames_actual], \
sampled_vx[:n_frames_actual], sampled_vy[:n_frames_actual], sampled_vz[:n_frames_actual]
def save_trajectory_table_txt(txt_path, traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz, NT, DT):
@@ -1313,10 +1512,13 @@ def main():
output_dir = get_output_dir(script_dir)
print(f"[compute] 开始计算 NT={NT} DT={DT} ")
traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz = run_simulation()
traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz = run_simulation(save_trajectory=0)
print(f"[compute] 计算完成,共 {NT}")
print(f"[compute] display.txt 已在 run_simulation 中保存")
save_trajectory_txt(traj_x, traj_y, traj_z, traj_vx, traj_vy, traj_vz, script_dir)
# 如果需要完整轨迹,以上传 save_trajectory=1 重新运行
# 以下旧函数保留兼容但不再自动调用
# save_trajectory_txt(...)
# 同时保存为逐行表格,便于直接查看
txt_path = os.path.join(output_dir, "trajectory_table.txt")