Files
cumcm2024-code/A/3/full_valid_test.py
2024-09-07 16:47:14 +08:00

41 lines
1.7 KiB
Python

from dragon import *
# Pitch value under test, stored as an mpmath float.
kPitchToTest = mp.mpf("0.45033740")
# Sweep range for delta_theta; the end is 4*pi written with a truncated pi literal.
kDeltaThetaBeg = 0
kDeltaThetaEnd = 2 * 2 * 3.1415926
kTotalSteps = 10000
kStepDeltaTheta = (kDeltaThetaEnd - kDeltaThetaBeg) / kTotalSteps
# Number of worker processes the sweep is split across.
kParallelNum = 24
# linspace with endpoint=False yields exactly kTotalSteps samples spaced
# kStepDeltaTheta apart. np.arange with a float step may produce one extra
# sample because its length is derived from a rounded division.
tasks_list = list(
    np.linspace(kDeltaThetaBeg, kDeltaThetaEnd, kTotalSteps, endpoint=False)
)
# Strided split: worker i receives samples i, i + kParallelNum, ...
task_list_per_process = [tasks_list[i::kParallelNum] for i in range(kParallelNum)]
print(f"len(task_list_per_thread)={len(task_list_per_process)}", file=sys.stderr)
def ProcessEntryPoint(arg):
    """Check every delta_theta in one worker's share of the sweep.

    Args:
        arg: A ``(delta_theta_list, process_id)`` tuple. ``process_id`` is
            only used to name this worker's log file.

    Returns:
        False as soon as ``CheckCollision`` reports a status >= 0 for any
        delta_theta (treated as a failure), True if none does.
    """
    dragen = Dragon(kPitchToTest / (2 * mp.pi))
    delta_theta_list, process_id = arg
    # Close the log right after the only write so the line is flushed to disk
    # even if the worker is later terminated by the pool.
    with open(f"sufficiency_test_{process_id}.log", "w") as logf:
        print(
            f"calculating delta_theta_list={delta_theta_list} with process_id={process_id}",
            file=logf,
        )
    for delta_theta in delta_theta_list:
        cur_status = CheckCollision(status2blocks(dragen.CalcMoveList(delta_theta)))
        if cur_status >= 0:
            return False  # Error
    return True  # OK
if __name__ == "__main__":
    # Pair each worker's slice of the sweep with its process id.
    # (The unused multiprocessing.Manager() was dropped: it only started an
    # extra server process.)
    task_args_list = [
        (task_list, process_id)
        for process_id, task_list in enumerate(task_list_per_process)
    ]
    with multiprocessing.Pool(processes=kParallelNum) as pool:
        results = pool.map(ProcessEntryPoint, task_args_list)
    # Every worker must report success for the overall check to pass.
    if all(results):
        print("OK")
    else:
        print("Error")

    # Now generate a GIF for a human to check.
    dragen = Dragon(kPitchToTest / (2 * mp.pi))
    kTotalFrames = 100
    # Coarser step than the collision sweep: one frame per step.
    kStepDeltaTheta = (kDeltaThetaEnd - kDeltaThetaBeg) / kTotalFrames
    frame_list = []
    for i in range(kTotalFrames):
        delta_theta = kDeltaThetaBeg + kStepDeltaTheta * i
        cur_frame = dragen.GenerateImg(mp2float(dragen.CalcMoveList(delta_theta)))
        frame_list.append(cur_frame)
    # Frames are written in reverse order, i.e. from the largest delta_theta
    # down to the smallest.
    frame_list.reverse()
    frame_list[0].save(
        'dragon.gif',
        save_all=True,
        append_images=frame_list[1:],
        optimize=False,
        duration=1000,
        loop=0,
    )