Files
cumcm2024-code/A/3/sufficiency_test.py
2024-09-08 18:28:14 +08:00

50 lines
1.8 KiB
Python

from dragon import *
# --- Pitch sweep: kTotalSteps samples starting at kBegPitch, spaced kStepPitch apart ---
kBegPitch = 0.45033
kEndPitch = 0.45034
kTotalSteps = 10
kStepPitch = (kEndPitch - kBegPitch) / kTotalSteps

# Number of worker slots the pitch samples are dealt out to.
kParallelNum = 24

# Pitch samples in ascending order; kEndPitch itself is not one of them.
tasks_list = [kBegPitch + kStepPitch * step for step in range(kTotalSteps)]

# Round-robin split: slot k gets samples k, k + kParallelNum, k + 2 * kParallelNum, ...
# Slots with an index >= len(tasks_list) receive an empty list.
task_list_per_process = [
    tasks_list[slot::kParallelNum] for slot in range(kParallelNum)
]

# --- Delta-theta sweep: 10000 equal steps from kDeltaThetaBeg up to kDeltaThetaEnd ---
# kDeltaThetaEnd is 5 * 2 * 3.1415926 (pi written as a literal approximation).
kDeltaThetaBeg = 0
kDeltaThetaEnd = 5 * 2 * 3.1415926
kStepDeltaTheta = (kDeltaThetaEnd - kDeltaThetaBeg) / 10000

print(f"len(task_list_per_thread)={len(task_list_per_process)}", file=sys.stderr)
def ProcessEntryPoint(arg):
    """Check a batch of pitch values for collisions and publish the results.

    Args:
        arg: A 4-tuple ``(pitch_list, process_id, shared_dict, lock)``.
            ``pitch_list`` holds the pitch values to test, in order.
            ``process_id`` is only used to name this worker's log file.
            ``shared_dict`` receives the ``{pitch: status}`` entries.
            ``lock`` is a context manager held while ``shared_dict`` is
            updated.

    Each tested pitch gets status ``1`` when no sampled delta-theta reported
    a collision and ``-1`` otherwise. Testing stops after the first pitch
    whose status is ``1``; later pitches in ``pitch_list`` are not evaluated
    and get no entry. Progress is written to
    ``sufficiency_test_{process_id}.log`` in the current directory.
    """
    pitch_list, process_id, shared_dict, lock = arg
    tmp_res = {}
    # Open the log through a context manager so it is flushed and closed even
    # when one of the collision checks raises.
    with open(f"sufficiency_test_{process_id}.log", "w") as logf:
        print(f"calculating pitch_list={pitch_list} with process_id={process_id}", file=logf)
        for pitch in pitch_list:
            dragen = Dragon(mp.mpf(pitch) / (2 * mp.pi))
            status = 1  # OK until a collision is seen
            for delta_theta in np.arange(kDeltaThetaBeg, kDeltaThetaEnd, kStepDeltaTheta):
                cur_status = CheckCollision(status2blocks(dragen.CalcMoveList(delta_theta)))
                # A non-negative result is treated as "collision found".
                # NOTE(review): presumably an index of the colliding block --
                # confirm against CheckCollision in dragon.
                if cur_status >= 0:
                    status = -1
                    break
            tmp_res[pitch] = status
            print(f"res={status}", file=logf)
            if status == 1:
                # First collision-free pitch found; skip the remaining ones.
                break
    # Take the lock while merging this worker's results into the shared dictionary.
    with lock:
        shared_dict.update(tmp_res)
if __name__ == "__main__":
    # Manager-backed objects that are handed to every worker.
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()  # shared dictionary that collects the results
    lock = manager.Lock()  # shared lock guarding updates to shared_dict

    # One argument tuple per worker slot: (pitch batch, slot id, dict, lock).
    worker_args = [
        (task_list_per_process[worker_id], worker_id, shared_dict, lock)
        for worker_id in range(kParallelNum)
    ]
    with multiprocessing.Pool(processes=kParallelNum) as pool:
        pool.map(ProcessEntryPoint, worker_args)

    print(f"\nFinal Results: \n")
    # Report in ascending pitch order; a status of 1 is printed as OK,
    # anything else as Error.
    for pitch, status in sorted(shared_dict.items()):
        verdict = "OK" if status == 1 else "Error"
        print(f"Pitch={pitch}", end=" ")
        print(verdict)