From 03af8507d89c0f3674c0b08cc1b7d9b202871cd7 Mon Sep 17 00:00:00 2001 From: ZhuangYumin Date: Fri, 6 Sep 2024 15:12:18 +0800 Subject: [PATCH] write low_bount_cal --- .gitignore | 3 +- A/3/dragon.py | 155 ++++++++++++++++++++++++++++++++++++++++ A/3/low_bound_cal.py | 42 +++++++++++ A/3/sufficiency_test.py | 1 + 4 files changed, 200 insertions(+), 1 deletion(-) create mode 100644 A/3/dragon.py create mode 100644 A/3/low_bound_cal.py create mode 100644 A/3/sufficiency_test.py diff --git a/.gitignore b/.gitignore index 7cf14b5..e1d25da 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ *.json *.bak *.dat -*.log \ No newline at end of file +*.log +__pycache__/ \ No newline at end of file diff --git a/A/3/dragon.py b/A/3/dragon.py new file mode 100644 index 0000000..57a3b35 --- /dev/null +++ b/A/3/dragon.py @@ -0,0 +1,155 @@ +import mpmath as mp +import json +import sys +import matplotlib.pyplot as plt +from matplotlib.patches import Rectangle +import numpy as np +from typing import * +import threading +import numba +import multiprocessing + +mp.dps = 15 # 设置精度为15位小数 + +kSegLength1 = mp.mpf('2.86') +kSegLength2 = mp.mpf('1.65') +kPointsConsidered=50 +kInnerCircleRadius=4.5 + +class Dragon: + def __init__(self,kAlpha): + self.kAlpha = kAlpha + def Theta2C(self, theta): + tmp = mp.sqrt(1 + theta**2) + return self.kAlpha * 0.5 * (theta * tmp - mp.log(-theta + tmp)) + + def Theta2Dot(self, theta): + return (self.kAlpha * theta * mp.cos(theta), self.kAlpha * theta * mp.sin(theta)) + + def GenerateFirstNodeTheta(self, delta_theta): + return kInnerCircleRadius/self.kAlpha + + def GenerateFollowNodeTheta(self, cur_node_theta, expected_distance): + cur_node_dot = self.Theta2Dot(cur_node_theta) + def f(theta): + test_node_dot = self.Theta2Dot(theta) + actual_distance = mp.sqrt((cur_node_dot[0]-test_node_dot[0])**2 + (cur_node_dot[1]-test_node_dot[1])**2) + return actual_distance - expected_distance + return mp.findroot(f, cur_node_theta + 0.1, solver='secant',tol=1e-20) + + + def CalcMoveList(self, delta_theta=0): + first_node_theta = self.GenerateFirstNodeTheta(delta_theta) + first_node_dot = self.Theta2Dot(first_node_theta) + first_node_C = self.Theta2C(first_node_theta) + + node_list = [{"theta": first_node_theta, "node": first_node_dot, "C": first_node_C, "v": mp.mpf('1.0')}] + + for i in range(1, kPointsConsidered): + expected_distance = kSegLength1 if i == 1 else kSegLength2 + cur_node_theta = self.GenerateFollowNodeTheta(node_list[-1]["theta"], expected_distance) + cur_node_dot = self.Theta2Dot(cur_node_theta) + cur_node_C = self.Theta2C(cur_node_theta) + node_list.append({"theta": cur_node_theta, "node": cur_node_dot, "C": cur_node_C}) + for i in range(kPointsConsidered-1): + AA = kSegLength1 if i == 0 else kSegLength2 + theta_i = node_list[i]["theta"] + theta_ip1 = node_list[i+1]["theta"] + alpha_i = mp.atan(theta_i) + alpha_ip1 = mp.atan(theta_ip1) + beta_i = mp.acos(((self.kAlpha*theta_i)**2 + AA**2 - (self.kAlpha*theta_ip1)**2) / (2*self.kAlpha*theta_i*AA)) + gama_i = mp.acos(((self.kAlpha*theta_ip1)**2 + AA**2 - (self.kAlpha*theta_i)**2) / (2*self.kAlpha*theta_ip1*AA)) + node_list[i+1]["v"] = node_list[i]["v"] * (-mp.cos(alpha_i + beta_i) / mp.cos(alpha_ip1 - gama_i)) + + return node_list + + +# 将结果转换为float并保留6位小数 +def mp2float(time_point_list): + float_res_list = [ + {k: round(float(v), 6) if isinstance(v, mp.mpf) else + [round(float(x), 6) for x in v] if isinstance(v, tuple) else v + for k, v in node.items()} + for node in time_point_list + ] + return float_res_list + +def status2blocks(node_list): + res=[] + for i in range(len(node_list) - 1): + x1, y1 = [float(coord) for coord in node_list[i]["node"]] + x2, y2 = [float(coord) for coord in node_list[i+1]["node"]] + # 计算并绘制木板(长方形) + dx = x2 - x1 + dy = y2 - y1 + length = np.sqrt(dx**2 + dy**2) + angle = np.arctan2(dy, dx) + + rect_length = length + 0.55 # 总长度加上两端各延伸的0.275m + rect_width = 0.3 + + # 计算长方形的中心点 + center_x = (x1 + x2) / 2 + center_y = (y1 + y2) / 2 + + # 左下角坐标 + rect1_x = center_x - rect_length/2 * np.cos(angle) + rect_width/2 * np.sin(angle) + rect1_y = center_y - rect_length/2 * np.sin(angle) - rect_width/2 * np.cos(angle) + + # 右下角坐标 + rect2_x = center_x + rect_length/2 * np.cos(angle) + rect_width/2 * np.sin(angle) + rect2_y = center_y + rect_length/2 * np.sin(angle) - rect_width/2 * np.cos(angle) + + # 右上角坐标 + rect3_x = center_x + rect_length/2 * np.cos(angle) - rect_width/2 * np.sin(angle) + rect3_y = center_y + rect_length/2 * np.sin(angle) + rect_width/2 * np.cos(angle) + + # 左上角坐标 + rect4_x = center_x - rect_length/2 * np.cos(angle) - rect_width/2 * np.sin(angle) + rect4_y = center_y - rect_length/2 * np.sin(angle) + rect_width/2 * np.cos(angle) + + res.append(((rect1_x, rect1_y), (rect2_x, rect2_y), (rect3_x, rect3_y), (rect4_x, rect4_y))) + + return res + +@numba.njit +def CrossProduct(a,b): + return a[0]*b[1]-a[1]*b[0] +@numba.njit +def PointInBlock(point,block): + vec1_alpha=(block[1][0]-block[0][0],block[1][1]-block[0][1]) + vec1_beta=(point[0]-block[0][0],point[1]-block[0][1]) + vec2_alpha=(block[2][0]-block[1][0],block[2][1]-block[1][1]) + vec2_beta=(point[0]-block[1][0],point[1]-block[1][1]) + vec3_alpha=(block[3][0]-block[2][0],block[3][1]-block[2][1]) + vec3_beta=(point[0]-block[2][0],point[1]-block[2][1]) + vec4_alpha=(block[0][0]-block[3][0],block[0][1]-block[3][1]) + vec4_beta=(point[0]-block[3][0],point[1]-block[3][1]) + status1=CrossProduct(vec1_alpha,vec1_beta) + status2=CrossProduct(vec2_alpha,vec2_beta) + status3=CrossProduct(vec3_alpha,vec3_beta) + status4=CrossProduct(vec4_alpha,vec4_beta) + if status1<0: + return -1 + if status2<0: + return -1 + if status3<0: + return -1 + if status4<0: + return -1 + kEps=1e-10 + if status1res: + res=status + if res==1: + break + return res diff --git a/A/3/low_bound_cal.py b/A/3/low_bound_cal.py new file mode 100644 index 0000000..8b5c4f2 --- /dev/null +++ b/A/3/low_bound_cal.py @@ -0,0 +1,42 @@ +from dragon import * + +kBegPitch = 0.3 +kEndPitch = 0.55 +kTotalSteps = 10000 +kStepAlpha = (kEndPitch - kBegPitch) / kTotalSteps +kParallelNum=24 +tasks_list = [kBegPitch + kStepAlpha * i for i in range(kTotalSteps)] +task_list_per_process=[tasks_list[i::kParallelNum] for i in range(kParallelNum)] +print(f"len(task_list_per_thread)={len(task_list_per_process)}",file=sys.stderr) +def ProcessEntryPoint(arg): + pitch_list, process_id, shared_dict, lock = arg + logf=open(f"low_bound_cal_{process_id}.log","w") + print(f"calculating pitch_list={pitch_list} with process_id={process_id}",file=logf) + cnt = 0 + tmp_res={} + for pitch in pitch_list: + dragen = Dragon(mp.mpf(pitch)/(2*mp.pi)) + status = CheckCollision(status2blocks(dragen.CalcMoveList())) + tmp_res[pitch]=status + print(f"res={status}",file=logf) + with lock: # 添加锁保护对共享字典的操作 + shared_dict.update(tmp_res) + +if __name__ == "__main__": + manager = multiprocessing.Manager() + shared_dict = manager.dict() # 创建一个共享字典 + lock = manager.Lock() # 创建一个共享的锁 + task_args_list = [(task_list_per_process[i], i, shared_dict, lock) for i in range(kParallelNum)] + with multiprocessing.Pool(processes=kParallelNum) as pool: + pool.map(ProcessEntryPoint, task_args_list) + print(f"\nFinal Results: \n") + res_array=sorted(shared_dict.items()) + for item in res_array: + time_point, status = item + print(f"time point={time_point}",end=" ") + if status==-1: + print("NoCollision") + elif status==0: + print("CollisionOnEdge") + else: + print("CollisionInBlock") \ No newline at end of file diff --git a/A/3/sufficiency_test.py b/A/3/sufficiency_test.py new file mode 100644 index 0000000..d339e9f --- /dev/null +++ b/A/3/sufficiency_test.py @@ -0,0 +1 @@ +from dragon import * \ No newline at end of file