Python operator 模块：eq() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 operator.eq()。
def __new__(cls, op, terms):
    """Create the operator node that matches the shape of ``terms``.

    * ``terms`` is a list: a plain instance of ``cls`` is allocated.
    * ``terms`` is a mapping with one item: an ``InOp`` is returned when
      the value is a list, otherwise an ``EqOp``; both receive the
      ``(key, value)`` pair.
    * ``terms`` is a mapping with several items: each item becomes an
      ``InOp`` (when ``rhs.json`` starts with ``"["``) or an ``EqOp``, and
      the results are combined in an ``AndOp``.
    """
    if isinstance(terms, list):
        # object.__new__ rejects extra arguments on Python 3 when __new__
        # is overridden, so only the class is passed on.
        return object.__new__(cls)

    # Materialise the items so they can be indexed on Python 3 as well,
    # where dict.items() returns a view.
    items = list(terms.items())
    if len(items) == 1:
        if isinstance(items[0][1], list):
            return InOp("in", items[0])
        return EqOp("eq", items[0])

    acc = []
    for lhs, rhs in items:
        if rhs.json.startswith("["):
            acc.append(InOp("in", [Variable(lhs), rhs]))
        else:
            acc.append(EqOp("eq", [Variable(lhs), rhs]))
    return AndOp("and", acc)
def parse(self, data):
    """Parse operator and value from filter's data.

    A non-dict value is deserialized and paired with the ``'$eq'``
    operator. A dict value is read as ``{operator_name: operand}``; names
    missing from ``self.operators`` are skipped, and operands of operators
    listed in ``self.list_ops`` are deserialized element by element. The
    parsed values are also stored in ``request.filters[self.fname]``.

    Returns a tuple of ``(operator, value)`` pairs.
    """
    raw = data.get(self.fname, missing)

    if not isinstance(raw, dict):
        value = self.field.deserialize(raw)
        request.filters[self.fname] = value
        return ((self.operators['$eq'], value),)

    pairs = []
    request.filters[self.fname] = {}
    for op_name, operand in raw.items():
        if op_name not in self.operators:
            continue
        if op_name in self.list_ops:
            operand = [self.field.deserialize(item) for item in operand]
        else:
            operand = self.field.deserialize(operand)
        pairs.append((self.operators[op_name], operand))
        request.filters[self.fname][op_name] = operand
    return tuple(pairs)
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping bind parameters on the right.

    When the left operand is a ``BindParameter``, the operator is
    ``operator.eq`` and the right operand is not a ``BindParameter``, the
    operands are swapped and the new expression is handed to
    ``self.process``. Every other expression is delegated to the parent
    compiler's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator)
    return self.process(swapped, **kwargs)
def _validate_query_state(self):
    """Reject query state that update()/delete() cannot be combined with.

    Each listed attribute of ``self.query`` is compared with its "not set"
    marker using the paired operator; the first attribute that differs
    raises ``sa_exc.InvalidRequestError`` naming the Query method(s).
    """
    guarded_state = (
        ('_limit', 'limit()', None, operator.is_),
        ('_offset', 'offset()', None, operator.is_),
        ('_order_by', 'order_by()', False, operator.is_),
        ('_group_by', 'group_by()', False, operator.is_),
        ('_distinct', 'distinct()', False, operator.is_),
        (
            '_from_obj',
            'join(), outerjoin(), select_from(), or from_self()',
            (), operator.eq),
    )
    for attr, methname, notset, op in guarded_state:
        if op(getattr(self.query, attr), notset):
            continue
        raise sa_exc.InvalidRequestError(
            "Can't call Query.update() or Query.delete() "
            "when %s has been called" %
            (methname, )
        )
def get_op(cls, op):
    """Return the handler registered for ``op``.

    Grammar symbol ids from the ``symbol`` module map to the matching
    ``cls`` methods (``or_test``, when ``symbol`` defines it, shares
    ``cls.test``). Comparison strings map to ``operator`` functions or to
    membership lambdas. An unknown ``op`` raises ``KeyError``.
    """
    handlers = {
        symbol.test: cls.test,
        symbol.and_test: cls.and_test,
        symbol.atom: cls.atom,
        symbol.comparison: cls.comparison,
    }
    handlers.update({
        'not in': lambda x, y: x not in y,
        'in': lambda x, y: x in y,
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '>': operator.gt,
        '<=': operator.le,
        '>=': operator.ge,
    })
    if hasattr(symbol, 'or_test'):
        handlers[symbol.or_test] = cls.test
    return handlers[op]
def get_op(cls, op):
    """Return the handler registered for ``op``.

    Grammar symbol ids from the ``symbol`` module map to the matching
    ``cls`` methods (``or_test``, when ``symbol`` defines it, shares
    ``cls.test``). Comparison strings map to ``operator`` functions or to
    membership lambdas. An unknown ``op`` raises ``KeyError``.
    """
    handlers = {
        symbol.test: cls.test,
        symbol.and_test: cls.and_test,
        symbol.atom: cls.atom,
        symbol.comparison: cls.comparison,
    }
    handlers.update({
        'not in': lambda x, y: x not in y,
        'in': lambda x, y: x in y,
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '>': operator.gt,
        '<=': operator.le,
        '>=': operator.ge,
    })
    if hasattr(symbol, 'or_test'):
        handlers[symbol.or_test] = cls.test
    return handlers[op]
def wrap_transition(op, did_change):
    """Turn ``op`` into a transition operator.

    The returned function has the signature
    ``(new_value, needle, old_value)``. It first evaluates
    ``did_change(old_value, needle)``; a falsy result is returned as is,
    otherwise the result of ``op(new_value, needle)`` is returned.

    Note:
        E.g. ``wrap_transition(operator.eq, operator.ne)`` is only true
        when new_value equals needle while old_value did not.
    """
    def compare(new_value, needle, old_value):
        changed = did_change(old_value, needle)
        if not changed:
            return changed
        return op(new_value, needle)

    return compare
def get_op(cls, op):
    """Return the handler registered for ``op``.

    Grammar symbol ids from the ``symbol`` module map to the matching
    ``cls`` methods (``or_test``, when ``symbol`` defines it, shares
    ``cls.test``). Comparison strings map to ``operator`` functions or to
    membership lambdas. An unknown ``op`` raises ``KeyError``.
    """
    handlers = {
        symbol.test: cls.test,
        symbol.and_test: cls.and_test,
        symbol.atom: cls.atom,
        symbol.comparison: cls.comparison,
    }
    handlers.update({
        'not in': lambda x, y: x not in y,
        'in': lambda x, y: x in y,
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '>': operator.gt,
        '<=': operator.le,
        '>=': operator.ge,
    })
    if hasattr(symbol, 'or_test'):
        handlers[symbol.or_test] = cls.test
    return handlers[op]
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping bind parameters on the right.

    When the left operand is a ``BindParameter``, the operator is
    ``operator.eq`` and the right operand is not a ``BindParameter``, the
    operands are swapped and the new expression is handed to
    ``self.process``. Every other expression is delegated to the parent
    compiler's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator)
    return self.process(swapped, **kwargs)
def _validate_query_state(self):
    """Reject query state that update()/delete() cannot be combined with.

    Each listed attribute of ``self.query`` is compared with its "not set"
    marker using the paired operator; the first attribute that differs
    raises ``sa_exc.InvalidRequestError`` naming the Query method(s).
    """
    guarded_state = (
        ('_limit', 'limit()', None, operator.is_),
        ('_offset', 'offset()', None, operator.is_),
        ('_order_by', 'order_by()', False, operator.is_),
        ('_group_by', 'group_by()', False, operator.is_),
        ('_distinct', 'distinct()', False, operator.is_),
        (
            '_from_obj',
            'join(), outerjoin(), select_from(), or from_self()',
            (), operator.eq),
    )
    for attr, methname, notset, op in guarded_state:
        if op(getattr(self.query, attr), notset):
            continue
        raise sa_exc.InvalidRequestError(
            "Can't call Query.update() or Query.delete() "
            "when %s has been called" %
            (methname, )
        )
def test_richcompare_crash(self):
# gh-4613
import operator as op
# dummy class where __array__ throws exception
class Foo(object):
__array_priority__ = 1002
def __array__(self,*args,**kwargs):
raise Exception()
rhs = Foo()
lhs = np.array(1)
for f in [op.lt, op.le, op.gt, op.ge]:
if sys.version_info[0] >= 3:
assert_raises(TypeError, f, lhs, rhs)
else:
f(lhs, rhs)
assert_(not op.eq(lhs, rhs))
assert_(op.ne(lhs, rhs))
def main(name,typeid):
    """Upload an image from the ``Captchas`` directory to the ruokuai API.

    The image is re-encoded as ``upload.gif`` in the working directory and
    its bytes are posted to ``http://api.ruokuai.com/create.xml`` together
    with the request parameters.

    :param name: file name of the image inside ``Captchas``
    :param typeid: value sent as the ``typeid`` request parameter
    :return: whatever ``client.http_upload_image`` returns
    """
    import os

    client = APIClient()
    # NOTE(review): the account credentials and soft key are hard-coded in
    # source; consider loading them from configuration instead.
    paramDict = {
        'username': 'sorano123',
        'password': 'sorano10032',
        'typeid': typeid,
        'timeout': '60',
        'softid': '1',
        'softkey': 'b40ffbee5c1cf4e38028c197eb2fc751',
    }
    paramKeys = ['username','password','typeid','timeout','softid','softkey']

    from PIL import Image
    # os.path.join keeps the path valid where the separator is not "\\".
    # Image.open raises on failure rather than returning None, so no None
    # check is needed; the context manager releases the source file.
    with Image.open(os.path.join('Captchas', name)) as img:
        img.save("upload.gif", format="gif")

    # Read the payload through a context manager so the handle is closed.
    with open("upload.gif", "rb") as upload:
        filebytes = upload.read()

    return client.http_upload_image("http://api.ruokuai.com/create.xml", paramKeys, paramDict, filebytes)
def _rules_pass(obj, rules, compare=operator.eq):
    """Check ``obj`` against a (possibly nested) mapping of rules.

    For a non-dict expected value, ``compare(obj.get(key), expected)`` must
    be truthy. A dict expected value is checked recursively against
    ``obj.get(key) or {}``; when the key ends with ``'NotEqual'`` the
    suffix is stripped and the nested comparison switches to
    ``operator.ne``.

    Returns ``True`` when every rule passes, ``False`` at the first failure.
    """
    suffix = 'NotEqual'
    for rule_key, wanted in rules.items():
        if not isinstance(wanted, dict):
            if compare(obj.get(rule_key), wanted):
                continue
            return False

        if rule_key.endswith(suffix):
            child_key, child_compare = rule_key[:-len(suffix)], operator.ne
        else:
            child_key, child_compare = rule_key, compare

        if not _rules_pass(obj=obj.get(child_key) or {},
                           rules=wanted,
                           compare=child_compare):
            return False
    return True
def get_op(cls, op):
    """Return the handler registered for ``op``.

    Grammar symbol ids from the ``symbol`` module map to the matching
    ``cls`` methods (``or_test``, when ``symbol`` defines it, shares
    ``cls.test``). Comparison strings map to ``operator`` functions or to
    membership lambdas. An unknown ``op`` raises ``KeyError``.
    """
    handlers = {
        symbol.test: cls.test,
        symbol.and_test: cls.and_test,
        symbol.atom: cls.atom,
        symbol.comparison: cls.comparison,
    }
    handlers.update({
        'not in': lambda x, y: x not in y,
        'in': lambda x, y: x in y,
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '>': operator.gt,
        '<=': operator.le,
        '>=': operator.ge,
    })
    if hasattr(symbol, 'or_test'):
        handlers[symbol.or_test] = cls.test
    return handlers[op]
def get_op(cls, op):
    """Return the handler registered for ``op``.

    Grammar symbol ids from the ``symbol`` module map to the matching
    ``cls`` methods (``or_test``, when ``symbol`` defines it, shares
    ``cls.test``). Comparison strings map to ``operator`` functions or to
    membership lambdas. An unknown ``op`` raises ``KeyError``.
    """
    handlers = {
        symbol.test: cls.test,
        symbol.and_test: cls.and_test,
        symbol.atom: cls.atom,
        symbol.comparison: cls.comparison,
    }
    handlers.update({
        'not in': lambda x, y: x not in y,
        'in': lambda x, y: x in y,
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '>': operator.gt,
        '<=': operator.le,
        '>=': operator.ge,
    })
    if hasattr(symbol, 'or_test'):
        handlers[symbol.or_test] = cls.test
    return handlers[op]
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping bind parameters on the right.

    When the left operand is a ``BindParameter``, the operator is
    ``operator.eq`` and the right operand is not a ``BindParameter``, the
    operands are swapped and the new expression is handed to
    ``self.process``. Every other expression is delegated to the parent
    compiler's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator)
    return self.process(swapped, **kwargs)
def test_encode(self):
    """Check the (expression, declarations) pair returned by AP.encode()."""
    check = self.assertEqual

    # Named propositions, with and without an explicit argument.
    check(AP("a").encode(42),
          ('a___42.0', ['(declare-const a___42.0 Bool)']))
    check(AP("a").encode(),
          ('a___0.0', ['(declare-const a___0.0 Bool)']))
    check(AP("a").encode(0.5),
          ('a___0.5', ['(declare-const a___0.5 Bool)']))

    # Two variables with coefficients.
    check(AP(None, (1, 2), operator.ge, 42, ('x', 'y')).encode(),
          ('(>= (+ (* 1.0 x___0.0) (* 2.0 y___0.0)) 42)',
           ['(declare-const x___0.0 Real)', '(declare-const y___0.0 Real)']))
    check(AP(None, (1, 2), operator.ge, 42, ('x', 'y')).encode(42),
          ('(>= (+ (* 1.0 x___42.0) (* 2.0 y___42.0)) 42)',
           ['(declare-const x___42.0 Real)', '(declare-const y___42.0 Real)']))

    # No coefficients.
    check(AP(None, None, operator.ne, 42, ('x', 'y')).encode(),
          ('(distinct (+ x___0.0 y___0.0) 42)',
           ['(declare-const x___0.0 Real)', '(declare-const y___0.0 Real)']))
    check(AP(None, None, operator.eq, 42, 'x').encode(),
          ('(= x___0.0 42)', ['(declare-const x___0.0 Real)']))

    # Single variable with a coefficient and a strict comparison.
    check(AP(None, (3,), gt, 42, 'x').encode(),
          ('(> (* 3.0 x___0.0) 42)', ['(declare-const x___0.0 Real)']))
    check(AP(None, (3,), gt, 42, 'x').encode(0.5),
          ('(and (>= (* 3.0 x___0.0) 42) (and (> (* 3.0 x___0.5) 42) (>= (* 3.0 x___1.0) 42)))',
           ['(declare-const x___0.0 Real)', '(declare-const x___0.5 Real)',
            '(declare-const x___1.0 Real)']))
def test_negationnormalform(self):
    """Check negationnormalform() on negated propositions and connectives."""
    check = self.assertEqual

    check(NOT(AP("a")).negationnormalform(), NOT(AP("a")))
    check(NOT(NOT(AP("a"))).negationnormalform(), AP("a"))
    check(NOT(NEXT(AP("a"))).negationnormalform(), NEXT(NOT(AP("a"))))

    # Negating a comparison is expected to yield the complementary one.
    complements = (
        (operator.ge, operator.lt),
        (operator.le, operator.gt),
        (operator.gt, operator.le),
        (operator.lt, operator.ge),
        (operator.eq, operator.ne),
        (operator.ne, operator.eq),
    )
    for given, negated in complements:
        check(NOT(AP(None, None, given, 42, 'x')).negationnormalform(),
              AP(None, None, negated, 42, 'x'))

    check(NOT(AND(AP("a"), AP("b"))).negationnormalform(),
          OR(NOT(AP("a")), NOT(AP("b"))))
    check(NOT(OR(AP("a"), AP("b"))).negationnormalform(),
          AND(NOT(AP("a")), NOT(AP("b"))))
    check(NOT(IMPLIES(AP("a"), AP("b"))).negationnormalform(),
          AND(AP("a"), NOT(AP("b"))))
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping bind parameters on the right.

    When the left operand is a ``BindParameter``, the operator is
    ``operator.eq`` and the right operand is not a ``BindParameter``, the
    operands are swapped and the new expression is handed to
    ``self.process``. Every other expression is delegated to the parent
    compiler's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator)
    return self.process(swapped, **kwargs)
def exact_content():
    """Copy the payload of every ``<content>`` line from ``dat_dir`` to ``out``.

    ``dat_dir`` is read twice: once to count lines for the progress bar and
    once to extract. Any existing ``out`` file is removed first. Lines that
    do not start with ``<content>`` are skipped.
    """
    tag = '<content>'

    # Count lines without holding the whole file in memory.
    with open(dat_dir, 'r', encoding='utf-8') as f_in:
        total = sum(1 for _ in f_in)

    if os.path.exists(out):
        os.remove(out)

    # Context managers close both handles even if reading or writing fails.
    with open(out, 'w', encoding='utf-8') as f_out, \
            open(dat_dir, 'r', encoding='utf-8') as f_in:
        for line in tqdm(f_in, total=total):
            if not line.startswith(tag):
                continue
            # NOTE(review): this trims len('<content>') characters from
            # both ends of the line; confirm that matches the closing tag
            # plus newline in the real data.
            f_out.write(line[len(tag):len(line) - len(tag)] + '\n')
def operator(self):
    """Return the mapping of supported filter operator codes to callables.

    + EQ - Equal To
    + NE - Not Equal To
    + GT - Greater Than
    + GE - Greater Than or Equal To
    + LT - Less Than
    + LE - Less Than or Equal To
    + SW - Starts With
    + IN - In String or Array
    + NI - Not in String or Array
    """
    # The six comparison codes are the upper-cased names of the functions
    # in the ``operator`` module.
    supported = {
        code: getattr(operator, code.lower())
        for code in ('EQ', 'NE', 'GT', 'GE', 'LT', 'LE')
    }
    supported['SW'] = self._starts_with
    supported['IN'] = self._in
    supported['NI'] = self._ni  # not in
    return supported
def test_constructor(self):
eq = self.assertEqual
td = timedelta
# Check keyword args to constructor
eq(td(), td(weeks=0, days=0, hours=0, minutes=0, seconds=0,
milliseconds=0, microseconds=0))
eq(td(1), td(days=1))
eq(td(0, 1), td(seconds=1))
eq(td(0, 0, 1), td(microseconds=1))
eq(td(weeks=1), td(days=7))
eq(td(days=1), td(hours=24))
eq(td(hours=1), td(minutes=60))
eq(td(minutes=1), td(seconds=60))
eq(td(seconds=1), td(milliseconds=1000))
eq(td(milliseconds=1), td(microseconds=1000))
# Check float args to constructor
eq(td(weeks=1.0/7), td(days=1))
eq(td(days=1.0/24), td(hours=1))
eq(td(hours=1.0/60), td(minutes=1))
eq(td(minutes=1.0/60), td(seconds=1))
eq(td(seconds=0.001), td(milliseconds=1))
eq(td(milliseconds=0.001), td(microseconds=1))
def test_str(self):
td = timedelta
eq = self.assertEqual
eq(str(td(1)), "1 day, 0:00:00")
eq(str(td(-1)), "-1 day, 0:00:00")
eq(str(td(2)), "2 days, 0:00:00")
eq(str(td(-2)), "-2 days, 0:00:00")
eq(str(td(hours=12, minutes=58, seconds=59)), "12:58:59")
eq(str(td(hours=2, minutes=3, seconds=4)), "2:03:04")
eq(str(td(weeks=-30, hours=23, minutes=12, seconds=34)),
"-210 days, 23:12:34")
eq(str(td(milliseconds=1)), "0:00:00.001000")
eq(str(td(microseconds=3)), "0:00:00.000003")
eq(str(td(days=999999999, hours=23, minutes=59, seconds=59,
microseconds=999999)),
"999999999 days, 23:59:59.999999")
def test_values(self):
# check all operators and all comparison results
self.checkvalue("lt", 0, 0, False)
self.checkvalue("le", 0, 0, True )
self.checkvalue("eq", 0, 0, True )
self.checkvalue("ne", 0, 0, False)
self.checkvalue("gt", 0, 0, False)
self.checkvalue("ge", 0, 0, True )
self.checkvalue("lt", 0, 1, True )
self.checkvalue("le", 0, 1, True )
self.checkvalue("eq", 0, 1, False)
self.checkvalue("ne", 0, 1, True )
self.checkvalue("gt", 0, 1, False)
self.checkvalue("ge", 0, 1, False)
self.checkvalue("lt", 1, 0, False)
self.checkvalue("le", 1, 0, False)
self.checkvalue("eq", 1, 0, False)
self.checkvalue("ne", 1, 0, True )
self.checkvalue("gt", 1, 0, True )
self.checkvalue("ge", 1, 0, True )
def test_richcompare(self):
self.assertIs(complex.__eq__(1+1j, 1<<10000), False)
self.assertIs(complex.__lt__(1+1j, None), NotImplemented)
self.assertIs(complex.__eq__(1+1j, 1+1j), True)
self.assertIs(complex.__eq__(1+1j, 2+2j), False)
self.assertIs(complex.__ne__(1+1j, 1+1j), False)
self.assertIs(complex.__ne__(1+1j, 2+2j), True)
for i in range(1, 100):
f = i / 100.0
self.assertIs(complex.__eq__(f+0j, f), True)
self.assertIs(complex.__ne__(f+0j, f), False)
self.assertIs(complex.__eq__(complex(f, f), f), False)
self.assertIs(complex.__ne__(complex(f, f), f), True)
self.assertIs(complex.__lt__(1+1j, 2+2j), NotImplemented)
self.assertIs(complex.__le__(1+1j, 2+2j), NotImplemented)
self.assertIs(complex.__gt__(1+1j, 2+2j), NotImplemented)
self.assertIs(complex.__ge__(1+1j, 2+2j), NotImplemented)
self.assertRaises(TypeError, operator.lt, 1+1j, 2+2j)
self.assertRaises(TypeError, operator.le, 1+1j, 2+2j)
self.assertRaises(TypeError, operator.gt, 1+1j, 2+2j)
self.assertRaises(TypeError, operator.ge, 1+1j, 2+2j)
self.assertIs(operator.eq(1+1j, 1+1j), True)
self.assertIs(operator.eq(1+1j, 2+2j), False)
self.assertIs(operator.ne(1+1j, 1+1j), False)
self.assertIs(operator.ne(1+1j, 2+2j), True)
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping bind parameters on the right.

    When the left operand is a ``BindParameter``, the operator is
    ``operator.eq`` and the right operand is not a ``BindParameter``, the
    operands are swapped and the new expression is handed to
    ``self.process``. Every other expression is delegated to the parent
    compiler's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator)
    return self.process(swapped, **kwargs)
def _validate_query_state(self):
    """Reject query state that update()/delete() cannot be combined with.

    Each listed attribute of ``self.query`` is compared with its "not set"
    marker using the paired operator; the first attribute that differs
    raises ``sa_exc.InvalidRequestError`` naming the Query method(s).
    """
    guarded_state = (
        ('_limit', 'limit()', None, operator.is_),
        ('_offset', 'offset()', None, operator.is_),
        ('_order_by', 'order_by()', False, operator.is_),
        ('_group_by', 'group_by()', False, operator.is_),
        ('_distinct', 'distinct()', False, operator.is_),
        (
            '_from_obj',
            'join(), outerjoin(), select_from(), or from_self()',
            (), operator.eq),
    )
    for attr, methname, notset, op in guarded_state:
        if op(getattr(self.query, attr), notset):
            continue
        raise sa_exc.InvalidRequestError(
            "Can't call Query.update() or Query.delete() "
            "when %s has been called" %
            (methname, )
        )
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping bind parameters on the right.

    When the left operand is a ``BindParameter``, the operator is
    ``operator.eq`` and the right operand is not a ``BindParameter``, the
    operands are swapped and the new expression is handed to
    ``self.process``. Every other expression is delegated to the parent
    compiler's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator)
    return self.process(swapped, **kwargs)
def _validate_query_state(self):
    """Reject query state that update()/delete() cannot be combined with.

    Each listed attribute of ``self.query`` is compared with its "not set"
    marker using the paired operator; the first attribute that differs
    raises ``sa_exc.InvalidRequestError`` naming the Query method(s).
    """
    guarded_state = (
        ('_limit', 'limit()', None, operator.is_),
        ('_offset', 'offset()', None, operator.is_),
        ('_order_by', 'order_by()', False, operator.is_),
        ('_group_by', 'group_by()', False, operator.is_),
        ('_distinct', 'distinct()', False, operator.is_),
        (
            '_from_obj',
            'join(), outerjoin(), select_from(), or from_self()',
            (), operator.eq),
    )
    for attr, methname, notset, op in guarded_state:
        if op(getattr(self.query, attr), notset):
            continue
        raise sa_exc.InvalidRequestError(
            "Can't call Query.update() or Query.delete() "
            "when %s has been called" %
            (methname, )
        )
def __figure_out_task_attribute_and_operator(criteria):
    """Resolve a criteria string into ``(attribute, op, swap_operands)``.

    ``criteria`` is either ``"attribute"`` (compared with ``operator.eq``)
    or ``"attribute__opname"``, where the operator is looked up through
    ``get_operator_function``. If ``Task`` lacks the attribute but has the
    same name with an ``'s'`` appended, that attribute is used with
    ``operator.contains``. ``swap_operands`` is true only when ``Task`` has
    the attribute as given and the operator is ``operator.contains``.

    Raises ``RuntimeError`` when ``Task`` has neither attribute.
    """
    if '__' in criteria:
        attribute, op_name = criteria.split('__')
        op = get_operator_function(op_name)
    else:
        attribute, op = criteria, operator.eq

    if hasattr(Task, attribute):
        return attribute, op, op is operator.contains

    plural = attribute + 's'
    if not hasattr(Task, plural):
        raise RuntimeError("Task doesn't have such attribute.")
    return plural, operator.contains, False
def test_subtransformation():
    """Run a transformation that embeds another one and check the new tag."""
    subtransformation = Transformation(
        Rule('*', lib.set_localname('pablo'))
    )

    # The steps are built in the same order in which they are executed.
    steps = [
        lib.f(id, Ref('root')), lib.put_variable('source_id'),
        subtransformation,
        lib.f(id, Ref('root')), lib.put_variable('result_id'),
        lib.debug_symbols('source_id', 'result_id'),
        Rule(Not(If(Ref('source_id'), operator.eq, Ref('result_id'))),
             (lib.debug_message('NO!'), lib.debug_symbols('root'),
              lib.set_localname('neruda'), AbortRule)),
    ]
    transformation = Transformation(*steps)

    doc = etree.fromstring('<augustus />')
    assert etree.QName(doc).text == 'augustus'

    result = transformation(doc)
    assert result.tag == 'pablo'
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping bind parameters on the right.

    When the left operand is a ``BindParameter``, the operator is
    ``operator.eq`` and the right operand is not a ``BindParameter``, the
    operands are swapped and the new expression is handed to
    ``self.process``. Every other expression is delegated to the parent
    compiler's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator)
    return self.process(swapped, **kwargs)
def test_richcompare_crash(self):
# gh-4613
import operator as op
# dummy class where __array__ throws exception
class Foo(object):
__array_priority__ = 1002
def __array__(self,*args,**kwargs):
raise Exception()
rhs = Foo()
lhs = np.array(1)
for f in [op.lt, op.le, op.gt, op.ge]:
if sys.version_info[0] >= 3:
assert_raises(TypeError, f, lhs, rhs)
else:
f(lhs, rhs)
assert_(not op.eq(lhs, rhs))
assert_(op.ne(lhs, rhs))
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping bind parameters on the right.

    When the left operand is a ``BindParameter``, the operator is
    ``operator.eq`` and the right operand is not a ``BindParameter``, the
    operands are swapped and the new expression is handed to
    ``self.process``. Every other expression is delegated to the parent
    compiler's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator)
    return self.process(swapped, **kwargs)
def _validate_query_state(self):
    """Reject query state that update()/delete() cannot be combined with.

    Each listed attribute of ``self.query`` is compared with its "not set"
    marker using the paired operator; the first attribute that differs
    raises ``sa_exc.InvalidRequestError`` naming the Query method(s).
    """
    guarded_state = (
        ('_limit', 'limit()', None, operator.is_),
        ('_offset', 'offset()', None, operator.is_),
        ('_order_by', 'order_by()', False, operator.is_),
        ('_group_by', 'group_by()', False, operator.is_),
        ('_distinct', 'distinct()', False, operator.is_),
        (
            '_from_obj',
            'join(), outerjoin(), select_from(), or from_self()',
            (), operator.eq),
    )
    for attr, methname, notset, op in guarded_state:
        if op(getattr(self.query, attr), notset):
            continue
        raise sa_exc.InvalidRequestError(
            "Can't call Query.update() or Query.delete() "
            "when %s has been called" %
            (methname, )
        )
def test_values(self):
# check all operators and all comparison results
self.checkvalue("lt", 0, 0, False)
self.checkvalue("le", 0, 0, True )
self.checkvalue("eq", 0, 0, True )
self.checkvalue("ne", 0, 0, False)
self.checkvalue("gt", 0, 0, False)
self.checkvalue("ge", 0, 0, True )
self.checkvalue("lt", 0, 1, True )
self.checkvalue("le", 0, 1, True )
self.checkvalue("eq", 0, 1, False)
self.checkvalue("ne", 0, 1, True )
self.checkvalue("gt", 0, 1, False)
self.checkvalue("ge", 0, 1, False)
self.checkvalue("lt", 1, 0, False)
self.checkvalue("le", 1, 0, False)
self.checkvalue("eq", 1, 0, False)
self.checkvalue("ne", 1, 0, True )
self.checkvalue("gt", 1, 0, True )
self.checkvalue("ge", 1, 0, True )
def test_values(self):
# check all operators and all comparison results
self.checkvalue("lt", 0, 0, False)
self.checkvalue("le", 0, 0, True )
self.checkvalue("eq", 0, 0, True )
self.checkvalue("ne", 0, 0, False)
self.checkvalue("gt", 0, 0, False)
self.checkvalue("ge", 0, 0, True )
self.checkvalue("lt", 0, 1, True )
self.checkvalue("le", 0, 1, True )
self.checkvalue("eq", 0, 1, False)
self.checkvalue("ne", 0, 1, True )
self.checkvalue("gt", 0, 1, False)
self.checkvalue("ge", 0, 1, False)
self.checkvalue("lt", 1, 0, False)
self.checkvalue("le", 1, 0, False)
self.checkvalue("eq", 1, 0, False)
self.checkvalue("ne", 1, 0, True )
self.checkvalue("gt", 1, 0, True )
self.checkvalue("ge", 1, 0, True )
def _match_subject_name(cert, subject_name, compare_func=operator.eq, alt_names=True):
    """Check that ``subject_name`` matches one of the certificate's names.

    When ``alt_names`` is truthy, the DNSName values of the
    subjectAltName extension are used, if the extension exists. If that
    yields no names, the first commonName attribute of the subject is used
    instead. Each candidate is passed to
    ``compare_func(name, subject_name)``.

    Returns ``None`` on a match; raises ``InvalidCertificate`` otherwise.
    """
    names = []
    if alt_names:
        try:
            san_extension = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
            names = san_extension.value.get_values_for_type(x509.DNSName)
        except x509.extensions.ExtensionNotFound:
            pass

    if not names:
        common_names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        if common_names:
            names = [common_names[0].value]

    for candidate in names:
        if compare_func(candidate, subject_name):
            return

    count = len(names)
    if count > 1:
        message = "Subject name %r doesn't match either of %s" % (subject_name, ', '.join(map(repr, names)))
    elif count == 1:
        message = "Subject name %r doesn't match %r" % (subject_name, names[0])
    else:
        message = "No appropriate commonName or subjectAltName DNSName fields were found"
    raise InvalidCertificate(message)
def __init__(self, parent, func, on_error=None, equal=None,
             initial_value=undefined, *, tracker=None):
    """
    :param parent: a possible parent computation owning this one
    :param func: the function to be computed; it is wrapped with
      ``self._auto`` before being passed to the parent initializer
    :param on_error: an optional callback that will be called if an
      error is raised during computation
    :param equal: an optional equality comparison function to be used
      instead of the default ``operator.eq``
    :param initial_value: an optional initial value. By default it is a
      marker value called ``undefined``, that will be replaced with the
      first calculated value without generating any item.
    :param tracker: an instance of :class:`~.tracker.Tracker` that is
      managing the computing process
    """
    wrapped = functools.partial(self._auto, func)
    self._equal = equal if equal else operator.eq
    self.current_value = initial_value
    self._tracker = tracker
    self._init_next_value_container()
    super().__init__(parent, wrapped, on_error, tracker=tracker)
def __setattr__(self, name, new):
    """Set an attribute and flag its dependency when a field changes.

    Names outside ``_fields`` are set normally. For a field, the previous
    value is read first (``undefined`` when missing), the new value is
    stored, and ``_deps[name].changed()`` is called when the field has a
    dependency and either had no previous value or ``_field_eq`` reports
    the old and new values as different.
    """
    lookup = super().__getattribute__

    if name not in lookup('_fields'):
        return super().__setattr__(name, new)

    try:
        old = lookup(name)
    except AttributeError:
        old = undefined

    res = super().__setattr__(name, new)

    deps = lookup('_deps')
    if name in deps:
        eq = lookup('_field_eq')
        if old is undefined or not eq(old, new):
            deps[name].changed()
    return res
def __init__(self, generator=undefined, initial_value=undefined,
             equal=None, always_recompute=False, *, tracker=None):
    """Set up the value holder.

    :param generator: a callable stored as the generator (e.g. when the
      class is used as a method decorator), or a plain value that becomes
      the stored value
    :param initial_value: the stored value when ``generator`` is callable
      or not given
    :param equal: an optional equality function used instead of
      ``operator.eq``
    :param always_recompute: stored as ``_always_recompute``
    :param tracker: forwarded to the parent initializer
    """
    super().__init__(tracker=tracker)
    self._equal = equal if equal else operator.eq
    self._descriptor_initialized = False
    self._single_value_initialized = False

    if callable(generator):
        # suppose it's used as a method decorator
        chosen_generator, chosen_value = generator, initial_value
    elif generator is not undefined:
        chosen_generator, chosen_value = None, generator
    else:
        chosen_generator, chosen_value = None, initial_value

    self._generator = chosen_generator
    self._value = chosen_value
    self._comp = None
    self._always_recompute = always_recompute
def number_of_args(fn):
    """Return the number of positional arguments for a function, or None if the number is variable.

    Looks inside any decorated functions by following ``__wrapped__``.

    Raises ``NotImplementedError`` when ``signature(fn)`` raises
    ``ValueError`` and ``fn`` is not one of the explicitly listed
    built-in operators.
    """
    try:
        if hasattr(fn, '__wrapped__'):
            return number_of_args(fn.__wrapped__)
        # Inspect the signature once and reuse its parameters.
        params = list(signature(fn).parameters.values())
    except ValueError:
        # signatures don't work for built-in operators, so check for a few explicitly
        UNARY_OPS = [len, op.not_, op.truth, op.abs, op.index, op.inv, op.invert, op.neg, op.pos]
        BINARY_OPS = [op.lt, op.le, op.gt, op.ge, op.eq, op.ne, op.is_, op.is_not, op.add, op.and_, op.floordiv, op.lshift, op.mod, op.mul, op.or_, op.pow, op.rshift, op.sub, op.truediv, op.xor, op.concat, op.contains, op.countOf, op.delitem, op.getitem, op.indexOf]
        TERNARY_OPS = [op.setitem]
        for arity, known_ops in ((1, UNARY_OPS), (2, BINARY_OPS), (3, TERNARY_OPS)):
            if fn in known_ops:
                return arity
        raise NotImplementedError("Built-in operator {} not supported".format(fn))

    if any(p.kind == p.VAR_POSITIONAL for p in params):
        return None
    return sum(p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) for p in params)
def get_op(cls, op):
    """Return the handler registered for ``op``.

    Grammar symbol ids from the ``symbol`` module map to the matching
    ``cls`` methods (``or_test``, when ``symbol`` defines it, shares
    ``cls.test``). Comparison strings map to ``operator`` functions or to
    membership lambdas. An unknown ``op`` raises ``KeyError``.
    """
    handlers = {
        symbol.test: cls.test,
        symbol.and_test: cls.and_test,
        symbol.atom: cls.atom,
        symbol.comparison: cls.comparison,
    }
    handlers.update({
        'not in': lambda x, y: x not in y,
        'in': lambda x, y: x in y,
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '>': operator.gt,
        '<=': operator.le,
        '>=': operator.ge,
    })
    if hasattr(symbol, 'or_test'):
        handlers[symbol.or_test] = cls.test
    return handlers[op]
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping bind parameters on the right.

    When the left operand is a ``BindParameter``, the operator is
    ``operator.eq`` and the right operand is not a ``BindParameter``, the
    operands are swapped and the new expression is handed to
    ``self.process``. Every other expression is delegated to the parent
    compiler's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator)
    return self.process(swapped, **kwargs)
def _validate_query_state(self):
    """Reject query state that update()/delete() cannot be combined with.

    Each listed attribute of ``self.query`` is compared with its "not set"
    marker using the paired operator; the first attribute that differs
    raises ``sa_exc.InvalidRequestError`` naming the Query method(s).
    """
    guarded_state = (
        ('_limit', 'limit()', None, operator.is_),
        ('_offset', 'offset()', None, operator.is_),
        ('_order_by', 'order_by()', False, operator.is_),
        ('_group_by', 'group_by()', False, operator.is_),
        ('_distinct', 'distinct()', False, operator.is_),
        (
            '_from_obj',
            'join(), outerjoin(), select_from(), or from_self()',
            (), operator.eq),
    )
    for attr, methname, notset, op in guarded_state:
        if op(getattr(self.query, attr), notset):
            continue
        raise sa_exc.InvalidRequestError(
            "Can't call Query.update() or Query.delete() "
            "when %s has been called" %
            (methname, )
        )
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping bind parameters on the right.

    When the left operand is a ``BindParameter``, the operator is
    ``operator.eq`` and the right operand is not a ``BindParameter``, the
    operands are swapped and the new expression is handed to
    ``self.process``. Every other expression is delegated to the parent
    compiler's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator)
    return self.process(swapped, **kwargs)
def _validate_query_state(self):
    """Reject query state that update()/delete() cannot be combined with.

    Each listed attribute of ``self.query`` is compared with its "not set"
    marker using the paired operator; the first attribute that differs
    raises ``sa_exc.InvalidRequestError`` naming the Query method(s).
    """
    guarded_state = (
        ('_limit', 'limit()', None, operator.is_),
        ('_offset', 'offset()', None, operator.is_),
        ('_order_by', 'order_by()', False, operator.is_),
        ('_group_by', 'group_by()', False, operator.is_),
        ('_distinct', 'distinct()', False, operator.is_),
        (
            '_from_obj',
            'join(), outerjoin(), select_from(), or from_self()',
            (), operator.eq),
    )
    for attr, methname, notset, op in guarded_state:
        if op(getattr(self.query, attr), notset):
            continue
        raise sa_exc.InvalidRequestError(
            "Can't call Query.update() or Query.delete() "
            "when %s has been called" %
            (methname, )
        )
def visit_binary(self, binary, **kwargs):
    """Compile a binary expression, keeping bind parameters on the right.

    When the left operand is a ``BindParameter``, the operator is
    ``operator.eq`` and the right operand is not a ``BindParameter``, the
    operands are swapped and the new expression is handed to
    ``self.process``. Every other expression is delegated to the parent
    compiler's ``visit_binary``.
    """
    bind_only_on_left = (
        isinstance(binary.left, expression.BindParameter)
        and binary.operator == operator.eq
        and not isinstance(binary.right, expression.BindParameter)
    )
    if not bind_only_on_left:
        return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)

    swapped = expression.BinaryExpression(
        binary.right, binary.left, binary.operator)
    return self.process(swapped, **kwargs)
def _validate_query_state(self):
    """Reject query state that update()/delete() cannot be combined with.

    Each listed attribute of ``self.query`` is compared with its "not set"
    marker using the paired operator; the first attribute that differs
    raises ``sa_exc.InvalidRequestError`` naming the Query method(s).
    """
    guarded_state = (
        ('_limit', 'limit()', None, operator.is_),
        ('_offset', 'offset()', None, operator.is_),
        ('_order_by', 'order_by()', False, operator.is_),
        ('_group_by', 'group_by()', False, operator.is_),
        ('_distinct', 'distinct()', False, operator.is_),
        (
            '_from_obj',
            'join(), outerjoin(), select_from(), or from_self()',
            (), operator.eq),
    )
    for attr, methname, notset, op in guarded_state:
        if op(getattr(self.query, attr), notset):
            continue
        raise sa_exc.InvalidRequestError(
            "Can't call Query.update() or Query.delete() "
            "when %s has been called" %
            (methname, )
        )
def get_op(cls, op):
    """Return the handler registered for ``op``.

    Grammar symbol ids from the ``symbol`` module map to the matching
    ``cls`` methods (``or_test``, when ``symbol`` defines it, shares
    ``cls.test``). Comparison strings map to ``operator`` functions or to
    membership lambdas. An unknown ``op`` raises ``KeyError``.
    """
    handlers = {
        symbol.test: cls.test,
        symbol.and_test: cls.and_test,
        symbol.atom: cls.atom,
        symbol.comparison: cls.comparison,
    }
    handlers.update({
        'not in': lambda x, y: x not in y,
        'in': lambda x, y: x in y,
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '>': operator.gt,
        '<=': operator.le,
        '>=': operator.ge,
    })
    if hasattr(symbol, 'or_test'):
        handlers[symbol.or_test] = cls.test
    return handlers[op]