Python math 模块,copysign() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用math.copysign()。
def get_cubic_root(self):
# We have the equation x^2 D^2 + (1-x)^4 * C / h_min^2
# where x = sqrt(mu).
# We substitute x, which is sqrt(mu), with x = y + 1.
# It gives y^3 + py = q
# where p = (D^2 h_min^2)/(2*C) and q = -p.
# We use the Vieta's substution to compute the root.
# There is only one real solution y (which is in [0, 1] ).
# http://mathworld.wolfram.com/VietasSubstitution.html
# eps in the numerator is to prevent momentum = 1 in case of zero gradient
p = (self._dist_to_opt + eps)**2 * (self._h_min + eps)**2 / 2 / (self._grad_var + eps)
w3 = (-math.sqrt(p**2 + 4.0 / 27.0 * p**3) - p) / 2.0
w = math.copysign(1.0, w3) * math.pow(math.fabs(w3), 1.0/3.0)
y = w - p / 3.0 / (w + eps)
x = y + 1
if DEBUG:
logging.debug("p %f, den %f", p, self._grad_var + eps)
logging.debug("w3 %f ", w3)
logging.debug("y %f, den %f", y, w + eps)
return x
def process_transactions(self, trade_event, current_orders):
for order, txn in self.transact(trade_event, current_orders):
if txn.type == zp.DATASOURCE_TYPE.COMMISSION:
order.commission = (order.commission or 0.0) + txn.cost
else:
if txn.amount == 0:
raise zipline.errors.TransactionWithNoAmount(txn=txn)
if math.copysign(1, txn.amount) != order.direction:
raise zipline.errors.TransactionWithWrongDirection(
txn=txn, order=order)
if abs(txn.amount) > abs(self.orders[txn.order_id].amount):
raise zipline.errors.TransactionVolumeExceedsOrder(
txn=txn, order=order)
order.filled += txn.amount
if txn.commission is not None:
order.commission = ((order.commission or 0.0) +
txn.commission)
# mark the date of the order to match the transaction
# that is filling it.
order.dt = txn.dt
yield txn, order
def scrollEvent(self, dx=0, dy=0):
"""
Generate scroll events from parametters and displacement
@param int dx delta movement from last call on x axis
@param int dy delta movement from last call on y axis
@return float absolute distance moved this tick
"""
# Compute mouse mouvement from interger part of d * scale
self._scr_dx += dx * self._scr_xscale
self._scr_dy += dy * self._scr_yscale
_syn = False
if int(self._scr_dx):
self.relEvent(rel=Rels.REL_HWHEEL, val=int(copysign(1, self._scr_dx)))
self._scr_dx -= int(self._scr_dx)
_syn = True
if int(self._scr_dy):
self.relEvent(rel=Rels.REL_WHEEL, val=int(copysign(1, self._scr_dy)))
self._scr_dy -= int(self._scr_dy)
_syn = True
if _syn:
self.synEvent()
def mode_ROUND(self, x, y, range):
"""
If input value bellow deadzone range, output value is zero
If input value is above deadzone range,
output value is 1 (or maximum allowed)
"""
if y == 0:
# Small optimalization for 1D input, for example trigger
if abs(x) > self.upper:
return copysign(range, x)
return (0 if abs(x) < self.lower else x), 0
distance = sqrt(x*x + y*y)
if distance < self.lower:
return 0, 0
if distance > self.upper:
angle = atan2(x, y)
return range * sin(angle), range * cos(angle)
return x, y
def mode_LINEAR(self, x, y, range):
"""
Input value is scaled, so entire output range is covered by
reduced input range of deadzone.
"""
if y == 0:
# Small optimalization for 1D input, for example trigger
return copysign(
clamp(
0,
((x - self.lower) / (self.upper - self.lower)) * range,
range),
x
), 0
distance = clamp(self.lower, sqrt(x*x + y*y), self.upper)
distance = (distance - self.lower) / (self.upper - self.lower) * range
angle = atan2(x, y)
return distance * sin(angle), distance * cos(angle)
def _subplot_scrolled(self, event):
keys = event['key']
if not keys:
# at least 1 step per event but a maximum of 5
step = max(1, abs(int(event['step'])))
step = min(step, 5)
step = copysign(step, event['step'])
slice_model = self._get_slice_view(event).model()
index = int(slice_model.index + step)
if 0 <= index < len(slice_model):
self._context.update_index_for_direction(slice_model.index_direction, index)
elif keys.state(ctrl=True) or keys.state(shift=True):
x = event['x']
y = event['y']
step = max(1, abs(int(event['step'])))
step = copysign(step / 20.0, event['step'])
slice_view = self._get_slice_view(event)
if slice_view.zoom(x, y, step):
self._context_changed()
def _from_float(cls, f):
if isinstance(f, int): # handle integer inputs
return cls(f)
if not isinstance(f, float):
raise TypeError("argument must be int or float.")
if _math.isinf(f) or _math.isnan(f):
return cls(repr(f))
if _math.copysign(1.0, f) == 1.0:
sign = 0
else:
sign = 1
n, d = abs(f).as_integer_ratio()
#k = d.bit_length() - 1
k = _bit_length(d) - 1
result = _dec_from_triple(sign, str(n*5**k), -k)
if cls is Decimal:
return result
else:
return cls(result)
def assertFloatIdentical(self, x, y):
"""Fail unless floats x and y are identical, in the sense that:
(1) both x and y are nans, or
(2) both x and y are infinities, with the same sign, or
(3) both x and y are zeros, with the same sign, or
(4) x and y are both finite and nonzero, and x == y
"""
msg = 'floats {!r} and {!r} are not identical'
if math.isnan(x) or math.isnan(y):
if math.isnan(x) and math.isnan(y):
return
elif x == y:
if x != 0.0:
return
# both zero; check that signs match
elif math.copysign(1.0, x) == math.copysign(1.0, y):
return
else:
msg += ': zeros have different signs'
self.fail(msg.format(x, y))
def assertFloatsAreIdentical(self, x, y):
"""assert that floats x and y are identical, in the sense that:
(1) both x and y are nans, or
(2) both x and y are infinities, with the same sign, or
(3) both x and y are zeros, with the same sign, or
(4) x and y are both finite and nonzero, and x == y
"""
msg = 'floats {!r} and {!r} are not identical'
if isnan(x) or isnan(y):
if isnan(x) and isnan(y):
return
elif x == y:
if x != 0.0:
return
# both zero; check that signs match
elif copysign(1.0, x) == copysign(1.0, y):
return
else:
msg += ': zeros have different signs'
self.fail(msg.format(x, y))
def __init__(self, lattice, m, n):
self.lattice = lattice
self.m = m
self.n = n
# Chiral vector
self.c = lattice.pos(m,n)
self.magC = mag(self.c)
# Translation vector
d = gcd(2*n+m,2*m+n)
self.t = lattice.pos((2*n+m)/d, -(2*m+n)/d);
self.magT = mag(self.t)
# Chiral rotation matrix (rotate a1 along x-axis)
self.theta = acos(norm(self.c)[0]*copysign(1, self.c[1]))
self.rotM = np.array([
[cos(self.theta), sin(self.theta)],
[-sin(self.theta), cos(self.theta)]]).T
# Calculate atoms and bonds in unit cell
self._boundsErr = mag(lattice.pos(0,0,0) - lattice.pos(0,0,1))
self.indices = self._calcIndices(m, n)
self.atoms = self._calcAtoms(self.indices)
self.bonds = self._calcBonds(self.indices)
def start_run(self, at_home_door, distance, angular):
if at_home_door == True:
self.brake()
self.robot_move.move_to(x=self.last_move_goal)
return True
if distance > (self.line_distance + self.distance_tolerance):
self.vel.linear.x = self.x_speed
self.vel.linear.y = math.copysign(self.y_speed, self.move_direction)
elif distance < (self.line_distance - self.distance_tolerance):
self.vel.linear.x = self.x_speed * -1
self.vel.linear.y = self.y_speed
else:
self.vel.linear.x = 0
self.vel.linear.y = self.y_speed
if math.fabs(angular) < self.angular_tolerance:
self.vel.angular.z = 0
elif angular > 0:
self.vel.angular.z = -1 * self.z_speed
else:
self.vel.angular.z = self.z_speed
self.cmd_move_pub.publish(self.vel)
return False
def turn(self, goal_angular):
# ???????????????,??????????????
# ????,????
rospy.loginfo('[robot_move_pkg]->move_in_robot.turn will turn %s'%goal_angular)
current_angular = start_angular = self.robot_state.get_robot_current_w()#??????????
r = rospy.Rate(100)
delta_angular = current_angular - start_angular
move_velocity = g_msgs.Twist()
while not rospy.is_shutdown() :
if abs(delta_angular - abs(goal_angular)) < self.turn_move_stop_tolerance:
break
current_angular = self.robot_state.get_robot_current_w()
move_velocity.angular.z = math.copysign(self.w_speed, goal_angular)
delta_angular += abs(abs(current_angular) - abs(start_angular) )
start_angular = current_angular
self.cmd_move_pub.publish(move_velocity) #???????????
r.sleep()
self.brake()
def assertFloatsAreIdentical(self, x, y):
"""assert that floats x and y are identical, in the sense that:
(1) both x and y are nans, or
(2) both x and y are infinities, with the same sign, or
(3) both x and y are zeros, with the same sign, or
(4) x and y are both finite and nonzero, and x == y
"""
msg = 'floats {!r} and {!r} are not identical'
if isnan(x) or isnan(y):
if isnan(x) and isnan(y):
return
elif x == y:
if x != 0.0:
return
# both zero; check that signs match
elif copysign(1.0, x) == copysign(1.0, y):
return
else:
msg += ': zeros have different signs'
self.fail(msg.format(x, y))
def assertFloatsAreIdentical(self, x, y):
"""assert that floats x and y are identical, in the sense that:
(1) both x and y are nans, or
(2) both x and y are infinities, with the same sign, or
(3) both x and y are zeros, with the same sign, or
(4) x and y are both finite and nonzero, and x == y
"""
msg = 'floats {!r} and {!r} are not identical'
if isnan(x) or isnan(y):
if isnan(x) and isnan(y):
return
elif x == y:
if x != 0.0:
return
# both zero; check that signs match
elif copysign(1.0, x) == copysign(1.0, y):
return
else:
msg += ': zeros have different signs'
self.fail(msg.format(x, y))
def _drive_to_1(
self,
speed: int,
pos_x: float,
pos_y: float
) -> None:
diff_x = pos_x - self._pos_x
diff_y = pos_y - self._pos_y
if abs(diff_x) > abs(diff_y):
direct = math.degrees(math.atan(diff_y/diff_x))
else:
fract = diff_x / diff_y
sign = math.copysign(1, fract)
direct = sign * 90 - math.degrees(math.atan(fract))
if diff_x < 0:
direct += 180
self._rotate_to(speed, direct)
def process_transactions(self, trade_event, current_orders):
for order, txn in self.transact(trade_event, current_orders):
if txn.type == zp.DATASOURCE_TYPE.COMMISSION:
order.commission = (order.commission or 0.0) + txn.cost
else:
if txn.amount == 0:
raise zipline.errors.TransactionWithNoAmount(txn=txn)
if math.copysign(1, txn.amount) != order.direction:
raise zipline.errors.TransactionWithWrongDirection(
txn=txn, order=order)
if abs(txn.amount) > abs(self.orders[txn.order_id].amount):
raise zipline.errors.TransactionVolumeExceedsOrder(
txn=txn, order=order)
order.filled += txn.amount
if txn.commission is not None:
order.commission = ((order.commission or 0.0) +
txn.commission)
# mark the date of the order to match the transaction
# that is filling it.
order.dt = txn.dt
yield txn, order
def drain_asset(positions, asset, quantity):
"""
Generic method of reducing the quantity of assets across an entire set of positions
This is hard to do manually because positions can duplicates and arbitrary quantities
Traverse the entire position set reducing quantities to zero until it hits the target reduction
"""
remaining_quantity = quantity
# get a list of positions that are opposite to the quantity we are draining
positions = [_ for _ in positions if _.asset == asset and copysign(1,_.quantity) == copysign(1, quantity * -1)]
for position in positions:
if abs(remaining_quantity) <= abs(position.quantity):
# there are enough quantity in this position to complete it
position.quantity += remaining_quantity
remaining_quantity = 0
return remaining_quantity
if abs(remaining_quantity) > abs(position.quantity):
# we are going to have some left over
remaining_quantity += position.quantity
position.quantity = 0
return remaining_quantity
def estimate(self, quote:Quote, quantity:int = None):
# quantity is only used for direction
quantity = copysign(1, quantity)
if quote.bid is None or quote.ask is None or quote.bid == 0.0 or quote.ask == 0.0:
raise Exception(
'SlippageEstimator.estimate: Cannot estimate a price if the bid or ask are None or 0.0')
if quantity is None or quantity == 0:
raise Exception(
'SlippageEstimator.estimate: Must provide a signed quantity to buy or sell.')
spread = (quote.ask - quote.bid) / 2
midpoint = quote.bid + spread
if quantity > 0:
return midpoint - spread * self.slippage
else:
return midpoint + spread * self.slippage
def assertFloatIdentical(self, x, y):
"""Fail unless floats x and y are identical, in the sense that:
(1) both x and y are nans, or
(2) both x and y are infinities, with the same sign, or
(3) both x and y are zeros, with the same sign, or
(4) x and y are both finite and nonzero, and x == y
"""
msg = 'floats {!r} and {!r} are not identical'
if math.isnan(x) or math.isnan(y):
if math.isnan(x) and math.isnan(y):
return
elif x == y:
if x != 0.0:
return
# both zero; check that signs match
elif math.copysign(1.0, x) == math.copysign(1.0, y):
return
else:
msg += ': zeros have different signs'
self.fail(msg.format(x, y))
def _check_battery_state(_battery_acpi_path):
"""
@return LaptopChargeStatus
"""
o = slerp(_battery_acpi_path+'/state')
batt_info = yaml.load(o)
rv = LaptopChargeStatus()
state = batt_info.get('charging state', 'discharging')
rv.charge_state = state_to_val.get(state, 0)
rv.rate = _strip_A(batt_info.get('present rate', '-1 mA'))
if rv.charge_state == LaptopChargeStatus.DISCHARGING:
rv.rate = math.copysign(rv.rate, -1) # Need to set discharging rate to negative
rv.charge = _strip_Ah(batt_info.get('remaining capacity', '-1 mAh'))
rv.voltage = _strip_V(batt_info.get('present voltage', '-1 mV'))
rv.present = batt_info.get('present', False)
rv.header.stamp = rospy.get_rostime()
return rv
def _simulateTemps(self, delta=1):
timeDiff = self.lastTempAt - time.time()
self.lastTempAt = time.time()
for i in range(len(self.temp)):
if abs(self.temp[i] - self.targetTemp[i]) > delta:
oldVal = self.temp[i]
self.temp[i] += math.copysign(timeDiff * 10, self.targetTemp[i] - self.temp[i])
if math.copysign(1, self.targetTemp[i] - oldVal) != math.copysign(1, self.targetTemp[i] - self.temp[i]):
self.temp[i] = self.targetTemp[i]
if self.temp[i] < 0:
self.temp[i] = 0
if abs(self.bedTemp - self.bedTargetTemp) > delta:
oldVal = self.bedTemp
self.bedTemp += math.copysign(timeDiff * 10, self.bedTargetTemp - self.bedTemp)
if math.copysign(1, self.bedTargetTemp - oldVal) != math.copysign(1, self.bedTargetTemp - self.bedTemp):
self.bedTemp = self.bedTargetTemp
if self.bedTemp < 0:
self.bedTemp = 0
def _simulateTemps(self, delta=1):
timeDiff = self.lastTempAt - time.time()
self.lastTempAt = time.time()
for i in range(len(self.temp)):
if abs(self.temp[i] - self.targetTemp[i]) > delta:
oldVal = self.temp[i]
self.temp[i] += math.copysign(timeDiff * 10, self.targetTemp[i] - self.temp[i])
if math.copysign(1, self.targetTemp[i] - oldVal) != math.copysign(1, self.targetTemp[i] - self.temp[i]):
self.temp[i] = self.targetTemp[i]
if self.temp[i] < self.ambientTemp:
self.temp[i] = self.ambientTemp
if abs(self.bedTemp - self.bedTargetTemp) > delta:
oldVal = self.bedTemp
self.bedTemp += math.copysign(timeDiff * 10, self.bedTargetTemp - self.bedTemp)
if math.copysign(1, self.bedTargetTemp - oldVal) != math.copysign(1, self.bedTargetTemp - self.bedTemp):
self.bedTemp = self.bedTargetTemp
if self.bedTemp < self.ambientTemp:
self.bedTemp = self.ambientTemp
def setf(self, value):
"""store /value/ into a binary format"""
exponentbias = (2**self.components[1])/2 - 1
m,e = math.frexp(value)
# extract components from value
s = math.copysign(1.0, m)
m = abs(m)
# convert to integrals
sf = 1 if s < 0 else 0
exponent = e + exponentbias - 1
m = m*2.0 - 1.0 # remove the implicit bit
mantissa = int(m * (2**self.components[2]))
# store components
result = bitmap.zero
result = bitmap.push( result, bitmap.new(sf,self.components[0]) )
result = bitmap.push( result, bitmap.new(exponent,self.components[1]) )
result = bitmap.push( result, bitmap.new(mantissa,self.components[2]) )
return self.__setvalue__( result[0] )
def getf(self):
"""convert the stored floating-point number into a python native float"""
exponentbias = (2**self.components[1])/2 - 1
res = bitmap.new( self.__getvalue__(), sum(self.components) )
# extract components
res,sign = bitmap.shift(res, self.components[0])
res,exponent = bitmap.shift(res, self.components[1])
res,mantissa = bitmap.shift(res, self.components[2])
if exponent > 0 and exponent < (2**self.components[2]-1):
# convert to float
s = -1 if sign else +1
e = exponent - exponentbias
m = 1.0 + (float(mantissa) / 2**self.components[2])
# done
return math.ldexp( math.copysign(m,s), e)
# FIXME: this should return NaN or something
Log.warn('float_t.getf : {:s} : Invalid exponent value : {:d}'.format(self.instance(), exponent))
return 0.0
def assertFloatsAreIdentical(self, x, y):
"""assert that floats x and y are identical, in the sense that:
(1) both x and y are nans, or
(2) both x and y are infinities, with the same sign, or
(3) both x and y are zeros, with the same sign, or
(4) x and y are both finite and nonzero, and x == y
"""
msg = 'floats {!r} and {!r} are not identical'
if isnan(x) or isnan(y):
if isnan(x) and isnan(y):
return
elif x == y:
if x != 0.0:
return
# both zero; check that signs match
elif copysign(1.0, x) == copysign(1.0, y):
return
else:
msg += ': zeros have different signs'
self.fail(msg.format(x, y))
def rect(r, phi):
_rect_special = [
[inf+nanj, None, -inf, complex(-float("inf"), -0.0), None, inf+nanj, inf+nanj],
[nan+nanj, None, None, None, None, nan+nanj, nan+nanj],
[0, None, complex(-0.0, 0.0), complex(-0.0, -0.0), None, 0, 0],
[0, None, complex(0.0, -0.0), 0, None, 0, 0],
[nan+nanj, None, None, None, None, nan+nanj, nan+nanj],
[inf+nanj, None, complex(float("inf"), -0.0), inf, None, inf+nanj, inf+nanj],
[nan+nanj, nan+nanj, nan, nan, nan+nanj, nan+nanj, nan+nanj]
]
if not math.isfinite(r) or not math.isfinite(phi):
if math.isinf(phi) and not math.isnan(r) and r != 0:
raise ValueError
if math.isinf(r) and math.isfinite(phi) and phi != 0:
if r > 0:
return complex(math.copysign(inf, math.cos(phi)),
math.copysign(inf, math.sin(phi)))
return complex(-math.copysign(inf, math.cos(phi)),
-math.copysign(inf, math.sin(phi)))
return _rect_special[_special_type(r)][_special_type(phi)]
return complex(r*math.cos(phi), r*math.sin(phi))
def assertFloatsAreIdentical(self, x, y):
"""assert that floats x and y are identical, in the sense that:
(1) both x and y are nans, or
(2) both x and y are infinities, with the same sign, or
(3) both x and y are zeros, with the same sign, or
(4) x and y are both finite and nonzero, and x == y
"""
msg = 'floats {!r} and {!r} are not identical'
if isnan(x) or isnan(y):
if isnan(x) and isnan(y):
return
elif x == y:
if x != 0.0:
return
# both zero; check that signs match
elif copysign(1.0, x) == copysign(1.0, y):
return
else:
msg += ': zeros have different signs'
self.fail(msg.format(x, y))
def assertFloatIdentical(self, x, y):
"""Fail unless floats x and y are identical, in the sense that:
(1) both x and y are nans, or
(2) both x and y are infinities, with the same sign, or
(3) both x and y are zeros, with the same sign, or
(4) x and y are both finite and nonzero, and x == y
"""
msg = 'floats {!r} and {!r} are not identical'
if math.isnan(x) or math.isnan(y):
if math.isnan(x) and math.isnan(y):
return
elif x == y:
if x != 0.0:
return
# both zero; check that signs match
elif math.copysign(1.0, x) == math.copysign(1.0, y):
return
else:
msg += ': zeros have different signs'
self.fail(msg.format(x, y))
def assertFloatsAreIdentical(self, x, y):
"""assert that floats x and y are identical, in the sense that:
(1) both x and y are nans, or
(2) both x and y are infinities, with the same sign, or
(3) both x and y are zeros, with the same sign, or
(4) x and y are both finite and nonzero, and x == y
"""
msg = 'floats {!r} and {!r} are not identical'
if isnan(x) or isnan(y):
if isnan(x) and isnan(y):
return
elif x == y:
if x != 0.0:
return
# both zero; check that signs match
elif copysign(1.0, x) == copysign(1.0, y):
return
else:
msg += ': zeros have different signs'
self.fail(msg.format(x, y))
def segment_points(start_pose, curvature, length, delta_length):
"""Return points of segment, at delta_length intervals."""
l = 0.0
delta_length = copysign(delta_length, length)
points = []
while abs(l) < abs(length):
points.append(CurveSegment.end_pose(start_pose, curvature, l)[0:2])
l += delta_length
return points
# --------------------------------------------------------------------------
# Exploration of car's kinematic state space.
# --------------------------------------------------------------------------
# Allowed movements. These are given as tuples: (curvature, length).
# The list contains forward and backward movements.
def assertFloatIdentical(self, x, y):
"""Fail unless floats x and y are identical, in the sense that:
(1) both x and y are nans, or
(2) both x and y are infinities, with the same sign, or
(3) both x and y are zeros, with the same sign, or
(4) x and y are both finite and nonzero, and x == y
"""
msg = 'floats {!r} and {!r} are not identical'
if math.isnan(x) or math.isnan(y):
if math.isnan(x) and math.isnan(y):
return
elif x == y:
if x != 0.0:
return
# both zero; check that signs match
elif math.copysign(1.0, x) == math.copysign(1.0, y):
return
else:
msg += ': zeros have different signs'
self.fail(msg.format(x, y))
def ventogeostrofico(lat,GradientePressione,R):
if R==0.0 or GradientePressione==0.0:
tws=0
else:
omega=2*math.pi/(24*3600)#rotazione terrestre
f=2*omega*math.sin(lat)#parametro di coriolis
densitaaria=1.2#kgm/mc vale per T=20gradi
R=R*1853.0#raggio di curvatura isobare espressa in m
GradientePressione=GradientePressione*100.0/1853.0#Gradiente espresso in Pa/m
segno=GradientePressione/math.copysign(GradientePressione,1)
a=segno/R
b=-f
c=math.copysign(GradientePressione,1)/densitaaria
discriminante=(b**2-4*a*c)
if discriminante >0:
tws=(-b-discriminante**0.5)/(2*a)
tws=tws*3600.0/1853.0#converte tws in knts
else:
tws=0.0#caso del centro di alta pressione
return tws
def ventogeostroficoCentroAzione(PCentroAzione,LatCentroAzione,LonCentroAzione,extension,lat,lon):
#campo di pressione: P=0.5*(PCentro-1013)*cos(pi/extension*r)+(PCentro+1013)*0.5, per r<=extension, P=1013 per r >extension
#r distanza dal centro d'azione e raggio curvatura isobara
losso=lossodromica(LatCentroAzione,LonCentroAzione,lat,lon)
r=losso[0]
brg=losso[1]
tws=0
twd=0
segno=0
exp=0
if r<extension:
if PCentroAzione>1013:
segno=1
else:
segno=-1
#GradientePressione=-0.5*(PCentroAzione-1013)*math.pi/extension*math.sin(math.pi/extension*r)
GradientePressione=0.5*(PCentroAzione-1013)*math.pi/extension*math.sin(math.pi/extension*r)
GradientePressione=math.copysign(GradientePressione,1)
tws=ventogeostrofico(LatCentroAzione,segno*GradientePressione,r)
if LatCentroAzione>0:
twd=riduci360(brg-segno*math.radians(100))#emisfero Nord
else:
twd=riduci360(brg+segno*math.radians(100))#emisfero Sud
return tws,twd
def interno(poligono,pto):
#poligono orientato in senso orario
somma=0
for i in range(0,len(poligono)-1):
v1=poligono[i]
v2=poligono[i+1]
rlv1=math.atan2((v1[1]-pto[1]),(v1[0]-pto[0]))
if rlv1<0:rlv1=2*math.pi+rlv1
rlv2=math.atan2((v2[1]-pto[1]),(v2[0]-pto[0]))
if rlv2<0:rlv2=2*math.pi+rlv2
angolo=math.copysign(rlv2-rlv1,1)
if angolo>math.pi:
angolo=2*math.pi-angolo
somma=somma+angolo
if somma<1.99*math.pi:
Interno=False
else:
Interno=True
return Interno
def loxodrome(latA, lonA, latB, lonB): # lossodromica
# Rhumb line navigation
# Takes two points on earth and returns:
# the distance and constant (true) bearing one would need to
# follow to reach it.
# Doesn't function near poles:
# but you shouldn't be sailing near the poles anyways!
# when latB = -90: math domain error log(0)
# when latA = -90 [zero divison error]
# when A==B returns (0.0,0.0)
# if latA == latB:
if math.copysign(latA-latB, 1) <= math.radians(0.1/3600.0):
q = math.cos(latA)
else:
Df = math.log(math.tan(latB/2+math.pi/4)
/ math.tan(latA/2+math.pi/4), math.e)
q = (latB-latA) * 1.0/Df
Distance = EARTH_RADIUS * ((latA-latB)**2+(q*(lonA-lonB))**2)**0.5
Heading = math.atan2(-q*(lonB-lonA), (latB-latA))
if Heading < 0:
Heading = Heading + 2.0 * math.pi # atan2:[-pi,pi]
return Distance, Heading
def print_lat(lat): # def stampalat(lat):
# returns a string in the format xxDegrees,xxMinutes, N/S
lat_decimal = math.copysign(math.degrees(lat), 1) # latdecimali
lat_degree = int(lat_decimal) # latgradi
lat_minute = (lat_decimal - lat_degree) * 60 # latprimi
if lat_minute > 59.51:
lat_degree = lat_degree + 1
lat_minute = 0
else:
if lat_minute - int(lat_minute) > 0.51:
lat_minute = int(lat_minute) + 1
else:
lat_minute = int(lat_minute)
if lat > 0:
hemisphere = "N" # segno
else:
hemisphere = "S"
gradi = "%2d" % lat_degree # gradi
primi = "%2d" % lat_minute # primi
lat = (gradi.replace(" ", "0") + u"°" + primi.replace(" ", "0")
+ "'" + hemisphere)
return lat
def print_lon(lon): # def stampalon(lon):
# returns a string in the format xxDegrees,xxMinutes, N/S
lon_decimal = math.copysign(math.degrees(lon), 1) # londecimali
lon_degree = int(lon_decimal) # longradi
lon_minutes = (lon_decimal - lon_degree) * 60 # lonprimi
if lon_minutes > 59.51:
lon_degree = lon_degree + 1
lon_minutes = 0
else:
if lon_minutes - int(lon_minutes) > 0.51:
lon_minutes = int(lon_minutes) + 1
else:
lon_minutes = int(lon_minutes)
if lon > 0:
hemisphere = "W"
else:
hemisphere = "E"
degrees = "%3d" % lon_degree # gradi
minutes = "%2d" % lon_minutes
lon = (degrees.replace(" ", "0") + u"°" + minutes.replace(" ", "0")
+ "'" + hemisphere)
return lon
def ventogeostrofico(lat,GradientePressione,R):
if R==0.0 or GradientePressione==0.0:
tws=0
else:
omega=2*math.pi/(24*3600)#rotazione terrestre
f=2*omega*math.sin(lat)#parametro di coriolis
densitaaria=1.2#kgm/mc vale per T=20gradi
R=R*1853.0#raggio di curvatura isobare espressa in m
GradientePressione=GradientePressione*100.0/1853.0#Gradiente espresso in Pa/m
segno=GradientePressione/math.copysign(GradientePressione,1)
a=segno/R
b=-f
c=math.copysign(GradientePressione,1)/densitaaria
discriminante=(b**2-4*a*c)
if discriminante >0:
tws=(-b-discriminante**0.5)/(2*a)
tws=tws*3600.0/1853.0#converte tws in knts
else:
tws=0.0#caso del centro di alta pressione
return tws
def ventogeostroficoCentroAzione(PCentroAzione,LatCentroAzione,LonCentroAzione,extension,lat,lon):
#campo di pressione: P=0.5*(PCentro-1013)*cos(pi/extension*r)+(PCentro+1013)*0.5, per r<=extension, P=1013 per r >extension
#r distanza dal centro d'azione e raggio curvatura isobara
losso=lossodromica(LatCentroAzione,LonCentroAzione,lat,lon)
r=losso[0]
brg=losso[1]
tws=0
twd=0
segno=0
exp=0
if r<extension:
if PCentroAzione>1013:
segno=1
else:
segno=-1
#GradientePressione=-0.5*(PCentroAzione-1013)*math.pi/extension*math.sin(math.pi/extension*r)
GradientePressione=0.5*(PCentroAzione-1013)*math.pi/extension*math.sin(math.pi/extension*r)
GradientePressione=math.copysign(GradientePressione,1)
tws=ventogeostrofico(LatCentroAzione,segno*GradientePressione,r)
if LatCentroAzione>0:
twd=riduci360(brg-segno*math.radians(100))#emisfero Nord
else:
twd=riduci360(brg+segno*math.radians(100))#emisfero Sud
return tws,twd
def interno(poligono,pto):
#poligono orientato in senso orario
somma=0
for i in range(0,len(poligono)-1):
v1=poligono[i]
v2=poligono[i+1]
rlv1=math.atan2((v1[1]-pto[1]),(v1[0]-pto[0]))
if rlv1<0:rlv1=2*math.pi+rlv1
rlv2=math.atan2((v2[1]-pto[1]),(v2[0]-pto[0]))
if rlv2<0:rlv2=2*math.pi+rlv2
angolo=math.copysign(rlv2-rlv1,1)
if angolo>math.pi:
angolo=2*math.pi-angolo
somma=somma+angolo
if somma<1.99*math.pi:
Interno=False
else:
Interno=True
return Interno
def calcolaventogeostrofico(lat,GradientePressione,R):
omega=2*math.pi/(24*3600)#rotazione terrestre
f=2*omega*math.sin(lat)#parametro di coriolis
densitaaria=1.2#kgm/mc vale per T=20gradi
R=R*1853.0#curvatura isobare espressa in m
GradientePressione=GradientePressione*100.0/1853.0#Gradiente espresso in Pa/m
segno=GradientePressione/math.copysign(GradientePressione,1)
a=segno/R
b=-f
c=math.copysign(GradientePressione,1)/densitaaria
discriminante=(b**2-4*a*c)
if discriminante >0:
tws=(-b-discriminante**0.5)/(2*a)
tws=tws*3600.0/1853.0#converte tws in knts
else:
tws=0.0#caso del centro di alta pressione
return tws
def interno(poligono,pto):
#poligono orientato in senso orario
somma=0
for i in range(0,len(poligono)-1):
v1=poligono[i]
v2=poligono[i+1]
rlv1=math.atan2((v1[1]-pto[1]),(v1[0]-pto[0]))
if rlv1<0:rlv1=2*math.pi+rlv1
rlv2=math.atan2((v2[1]-pto[1]),(v2[0]-pto[0]))
if rlv2<0:rlv2=2*math.pi+rlv2
angolo=math.copysign(rlv2-rlv1,1)
if angolo>math.pi:
angolo=2*math.pi-angolo
somma=somma+angolo
if somma<2*math.pi:
Interno=False
else:
Interno=True
return Interno
def get_cubic_root(self):
# We have the equation x^2 D^2 + (1-x)^4 * C / h_min^2
# where x = sqrt(mu).
# We substitute x, which is sqrt(mu), with x = y + 1.
# It gives y^3 + py = q
# where p = (D^2 h_min^2)/(2*C) and q = -p.
# We use the Vieta's substution to compute the root.
# There is only one real solution y (which is in [0, 1] ).
# http://mathworld.wolfram.com/VietasSubstitution.html
# eps in the numerator is to prevent momentum = 1 in case of zero gradient
if np.isnan(self._dist_to_opt) or np.isnan(self._h_min) or np.isnan(self._grad_var) \
or np.isinf(self._dist_to_opt) or np.isinf(self._h_min) or np.isinf(self._grad_var):
logging.warning("Input to cubic solver has invalid nan/inf value!")
raise Exception("Input to cubic solver has invalid nan/inf value!")
p = (self._dist_to_opt + eps)**2 * (self._h_min + eps)**2 / 2 / (self._grad_var + eps)
w3 = (-math.sqrt(p**2 + 4.0 / 27.0 * p**3) - p) / 2.0
w = math.copysign(1.0, w3) * math.pow(math.fabs(w3), 1.0/3.0)
y = w - p / 3.0 / (w + eps)
x = y + 1
if self._verbose:
logging.debug("p %f, denominator %f", p, self._grad_var + eps)
logging.debug("w3 %f ", w3)
logging.debug("y %f, denominator %f", y, w + eps)
if np.isnan(x) or np.isinf(x):
logging.warning("Output from cubic is invalid nan/inf value!")
raise Exception("Output from cubic is invalid nan/inf value!")
return x
def transact_stub(slippage, commission, event, open_orders):
"""
This is intended to be wrapped in a partial, so that the
slippage and commission models can be enclosed.
"""
for order, transaction in slippage(event, open_orders):
if transaction and transaction.amount != 0:
direction = math.copysign(1, transaction.amount)
per_share, total_commission = commission.calculate(transaction)
transaction.price += per_share * direction
transaction.commission = total_commission
yield order, transaction
def update(self, txn):
if self.sid != txn.sid:
raise Exception('updating position with txn for a '
'different sid')
total_shares = self.amount + txn.amount
if total_shares == 0:
self.cost_basis = 0.0
else:
prev_direction = copysign(1, self.amount)
txn_direction = copysign(1, txn.amount)
if prev_direction != txn_direction:
# we're covering a short or closing a position
if abs(txn.amount) > abs(self.amount):
# we've closed the position and gone short
# or covered the short position and gone long
self.cost_basis = txn.price
else:
prev_cost = self.cost_basis * self.amount
txn_cost = txn.amount * txn.price
total_cost = prev_cost + txn_cost
self.cost_basis = total_cost / total_shares
# Update the last sale price if txn is
# best data we have so far
if self.last_sale_date is None or txn.dt > self.last_sale_date:
self.last_sale_price = txn.price
self.last_sale_date = txn.dt
self.amount = total_shares
def __init__(self, dt, sid, amount, stop=None, limit=None, filled=0,
commission=None, id=None):
"""
@dt - datetime.datetime that the order was placed
@sid - stock sid of the order
@amount - the number of shares to buy/sell
a positive sign indicates a buy
a negative sign indicates a sell
@filled - how many shares of the order have been filled so far
"""
# get a string representation of the uuid.
self.id = id or self.make_id()
self.dt = dt
self.reason = None
self.created = dt
self.sid = sid
self.amount = amount
self.filled = filled
self.commission = commission
self._status = ORDER_STATUS.OPEN
self.stop = stop
self.limit = limit
self.stop_reached = False
self.limit_reached = False
self.direction = math.copysign(1, self.amount)
self.type = zp.DATASOURCE_TYPE.ORDER
def from_float(cls, f):
"""Converts a float to a decimal number, exactly.
Note that Decimal.from_float(0.1) is not the same as Decimal('0.1').
Since 0.1 is not exactly representable in binary floating point, the
value is stored as the nearest representable value which is
0x1.999999999999ap-4. The exact equivalent of the value in decimal
is 0.1000000000000000055511151231257827021181583404541015625.
>>> Decimal.from_float(0.1)
Decimal('0.1000000000000000055511151231257827021181583404541015625')
>>> Decimal.from_float(float('nan'))
Decimal('NaN')
>>> Decimal.from_float(float('inf'))
Decimal('Infinity')
>>> Decimal.from_float(-float('inf'))
Decimal('-Infinity')
>>> Decimal.from_float(-0.0)
Decimal('-0')
"""
if isinstance(f, (int, long)): # handle integer inputs
return cls(f)
if _math.isinf(f) or _math.isnan(f): # raises TypeError if not a float
return cls(repr(f))
if _math.copysign(1.0, f) == 1.0:
sign = 0
else:
sign = 1
n, d = abs(f).as_integer_ratio()
k = d.bit_length() - 1
result = _dec_from_triple(sign, str(n*5**k), -k)
if cls is Decimal:
return result
else:
return cls(result)
def __update(self):
area = self.area
if not area:
self.__zero()
return
# Center of mass
# https://en.wikipedia.org/wiki/Center_of_mass#A_continuous_volume
self.meanX = meanX = self.momentX / area
self.meanY = meanY = self.momentY / area
# Var(X) = E[X^2] - E[X]^2
self.varianceX = varianceX = self.momentXX / area - meanX**2
self.varianceY = varianceY = self.momentYY / area - meanY**2
self.stddevX = stddevX = math.copysign(abs(varianceX)**.5, varianceX)
self.stddevY = stddevY = math.copysign(abs(varianceY)**.5, varianceY)
# Covariance(X,Y) = ( E[X.Y] - E[X]E[Y] )
self.covariance = covariance = self.momentXY / area - meanX*meanY
# Correlation(X,Y) = Covariance(X,Y) / ( stddev(X) * stddev(Y) )
# https://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
correlation = covariance / (stddevX * stddevY)
self.correlation = correlation if abs(correlation) > 1e-3 else 0
slant = covariance / varianceY
self.slant = slant if abs(slant) > 1e-3 else 0
def copysign(x, y):
"""return x with the sign of y (missing from python 2.5.2)"""
if sys.version_info > (2, 6):
return math.copysign(x, y)
return math.fabs(x) * (-1 if y < 0 or (y == 0 and 1/y < 0) else 1)