41 lines
1.7 KiB
Python
41 lines
1.7 KiB
Python
from dragon import *
|
|
kPitchToTest=0.450338
|
|
kDeltaThetaBeg=0
|
|
kDeltaThetaEnd=2*2*3.1415926
|
|
kTotalSteps=100000
|
|
kStepDeltaTheta=(kDeltaThetaEnd-kDeltaThetaBeg)/kTotalSteps
|
|
kParallelNum=24
|
|
tasks_list=[i for i in np.arange(kDeltaThetaBeg, kDeltaThetaEnd, kStepDeltaTheta)]
|
|
task_list_per_process=[tasks_list[i::kParallelNum] for i in range(kParallelNum)]
|
|
print(f"len(task_list_per_thread)={len(task_list_per_process)}",file=sys.stderr)
|
|
def ProcessEntryPoint(arg):
|
|
dragen = Dragon(mp.mpf(kPitchToTest)/(2*mp.pi))
|
|
delta_theta_list, process_id = arg
|
|
logf=open(f"sufficiency_test_{process_id}.log","w")
|
|
print(f"calculating delta_theta_list={delta_theta_list} with process_id={process_id}",file=logf)
|
|
for delta_theta in delta_theta_list:
|
|
cur_status=CheckCollision(status2blocks(dragen.CalcMoveList(delta_theta)))
|
|
if cur_status>=0:
|
|
return False # Error
|
|
return True # OK
|
|
|
|
if __name__ == "__main__":
|
|
manager = multiprocessing.Manager()
|
|
task_args_list = [(task_list_per_process[i], i) for i in range(kParallelNum)]
|
|
with multiprocessing.Pool(processes=kParallelNum) as pool:
|
|
results=pool.map(ProcessEntryPoint, task_args_list)
|
|
if False in results:
|
|
print("Error")
|
|
else:
|
|
print("OK")
|
|
# Now generate an gif for human to check
|
|
dragen = Dragon(mp.mpf(kPitchToTest)/(2*mp.pi))
|
|
kTotalFrames=100
|
|
kStepDeltaTheta=(kDeltaThetaEnd-kDeltaThetaBeg)/kTotalFrames
|
|
frame_list=[]
|
|
for i in range(kTotalFrames):
|
|
delta_theta=kDeltaThetaBeg+kStepDeltaTheta*i
|
|
cur_frame=dragen.GenerateImg(mp2float(dragen.CalcMoveList(delta_theta)))
|
|
frame_list.append(cur_frame)
|
|
frame_list.reverse()
|
|
frame_list[0].save('dragon.gif', save_all=True, append_images=frame_list[1:], optimize=False, duration=1000, loop=0) |