"""Parallel sweep over ``delta_theta`` that checks one pitch for collisions.

The range ``[kDeltaThetaBeg, kDeltaThetaEnd)`` is sampled with a step of
``kStepDeltaTheta`` and the samples are dealt out round-robin to
``kParallelNum`` worker processes.  Each worker builds its own ``Dragon``
and reports whether any of its samples produced a collision status ``>= 0``.
The script prints ``OK`` when no worker found one and ``Error`` otherwise.
"""
import multiprocessing
import sys

# The wildcard import supplies Dragon, CheckCollision and status2blocks;
# the original script also relied on it for ``np`` and ``mp``
# (presumably numpy and mpmath -- confirm in dragon.py).
from dragon import *  # noqa: F401,F403

kPitchToTest = 0.450338
kDeltaThetaBeg = 0
kDeltaThetaEnd = 2 * 2 * 3.1415926  # roughly 4*pi, using a truncated pi literal
kTotalSteps = 100000
kStepDeltaTheta = (kDeltaThetaEnd - kDeltaThetaBeg) / kTotalSteps
kParallelNum = 24

# NOTE(review): np.arange with a floating-point step may yield one sample more
# or fewer than kTotalSteps because of rounding; kept as-is so the sampled
# values do not change.
tasks_list = list(np.arange(kDeltaThetaBeg, kDeltaThetaEnd, kStepDeltaTheta))

# Strided split: worker i receives samples i, i + kParallelNum, ...
task_list_per_process = [
    tasks_list[offset::kParallelNum] for offset in range(kParallelNum)
]
print(f"len(task_list_per_thread)={len(task_list_per_process)}", file=sys.stderr)


def ProcessEntryPoint(arg):
    """Check every ``delta_theta`` assigned to one worker process.

    Args:
        arg: A ``(delta_theta_list, process_id)`` tuple.  ``process_id`` is
            only used to name the per-worker log file.

    Returns:
        ``False`` as soon as ``CheckCollision`` returns a status ``>= 0`` for
        one of the samples, ``True`` if the whole list passes.
    """
    dragen = Dragon(mp.mpf(kPitchToTest) / (2 * mp.pi))
    delta_theta_list, process_id = arg

    # Write the work assignment and close the file right away so the handle
    # is not leaked for the lifetime of the (long-running) loop below.
    with open(f"sufficiency_test_{process_id}.log", "w", encoding="utf-8") as logf:
        print(
            f"calculating delta_theta_list={delta_theta_list} with process_id={process_id}",
            file=logf,
        )

    for delta_theta in delta_theta_list:
        blocks = status2blocks(dragen.CalcMoveList(delta_theta))
        cur_status = CheckCollision(blocks)
        if cur_status >= 0:
            return False  # Error
    return True  # OK


def main():
    """Run all worker chunks in a process pool and print the overall verdict."""
    task_args_list = [
        (chunk, process_id) for process_id, chunk in enumerate(task_list_per_process)
    ]
    with multiprocessing.Pool(processes=kParallelNum) as pool:
        results = pool.map(ProcessEntryPoint, task_args_list)

    print("OK" if all(results) else "Error")


if __name__ == "__main__":
    main()