from dragon import * kPitchToTest=0.450338 kDeltaThetaBeg=0 kDeltaThetaEnd=2*2*3.1415926 kTotalSteps=100000 kStepDeltaTheta=(kDeltaThetaEnd-kDeltaThetaBeg)/kTotalSteps kParallelNum=24 tasks_list=[i for i in np.arange(kDeltaThetaBeg, kDeltaThetaEnd, kStepDeltaTheta)] task_list_per_process=[tasks_list[i::kParallelNum] for i in range(kParallelNum)] print(f"len(task_list_per_thread)={len(task_list_per_process)}",file=sys.stderr) def ProcessEntryPoint(arg): dragen = Dragon(mp.mpf(kPitchToTest)/(2*mp.pi)) delta_theta_list, process_id = arg logf=open(f"sufficiency_test_{process_id}.log","w") print(f"calculating delta_theta_list={delta_theta_list} with process_id={process_id}",file=logf) for delta_theta in delta_theta_list: cur_status=CheckCollision(status2blocks(dragen.CalcMoveList(delta_theta))) if cur_status>=0: return False # Error return True # OK if __name__ == "__main__": manager = multiprocessing.Manager() task_args_list = [(task_list_per_process[i], i) for i in range(kParallelNum)] with multiprocessing.Pool(processes=kParallelNum) as pool: results=pool.map(ProcessEntryPoint, task_args_list) if False in results: print("Error") else: print("OK") # Now generate an gif for human to check dragen = Dragon(mp.mpf(kPitchToTest)/(2*mp.pi)) kTotalFrames=100 kStepDeltaTheta=(kDeltaThetaEnd-kDeltaThetaBeg)/kTotalFrames frame_list=[] for i in range(kTotalFrames): delta_theta=kDeltaThetaBeg+kStepDeltaTheta*i cur_frame=dragen.GenerateImg(mp2float(dragen.CalcMoveList(delta_theta))) frame_list.append(cur_frame) frame_list.reverse() frame_list[0].save('dragon.gif', save_all=True, append_images=frame_list[1:], optimize=False, duration=1000, loop=0)