write low_bount_cal
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -3,3 +3,4 @@
|
|||||||
*.bak
|
*.bak
|
||||||
*.dat
|
*.dat
|
||||||
*.log
|
*.log
|
||||||
|
__pycache__/
|
155
A/3/dragon.py
Normal file
155
A/3/dragon.py
Normal file
@@ -0,0 +1,155 @@
|
|||||||
|
import mpmath as mp
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
from matplotlib.patches import Rectangle
|
||||||
|
import numpy as np
|
||||||
|
from typing import *
|
||||||
|
import threading
|
||||||
|
import numba
|
||||||
|
import multiprocessing
|
||||||
|
|
||||||
|
mp.dps = 15 # 设置精度为15位小数
|
||||||
|
|
||||||
|
kSegLength1 = mp.mpf('2.86')
|
||||||
|
kSegLength2 = mp.mpf('1.65')
|
||||||
|
kPointsConsidered=50
|
||||||
|
kInnerCircleRadius=4.5
|
||||||
|
|
||||||
|
class Dragon:
|
||||||
|
def __init__(self,kAlpha):
|
||||||
|
self.kAlpha = kAlpha
|
||||||
|
def Theta2C(self, theta):
|
||||||
|
tmp = mp.sqrt(1 + theta**2)
|
||||||
|
return self.kAlpha * 0.5 * (theta * tmp - mp.log(-theta + tmp))
|
||||||
|
|
||||||
|
def Theta2Dot(self, theta):
|
||||||
|
return (self.kAlpha * theta * mp.cos(theta), self.kAlpha * theta * mp.sin(theta))
|
||||||
|
|
||||||
|
def GenerateFirstNodeTheta(self, delta_theta):
|
||||||
|
return kInnerCircleRadius/self.kAlpha
|
||||||
|
|
||||||
|
def GenerateFollowNodeTheta(self, cur_node_theta, expected_distance):
|
||||||
|
cur_node_dot = self.Theta2Dot(cur_node_theta)
|
||||||
|
def f(theta):
|
||||||
|
test_node_dot = self.Theta2Dot(theta)
|
||||||
|
actual_distance = mp.sqrt((cur_node_dot[0]-test_node_dot[0])**2 + (cur_node_dot[1]-test_node_dot[1])**2)
|
||||||
|
return actual_distance - expected_distance
|
||||||
|
return mp.findroot(f, cur_node_theta + 0.1, solver='secant',tol=1e-20)
|
||||||
|
|
||||||
|
|
||||||
|
def CalcMoveList(self, delta_theta=0):
|
||||||
|
first_node_theta = self.GenerateFirstNodeTheta(delta_theta)
|
||||||
|
first_node_dot = self.Theta2Dot(first_node_theta)
|
||||||
|
first_node_C = self.Theta2C(first_node_theta)
|
||||||
|
|
||||||
|
node_list = [{"theta": first_node_theta, "node": first_node_dot, "C": first_node_C, "v": mp.mpf('1.0')}]
|
||||||
|
|
||||||
|
for i in range(1, kPointsConsidered):
|
||||||
|
expected_distance = kSegLength1 if i == 1 else kSegLength2
|
||||||
|
cur_node_theta = self.GenerateFollowNodeTheta(node_list[-1]["theta"], expected_distance)
|
||||||
|
cur_node_dot = self.Theta2Dot(cur_node_theta)
|
||||||
|
cur_node_C = self.Theta2C(cur_node_theta)
|
||||||
|
node_list.append({"theta": cur_node_theta, "node": cur_node_dot, "C": cur_node_C})
|
||||||
|
for i in range(kPointsConsidered-1):
|
||||||
|
AA = kSegLength1 if i == 0 else kSegLength2
|
||||||
|
theta_i = node_list[i]["theta"]
|
||||||
|
theta_ip1 = node_list[i+1]["theta"]
|
||||||
|
alpha_i = mp.atan(theta_i)
|
||||||
|
alpha_ip1 = mp.atan(theta_ip1)
|
||||||
|
beta_i = mp.acos(((self.kAlpha*theta_i)**2 + AA**2 - (self.kAlpha*theta_ip1)**2) / (2*self.kAlpha*theta_i*AA))
|
||||||
|
gama_i = mp.acos(((self.kAlpha*theta_ip1)**2 + AA**2 - (self.kAlpha*theta_i)**2) / (2*self.kAlpha*theta_ip1*AA))
|
||||||
|
node_list[i+1]["v"] = node_list[i]["v"] * (-mp.cos(alpha_i + beta_i) / mp.cos(alpha_ip1 - gama_i))
|
||||||
|
|
||||||
|
return node_list
|
||||||
|
|
||||||
|
|
||||||
|
# 将结果转换为float并保留6位小数
|
||||||
|
def mp2float(time_point_list):
|
||||||
|
float_res_list = [
|
||||||
|
{k: round(float(v), 6) if isinstance(v, mp.mpf) else
|
||||||
|
[round(float(x), 6) for x in v] if isinstance(v, tuple) else v
|
||||||
|
for k, v in node.items()}
|
||||||
|
for node in time_point_list
|
||||||
|
]
|
||||||
|
return float_res_list
|
||||||
|
|
||||||
|
def status2blocks(node_list):
|
||||||
|
res=[]
|
||||||
|
for i in range(len(node_list) - 1):
|
||||||
|
x1, y1 = [float(coord) for coord in node_list[i]["node"]]
|
||||||
|
x2, y2 = [float(coord) for coord in node_list[i+1]["node"]]
|
||||||
|
# 计算并绘制木板(长方形)
|
||||||
|
dx = x2 - x1
|
||||||
|
dy = y2 - y1
|
||||||
|
length = np.sqrt(dx**2 + dy**2)
|
||||||
|
angle = np.arctan2(dy, dx)
|
||||||
|
|
||||||
|
rect_length = length + 0.55 # 总长度加上两端各延伸的0.275m
|
||||||
|
rect_width = 0.3
|
||||||
|
|
||||||
|
# 计算长方形的中心点
|
||||||
|
center_x = (x1 + x2) / 2
|
||||||
|
center_y = (y1 + y2) / 2
|
||||||
|
|
||||||
|
# 左下角坐标
|
||||||
|
rect1_x = center_x - rect_length/2 * np.cos(angle) + rect_width/2 * np.sin(angle)
|
||||||
|
rect1_y = center_y - rect_length/2 * np.sin(angle) - rect_width/2 * np.cos(angle)
|
||||||
|
|
||||||
|
# 右下角坐标
|
||||||
|
rect2_x = center_x + rect_length/2 * np.cos(angle) + rect_width/2 * np.sin(angle)
|
||||||
|
rect2_y = center_y + rect_length/2 * np.sin(angle) - rect_width/2 * np.cos(angle)
|
||||||
|
|
||||||
|
# 右上角坐标
|
||||||
|
rect3_x = center_x + rect_length/2 * np.cos(angle) - rect_width/2 * np.sin(angle)
|
||||||
|
rect3_y = center_y + rect_length/2 * np.sin(angle) + rect_width/2 * np.cos(angle)
|
||||||
|
|
||||||
|
# 左上角坐标
|
||||||
|
rect4_x = center_x - rect_length/2 * np.cos(angle) - rect_width/2 * np.sin(angle)
|
||||||
|
rect4_y = center_y - rect_length/2 * np.sin(angle) + rect_width/2 * np.cos(angle)
|
||||||
|
|
||||||
|
res.append(((rect1_x, rect1_y), (rect2_x, rect2_y), (rect3_x, rect3_y), (rect4_x, rect4_y)))
|
||||||
|
|
||||||
|
return res
|
||||||
|
|
||||||
|
@numba.njit
|
||||||
|
def CrossProduct(a,b):
|
||||||
|
return a[0]*b[1]-a[1]*b[0]
|
||||||
|
@numba.njit
|
||||||
|
def PointInBlock(point,block):
|
||||||
|
vec1_alpha=(block[1][0]-block[0][0],block[1][1]-block[0][1])
|
||||||
|
vec1_beta=(point[0]-block[0][0],point[1]-block[0][1])
|
||||||
|
vec2_alpha=(block[2][0]-block[1][0],block[2][1]-block[1][1])
|
||||||
|
vec2_beta=(point[0]-block[1][0],point[1]-block[1][1])
|
||||||
|
vec3_alpha=(block[3][0]-block[2][0],block[3][1]-block[2][1])
|
||||||
|
vec3_beta=(point[0]-block[2][0],point[1]-block[2][1])
|
||||||
|
vec4_alpha=(block[0][0]-block[3][0],block[0][1]-block[3][1])
|
||||||
|
vec4_beta=(point[0]-block[3][0],point[1]-block[3][1])
|
||||||
|
status1=CrossProduct(vec1_alpha,vec1_beta)
|
||||||
|
status2=CrossProduct(vec2_alpha,vec2_beta)
|
||||||
|
status3=CrossProduct(vec3_alpha,vec3_beta)
|
||||||
|
status4=CrossProduct(vec4_alpha,vec4_beta)
|
||||||
|
if status1<0:
|
||||||
|
return -1
|
||||||
|
if status2<0:
|
||||||
|
return -1
|
||||||
|
if status3<0:
|
||||||
|
return -1
|
||||||
|
if status4<0:
|
||||||
|
return -1
|
||||||
|
kEps=1e-10
|
||||||
|
if status1<kEps or status2<kEps or status3<kEps or status4<kEps:
|
||||||
|
return 0
|
||||||
|
return 1
|
||||||
|
def CheckCollision(block_list):
|
||||||
|
res = -1
|
||||||
|
for i in range(len(block_list)-1):
|
||||||
|
for j in range(2):
|
||||||
|
point=block_list[i][j]
|
||||||
|
for k in range(i+1,len(block_list)):
|
||||||
|
status=PointInBlock(point,block_list[k])
|
||||||
|
if status>res:
|
||||||
|
res=status
|
||||||
|
if res==1:
|
||||||
|
break
|
||||||
|
return res
|
42
A/3/low_bound_cal.py
Normal file
42
A/3/low_bound_cal.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
from dragon import *
|
||||||
|
|
||||||
|
kBegPitch = 0.3
|
||||||
|
kEndPitch = 0.55
|
||||||
|
kTotalSteps = 10000
|
||||||
|
kStepAlpha = (kEndPitch - kBegPitch) / kTotalSteps
|
||||||
|
kParallelNum=24
|
||||||
|
tasks_list = [kBegPitch + kStepAlpha * i for i in range(kTotalSteps)]
|
||||||
|
task_list_per_process=[tasks_list[i::kParallelNum] for i in range(kParallelNum)]
|
||||||
|
print(f"len(task_list_per_thread)={len(task_list_per_process)}",file=sys.stderr)
|
||||||
|
def ProcessEntryPoint(arg):
|
||||||
|
pitch_list, process_id, shared_dict, lock = arg
|
||||||
|
logf=open(f"low_bound_cal_{process_id}.log","w")
|
||||||
|
print(f"calculating pitch_list={pitch_list} with process_id={process_id}",file=logf)
|
||||||
|
cnt = 0
|
||||||
|
tmp_res={}
|
||||||
|
for pitch in pitch_list:
|
||||||
|
dragen = Dragon(mp.mpf(pitch)/(2*mp.pi))
|
||||||
|
status = CheckCollision(status2blocks(dragen.CalcMoveList()))
|
||||||
|
tmp_res[pitch]=status
|
||||||
|
print(f"res={status}",file=logf)
|
||||||
|
with lock: # 添加锁保护对共享字典的操作
|
||||||
|
shared_dict.update(tmp_res)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
manager = multiprocessing.Manager()
|
||||||
|
shared_dict = manager.dict() # 创建一个共享字典
|
||||||
|
lock = manager.Lock() # 创建一个共享的锁
|
||||||
|
task_args_list = [(task_list_per_process[i], i, shared_dict, lock) for i in range(kParallelNum)]
|
||||||
|
with multiprocessing.Pool(processes=kParallelNum) as pool:
|
||||||
|
pool.map(ProcessEntryPoint, task_args_list)
|
||||||
|
print(f"\nFinal Results: \n")
|
||||||
|
res_array=sorted(shared_dict.items())
|
||||||
|
for item in res_array:
|
||||||
|
time_point, status = item
|
||||||
|
print(f"time point={time_point}",end=" ")
|
||||||
|
if status==-1:
|
||||||
|
print("NoCollision")
|
||||||
|
elif status==0:
|
||||||
|
print("CollisionOnEdge")
|
||||||
|
else:
|
||||||
|
print("CollisionInBlock")
|
1
A/3/sufficiency_test.py
Normal file
1
A/3/sufficiency_test.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
from dragon import *
|
Reference in New Issue
Block a user