Python types 模块：StringTypes 实例源码
我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 types.StringTypes（注意：它是一个仅存在于 Python 2 的类型元组 (str, unicode)，通常配合 isinstance() 使用，而不是可调用的函数）。
def _send_data_part(data, connection):
    """Send ``data`` through ``connection.send``.

    Args:
        data: A string (sent in a single call), an object with a ``read``
            method (streamed in chunks of up to 100000), or any other object
            (converted with ``str`` and sent).
        connection: Object exposing a ``send`` method.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    if isinstance(data, string_types):
        connection.send(data)
        return

    # Check to see if data is a file-like object that has a read method.
    if hasattr(data, 'read'):
        # Read the file and send it a chunk at a time.
        while True:
            chunk = data.read(100000)
            # An empty read marks end-of-file. Testing truthiness instead of
            # ``== ''`` also terminates on an empty bytes object.
            if not chunk:
                break
            connection.send(chunk)
        return

    # The data object was not a file.
    # Try to convert to a string and send the data.
    connection.send(str(data))
def _setTarget(self, address, prefix=None, filters=None):
    """Add (i.e. subscribe) a new OSCTarget, or change the prefix for an existing OSCTarget.

      - address ((host, port) tuple): IP-address & UDP-port
      - prefix (string): The OSC-address prefix prepended to the address of each OSCMessage
        sent to this OSCTarget (optional)
      - filters (string or dict): a filter-string (parsed with parseFilterStr) or a dict
        with {addr:bool} entries, handed to self._updateFilters (optional)

    Raises TypeError if 'filters' is given but is neither a string nor a dict.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    if address not in self.targets:
        self.targets[address] = ["", {}]

    if prefix is not None:
        if len(prefix):
            # make sure prefix starts with ONE '/', and does not end with '/'
            prefix = '/' + prefix.strip('/')
        self.targets[address][0] = prefix

    if filters is not None:
        if isinstance(filters, string_types):
            (_, filters) = parseFilterStr(filters)
        elif not isinstance(filters, dict):
            raise TypeError("'filters' argument must be a dict with {addr:bool} entries")
        self._updateFilters(self.targets[address][1], filters)
def setOSCTarget(self, address, prefix=None, filters=None):
    """Add (i.e. subscribe) a new OSCTarget, or change the prefix for an existing OSCTarget.

    the 'address' argument can be a ((host, port) tuple) : The target server address & UDP-port
      or a 'host' (string) : The host will be looked-up
      - prefix (string): The OSC-address prefix prepended to the address of each OSCMessage
        sent to this OSCTarget (optional)
      - filters: passed through unchanged to self._setTarget (optional)

    Raises TypeError if 'address' is neither a string nor a tuple.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    if isinstance(address, string_types):
        address = self._searchHostAddr(address)
    elif isinstance(address, tuple):
        (host, port) = address[:2]
        try:
            host = socket.gethostbyname(host)
        except Exception:
            # Best effort: keep the host as given when it cannot be resolved.
            # 'Exception' (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            pass
        address = (host, port)
    else:
        raise TypeError("'address' argument must be a (host, port) tuple or a 'host' string")

    self._setTarget(address, prefix, filters)
def delOSCTarget(self, address, prefix=None):
    """Delete the specified OSCTarget from the Client's dict.

    the 'address' argument can be a ((host, port) tuple), or a hostname.
    If the 'prefix' argument is given, the Target is only deleted if the address and prefix match.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    if isinstance(address, string_types):
        address = self._searchHostAddr(address)

    # NOTE(review): the source lost its indentation; the deletion is assumed
    # to happen only for tuple addresses - confirm against the upstream file.
    if isinstance(address, tuple):
        (host, port) = address[:2]
        try:
            host = socket.gethostbyname(host)
        except socket.error:
            # Keep the host as given when it cannot be resolved.
            pass
        address = (host, port)

        self._delTarget(address, prefix)
def getOSCTarget(self, address):
    """Returns the OSCTarget matching the given address as a ((host, port), [prefix, filters]) tuple.

    'address' can be a (host, port) tuple, or a 'host' (string), in which case the first matching OSCTarget is returned
    Returns (None, ['',{}]) if address not found.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    if isinstance(address, string_types):
        address = self._searchHostAddr(address)

    if isinstance(address, tuple):
        (host, port) = address[:2]
        try:
            host = socket.gethostbyname(host)
        except socket.error:
            # Keep the host as given when it cannot be resolved.
            pass
        address = (host, port)

        if address in self.targets:
            try:
                # Report the host name rather than the IP when a reverse
                # lookup succeeds.
                (host, _, _) = socket.gethostbyaddr(host)
            except socket.error:
                pass

            return ((host, port), self.targets[address])

    return (None, ['', {}])
def _build_selector(self, selector, cmd):
    """Extend ``self._selector`` with ``selector`` and return the matched component.

    Args:
        selector: Either a selector string, or a callable that is invoked as
            ``selector(self)`` and whose ``_selector`` attribute is adopted.
        cmd: Name of the method used in the generated expression; when falsy
            and no selector has been built yet, a plain ``$('...')`` lookup
            is generated.

    Returns:
        ``self`` for a non-string selector; otherwise the component created
        for the first key of ``self.components`` that prefixes the selector,
        or ``self`` when none matches.
    """
    # Function-scope import keeps the block usable on Python 3, where
    # types.StringTypes does not exist.
    import types
    string_types = getattr(types, 'StringTypes', (str,))

    if not isinstance(selector, string_types):
        self._selector = selector(self)._selector
        return self

    _selector = selector
    component = self
    # Expand a leading component name into that component's own selector.
    for name, _component in self.components.items():
        if _selector.startswith(name):
            _selector = _component.selector + _selector[len(name):]
            component = _component(self)
            break

    if not self._selector:
        if cmd:
            self._selector = "$(NODE).%s('%s')" % (cmd, _selector)
        else:
            self._selector = "$('%s')" % _selector
    else:
        # Chain onto the selector built so far.
        self._selector += ".%s('%s')" % (cmd, _selector)
    return component
def getValueStrings(val, blnUgly=True):
    """Format a single value for joining into an SQL statement.

    Used by the joinWithComma function to join list items for SQL queries.
    Expects to receive 'valid' types, as this was designed specifically for
    joining object attributes after non-valid attributes were pulled.

    Args:
        val: Value to format.
        blnUgly: When true (default) an unsupported value yields an error
            string; when false it yields None, which gives prettier output
            but probably a wrong SQL Insert statement.

    Returns:
        ``'#num#<val>#num#'`` for float/int/long/bool values, the value wrapped
        in double quotes for strings, otherwise an error string or None
        depending on ``blnUgly``.
    """
    # types.StringTypes / types.LongType are Python 2 only; fall back to the
    # Python 3 equivalents elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))
    number_types = (float, int, getattr(types, 'LongType', int), bool)

    if isinstance(val, number_types):
        return '#num#' + str(val) + '#num#'
    if isinstance(val, string_types):
        return '"' + val + '"'
    if blnUgly:
        return "Error: nonconvertable value passed - value type: %s" % type(val)
    return None
def scopes_to_string(scopes):
    """Converts scope value to a string.

    If scopes is a string then it is simply passed through. If scopes is an
    iterable then a string is returned that is all the individual scopes
    concatenated with spaces.

    Args:
        scopes: string or iterable of strings, the scopes.

    Returns:
        The scopes formatted as a single string.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))
    if isinstance(scopes, string_types):
        return scopes
    return ' '.join(scopes)
def is_ip4(ip):
    """Return True if ``ip`` is a dotted-quad IPv4 string, else False.

    A valid value is a string made of exactly four '.'-separated parts, each
    consisting only of ASCII digits and having a numeric value of at most 255.
    Non-string input yields False.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))
    if not isinstance(ip, string_types):
        return False

    octets = ip.split('.')
    if len(octets) != 4:
        return False

    for octet in octets:
        # str.isdigit() also accepts characters such as u'\xb2' that int()
        # rejects with ValueError, so test for ASCII digits explicitly.
        if not octet or any(ch not in '0123456789' for ch in octet):
            return False
        if int(octet) > 255:
            return False
    return True
def parse_lock_data(data_str):
    """
    Parse string generated by lock_data()

    The string is split on '-' into node id, ip and process id; missing
    trailing fields become None. The process id is converted to int when it
    consists of digits, otherwise it is None.

    Returns a dict with the keys 'node_id', 'ip' and 'process_id'.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    # Pad with None so that unpacking works for fewer than three fields.
    node_id, ip, raw_process_id = (data_str.split('-') + ([None] * 3))[:3]

    process_id = None
    if isinstance(raw_process_id, string_types) and raw_process_id.isdigit():
        try:
            process_id = int(raw_process_id)
        except ValueError:
            # isdigit() accepts some characters (e.g. superscripts) that
            # int() cannot convert.
            process_id = None

    return {
        'node_id': node_id,
        'ip': ip,
        'process_id': process_id,
    }
def __call__(self, context):
    """Write ``self._object`` as CSV to ``self._path`` and return that path.

    Args:
        context: Mapping used to resolve ``self._object`` when it is given as
            a key name, and to store the in-memory buffer when the path
            starts with ``"memory:"`` (stored under the remainder of the path).

    Returns:
        ``self._path``.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    if isinstance(self._object, string_types):
        self._object = context[self._object]

    # get dialect object; a non-string value is used as the dialect itself
    # (previously 'dialect' was left unbound in that case).
    dialect = self._dialect
    if isinstance(dialect, string_types):
        dialect = csv.get_dialect(dialect)

    if self._path.startswith("memory:"):
        buffer_ = StringIO.StringIO()
        self._write_object(buffer_, dialect)
        buffer_.seek(0)
        context[self._path[len("memory:"):]] = buffer_
    else:
        with open(self._path, "w") as f:
            self._write_object(f, dialect)
    return self._path
def validate_config(self, config):
    """Validate every ``smtp_``-prefixed config section and build its SMTPConfig.

    Runs at most once per instance: after a successful pass
    ``self._validated`` is set and later calls return immediately.

    For each section every rule in ``SMTPManager.config_rules`` is applied
    to the matching item; missing items are filled with the rule's default
    and a string ``port`` is converted to int. The resulting ``SMTPConfig``
    is stored in ``self._all_smtp_config`` under the section name.
    """
    if self._validated:
        return

    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    for section in config.prefix("smtp_"):
        smtp_config_items = config[section]
        for rule in SMTPManager.config_rules:
            item_value = smtp_config_items.get(rule.name)
            rule.validate(item_value)
            if item_value is None:
                smtp_config_items[rule.name] = rule.default
            if rule.name == "port" and isinstance(item_value, string_types):
                smtp_config_items["port"] = int(item_value)
        smtp_config = SMTPConfig(**smtp_config_items)
        self._all_smtp_config[section] = smtp_config

    self._validated = True
def __init__(self, attachment_file, mime_type, attachment_filename=None):
    """Work out the attachment's file name when none is given.

    :param attachment_file: path string or file object of the attachment
        (the original docstring also mentions StringIO; such objects need an
        explicit ``attachment_filename``)
    :param mime_type: MIME type of the attachment, e.g.
        application/octet-stream (not used by the code visible here)
    :param attachment_filename: file name to use for the attachment; when
        None it is derived from ``attachment_file``
    :raises InvalidArgumentException: if no file name is given and
        ``attachment_file`` is neither a string nor a file object
    """
    if attachment_filename is None:
        # types.StringTypes / types.FileType are Python 2 only; fall back to
        # str and io.IOBase elsewhere.
        string_types = getattr(types, 'StringTypes', (str,))
        file_types = getattr(types, 'FileType', None)
        if file_types is None:
            import io
            file_types = io.IOBase

        if isinstance(attachment_file, string_types):
            self._attachment_filename = os.path.split(
                attachment_file)[1]
        elif isinstance(attachment_file, file_types):
            self._attachment_filename = os.path.split(
                attachment_file.name)[1]
        else:
            raise InvalidArgumentException(
                u"????attachement_filename?????????")
def _build_query(self, engine, session, query_items):
    """Build a query for ``query_items`` from the options stored on ``self``.

    Applies, when set (i.e. not None): ``self._query`` as a filter (strings
    are wrapped with ``text``), ``self._order_by``, ``self._group_by`` (a
    string is first resolved through ``self.automap`` and the result is
    stored back on ``self``) and ``self._params``.

    Returns the resulting query object.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    query = session.query(*query_items)

    if self._query is not None:
        if isinstance(self._query, string_types):
            query = query.filter(text(self._query))
        else:
            query = query.filter(self._query)

    if self._order_by is not None:
        query = query.order_by(self._order_by)

    if self._group_by is not None:
        if isinstance(self._group_by, string_types):
            self._group_by = self.automap(engine, self._group_by)
        query = query.group_by(self._group_by)

    if self._params is not None:
        query = query.params(**self._params)

    return query
def __call__(self, context):
    """Execute ``self._sql`` on a session of the engine ``self._engine_name``.

    A single statement matching ``_SELECT_STATEMENT_REGEX`` is delegated to
    ``self._execute_select_statement`` and its result is returned. Anything
    else is executed, committed, and the session is closed afterwards;
    nothing is returned in that case.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    # _engine_manager is only read here, so no 'global' declaration is needed.
    engine_container = _engine_manager.engine(self._engine_name)
    session = engine_container.session()

    if isinstance(self._sql, string_types) and \
            _SELECT_STATEMENT_REGEX.search(self._sql):
        return self._execute_select_statement(session, context)

    # Non-SELECT statement(s).
    try:
        if isinstance(self._sql, string_types):
            session.execute(self._sql, self._params)
        # A sequence of statements.
        elif isinstance(self._sql, SequenceCollectionType):
            if isinstance(self._params, SequenceCollectionType):
                # Parameters are matched to statements by position.
                for idx, sql in enumerate(self._sql):
                    session.execute(sql, self._params[idx])
            else:
                for sql in self._sql:
                    session.execute(sql)
        session.commit()
    finally:
        session.close()
def _handle_titles(self, titles, auto_title_name):
    """Normalise ``titles`` into a sequence of ``Title`` objects.

    Two input forms are accepted, decided by the type of the first element:

    * ``Title`` objects, returned unchanged, e.g.
      ``(Title("id", ...), Title("name", ...), Title("grade", ...))``
    * plain strings. With ``auto_title_name`` true every string becomes
      ``Title("field_<index>", string)``; otherwise the strings are consumed
      in pairs, e.g. ``("id", ..., "name", ..., "grade", ...)``, and each
      pair is passed to ``Title``.

    Raises InvalidTypeException when the first element is neither a Title
    nor a string.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    first_ele = titles[0]
    if isinstance(first_ele, Title):
        return titles

    if isinstance(first_ele, string_types):
        if auto_title_name:
            return [Title("field_%d" % idx, arg)
                    for idx, arg in enumerate(titles)]
        # Pair up even-indexed items with the odd-indexed items after them.
        return [Title(*arg) for arg in zip(titles[::2], titles[1::2])]

    raise InvalidTypeException(u"title???????????Title??")
def __init__(self, url, fileOrName,
             method='GET', postdata=None, headers=None,
             agent="Twisted client", supportPartial=0):
    """Set up a download of ``url`` into ``fileOrName``.

    ``fileOrName`` is either a file name (string) or an already opened
    file-like object. When a file name is given, ``supportPartial`` is true
    and the file already exists with a non-zero size, a ``range`` header is
    added (to ``headers`` itself when one was passed in) asking for the
    bytes after the existing content.

    The remaining arguments are forwarded to ``HTTPClientFactory.__init__``.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    self.requestedPartial = 0
    if isinstance(fileOrName, string_types):
        self.fileName = fileOrName
        self.file = None
        if supportPartial and os.path.exists(self.fileName):
            fileLength = os.path.getsize(self.fileName)
            if fileLength:
                self.requestedPartial = fileLength
                if headers is None:
                    headers = {}
                headers["range"] = "bytes=%d-" % fileLength
    else:
        self.file = fileOrName
    HTTPClientFactory.__init__(self, url, method=method, postdata=postdata, headers=headers, agent=agent)
    self.deferred = defer.Deferred()
    self.waiting = 1
def _send_data_part(data, connection):
    """Send ``data`` through ``connection.send``.

    Args:
        data: A string (sent in a single call), an object with a ``read``
            method (streamed in chunks of up to 100000), or any other object
            (converted with ``str`` and sent).
        connection: Object exposing a ``send`` method.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    if isinstance(data, string_types):
        connection.send(data)
        return

    # Check to see if data is a file-like object that has a read method.
    if hasattr(data, 'read'):
        # Read the file and send it a chunk at a time.
        while True:
            chunk = data.read(100000)
            # An empty read marks end-of-file. Testing truthiness instead of
            # ``== ''`` also terminates on an empty bytes object.
            if not chunk:
                break
            connection.send(chunk)
        return

    # The data object was not a file.
    # Try to convert to a string and send the data.
    connection.send(str(data))
def scopes_to_string(scopes):
    """Converts scope value to a string.

    If scopes is a string then it is simply passed through. If scopes is an
    iterable then a string is returned that is all the individual scopes
    concatenated with spaces.

    Args:
        scopes: string or iterable of strings, the scopes.

    Returns:
        The scopes formatted as a single string.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))
    if isinstance(scopes, string_types):
        return scopes
    return ' '.join(scopes)
def _render_without_enums(self, rendered_by_wraper):
    """Replace every key of ``self._replace_table_without_enum`` in the text.

    Each occurrence of a key is replaced by the ``value`` attribute of its
    table entry (converted with ``str`` when it is not a string). When
    ``self._wraper_flag`` is true for the entry, the replacement is first
    passed through ``self._wrap``.

    Returns the text with all replacements applied.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    for placeholder, _tag in self._replace_table_without_enum.items():
        _target = _tag.value
        if not isinstance(_target, string_types):
            _target = str(_target)
        if self._wraper_flag[_tag]:
            _target = self._wrap(_target)
        rendered_by_wraper = rendered_by_wraper.replace(placeholder, _target)
    return rendered_by_wraper
#----------------------------------------------------------------------
def fmtstr(self, *fmt):
    """Join the arguments into one string, each followed by a '.'.

    String arguments are used as they are; every other argument is rendered
    with ``repr``. On Python 2 unicode pieces are encoded as utf8 so that
    the result is a byte string.
    """
    # types.StringTypes / types.UnicodeType are Python 2 only; fall back to
    # str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))
    text_type = getattr(types, 'UnicodeType', str)
    encoding = 'utf8'

    parts = []
    for item in fmt:
        piece = item if isinstance(item, string_types) else repr(item)
        # Only encode where a separate unicode type exists (Python 2); on
        # Python 3 encoding would produce bytes that cannot be joined to str.
        if text_type is not str and isinstance(piece, text_type):
            piece = piece.encode(encoding)
        parts.append(piece)
        parts.append('.')
    return ''.join(parts)
def expect_exact(self, pattern_list, timeout = -1, searchwindowsize = -1):
    """This is similar to expect(), but uses plain string matching instead
    of compiled regular expressions in 'pattern_list'. The 'pattern_list'
    may be a string; a list or other sequence of strings; or TIMEOUT and
    EOF.

    This call might be faster than expect() for two reasons: string
    searching is faster than RE matching and it is possible to limit the
    search to just the end of the input buffer.

    This method is also useful when you don't want to have to worry about
    escaping regular expression characters that you want to match."""
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    # A single pattern (string, TIMEOUT or EOF) is wrapped in a list so that
    # searcher_string always receives a sequence.
    if isinstance(pattern_list, string_types) or pattern_list in (TIMEOUT, EOF):
        pattern_list = [pattern_list]
    return self.expect_loop(searcher_string(pattern_list), timeout, searchwindowsize)
def __init__(self, depends_on, encoder=ENC_BITS_DEFAULT, fuzzable=True, name=None):
    '''
    :param depends_on: (name of) field we depend on
    :type encoder: :class:`~kitty.model.low_level.encoder.BitsEncoder`
    :param encoder: encoder for the field
    :param fuzzable: is container fuzzable
    :param name: (unique) name of the container
    :raises: ``KittyException`` if depends_on is neither a string nor a BaseField
    '''
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    self._rendered_field = None
    self.dependency_type = Calculated.VALUE_BASED
    super(Calculated, self).__init__(value=self.__class__._default_value_, encoder=encoder, fuzzable=fuzzable, name=name)
    if isinstance(depends_on, string_types):
        # Only the name is known here; the field object itself stays unset.
        self._field_name = depends_on
        self._field = None
    elif isinstance(depends_on, BaseField):
        self._field_name = None
        self._field = depends_on
    else:
        raise KittyException('depends_on parameter (%s) is neither a string nor a valid field' % depends_on)
def __init__(self, depends_on, func, encoder=ENC_STR_DEFAULT, fuzzable=False, name=None):
    '''
    :param depends_on: (name of) field we depend on
    :param func: function for processing of the dependant data. func(str)->str
    :type encoder: :class:`~kitty.model.low_level.encoder.StrEncoder`
    :param encoder: encoder for the field (default: ENC_STR_DEFAULT)
    :param fuzzable: is container fuzzable
    :param name: (unique) name of the container
    :raises: ``KittyException`` if func('') fails or does not return a string
    '''
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    try:
        # Probe func with an empty string to verify its contract up front.
        res = func('')
        kassert.is_of_types(res, string_types)
        self._func = func
    except Exception:
        # 'Exception' (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        raise KittyException('func should be func(str)->str')
    super(CalculatedStr, self).__init__(depends_on=depends_on, encoder=encoder, fuzzable=fuzzable, name=name)
def __init__(self, value, num_bits=1, fuzzable=True, name=None):
    '''
    :param value: value to mutate (str)
    :param num_bits: number of consequtive bits to flip (invert)
    :param fuzzable: is field fuzzable (default: True)
    :param name: name of the object (default: None)

    :raises: ``KittyException`` if num_bits is bigger than the value length in bits
    :raises: ``KittyException`` if num_bits is not positive
    '''
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    kassert.is_of_types(value, string_types)
    value_bits = len(value) * 8
    if value_bits < num_bits:
        raise KittyException('len of value in bits(%d) < num_bits(%d)' % (value_bits, num_bits))
    if num_bits <= 0:
        raise KittyException('num_bits(%d) <= 0' % (num_bits))
    super(BitFlip, self).__init__(value=Bits(bytes=value), encoder=ENC_BITS_DEFAULT, fuzzable=fuzzable, name=name)
    self._data_len = value_bits
    self._num_bits = num_bits
    # One mutation per possible start position of the num_bits-wide window.
    self._num_mutations = self._data_len - (num_bits - 1)
def type_check(tags):
    """Perform a type check on a list of tags

    Args:
        tags: a list or tuple of tags, a single tag, or None.

    Returns:
        A ``(tags, single, valid)`` tuple: ``tags`` is the input as a
        sequence (None becomes ``[]``, a single value is wrapped in a list),
        ``single`` tells whether a single value was wrapped, and ``valid`` is
        True when every tag is a string.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    if isinstance(tags, (list, tuple)):
        single = False
    elif tags is None:
        tags = []
        single = False
    else:
        tags = [tags]
        single = True

    valid = all(isinstance(tag, string_types) for tag in tags)
    return tags, single, valid
def __init__(self, collection_names, collection_labels=None, file_names=None, field_names=None):
    """Load one or more collections and determine what they have in common.

    ``collection_names`` is a single name (string) or a sequence of names;
    ``collection_labels``, when given, is paired with the names by position.
    ``file_names`` and ``field_names`` are passed on to every ``Collection``.

    Stores the collections, the sorted galaxy ids common to all of them and
    their common property names, then prints a one-line summary.
    """
    # Function-scope import: 'reduce' is a builtin only on Python 2.
    from functools import reduce
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    # load the collections
    if isinstance(collection_names, string_types):
        collection_names = [collection_names]
    if collection_labels is None:
        collection_labels = [None] * len(collection_names)
    self._collections = [Collection(name, label, file_names, field_names)
                         for name, label in zip(collection_names, collection_labels)]

    # find the set of common galaxies and the set of common properties
    if len(self._collections) > 1:
        self._ids = sorted(reduce(lambda x, y: x & y, [c.galaxy_ids() for c in self._collections]))
        self._props = reduce(lambda x, y: x & y, [c.property_names() for c in self._collections])
    else:
        self._ids = sorted(self._collections[0].galaxy_ids())
        self._props = self._collections[0].property_names()

    # print the number of common galaxies
    # (single-argument call form: same output on Python 2 and Python 3)
    print("Loaded a set of {} collections with {} common galaxies and {} common properties"
          .format(len(self._collections), len(self._ids), len(self._props)))
## This function returns a two-dimensional array with the values of the specified property for all common galaxies
# in all collections of the set. The index on the first axis iterates over the collections, the index on the last
# axis iterates over the galaxies, in order of increasing galaxy id.
def __init__(self, collection_names, collection_labels=None, file_names=None, field_names=None):
    """Load one or more collections and determine what they have in common.

    ``collection_names`` is a single name (string) or a sequence of names;
    ``collection_labels``, when given, is paired with the names by position.
    ``file_names`` and ``field_names`` are passed on to every ``Collection``.

    Stores the collections, the sorted galaxy ids common to all of them and
    their common property names, then prints a one-line summary.
    """
    # Function-scope import: 'reduce' is a builtin only on Python 2.
    from functools import reduce
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    # load the collections
    if isinstance(collection_names, string_types):
        collection_names = [collection_names]
    if collection_labels is None:
        collection_labels = [None] * len(collection_names)
    self._collections = [Collection(name, label, file_names, field_names)
                         for name, label in zip(collection_names, collection_labels)]

    # find the set of common galaxies and the set of common properties
    if len(self._collections) > 1:
        self._ids = sorted(reduce(lambda x, y: x & y, [c.galaxy_ids() for c in self._collections]))
        self._props = reduce(lambda x, y: x & y, [c.property_names() for c in self._collections])
    else:
        self._ids = sorted(self._collections[0].galaxy_ids())
        self._props = self._collections[0].property_names()

    # print the number of common galaxies
    # (single-argument call form: same output on Python 2 and Python 3)
    print("Loaded a set of {} collections with {} common galaxies and {} common properties"
          .format(len(self._collections), len(self._ids), len(self._props)))
## This function returns a two-dimensional array with the values of the specified property for all common galaxies
# in all collections of the set. The index on the first axis iterates over the collections, the index on the last
# axis iterates over the galaxies, in order of increasing galaxy id.
def __init__(self, url, fileOrName,
             method='GET', postdata=None, headers=None,
             agent="Twisted client", supportPartial=0):
    """Set up a download of ``url`` into ``fileOrName``.

    ``fileOrName`` is either a file name (string) or an already opened
    file-like object. When a file name is given, ``supportPartial`` is true
    and the file already exists with a non-zero size, a ``range`` header is
    added (to ``headers`` itself when one was passed in) asking for the
    bytes after the existing content.

    The remaining arguments are forwarded to ``HTTPClientFactory.__init__``.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    self.requestedPartial = 0
    if isinstance(fileOrName, string_types):
        self.fileName = fileOrName
        self.file = None
        if supportPartial and os.path.exists(self.fileName):
            fileLength = os.path.getsize(self.fileName)
            if fileLength:
                self.requestedPartial = fileLength
                if headers is None:
                    headers = {}
                headers["range"] = "bytes=%d-" % fileLength
    else:
        self.file = fileOrName
    HTTPClientFactory.__init__(self, url, method=method, postdata=postdata, headers=headers, agent=agent)
    self.deferred = defer.Deferred()
    self.waiting = 1
def p_prop(self, p):
    '''prop : prop_name operator prop_value
    '''
    # NOTE: the docstring above is the grammar rule read by the parser
    # generator; it must not be reworded.
    #
    # Validates one "name operator value" property and stores the resulting
    # UIObjectProperty in p[0]. Problems are reported through self._error.
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    if p[1].value.upper() in self.INT_TYPE_PROPNAMES:
        # Integer-typed properties cannot be used with the '~=' operator.
        if p[2].value == '~=':
            self._error('"%s"???????"~="???'%(p[1].value), p[2], p[2].lexpos)
        if not isinstance(p[3].value, int):
            try:
                p[3].value = int(p[3].value)
            except ValueError:
                self._error('"%s"??????"%s"??????int??'%(p[1].value, type(p[3].value)), p[3], p[3].lexpos)
        if p[1].value.upper() == 'MAXDEPTH':
            # MAXDEPTH must be strictly positive.
            if p[3].value <= 0:
                self._error("MaxDepth?????>0", p[3], p[3].lexpos)
    elif p[2].value == '~=':
        # '~=' requires a string value.
        if not isinstance(p[3].value, string_types):
            self._error('???"~="?????"%s"?????'%(type(p[3].value)), p[2], p[2].lexpos)
    p[0] = UIObjectProperty(p[1], p[2], p[3])
def scopes_to_string(scopes):
    """Converts scope value to a string.

    If scopes is a string then it is simply passed through. If scopes is an
    iterable then a string is returned that is all the individual scopes
    concatenated with spaces.

    Args:
        scopes: string or iterable of strings, the scopes.

    Returns:
        The scopes formatted as a single string.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))
    if isinstance(scopes, string_types):
        return scopes
    return ' '.join(scopes)
def matching_files(root_path, file_patterns, test_file_path=None):
    """
    matching_files returns all files in the root path that match the provided patterns.

    :param root_path: is the root path in which should be searched; the
        current directory is used when it is empty
    :param file_patterns: a single file name pattern or an iterable which
        contains the file name patterns (fnmatch syntax)
    :param test_file_path: optional text that must occur in a directory's
        path for its files to be considered; one trailing "/" is ignored
    :return matching files in a frozenset:
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    if isinstance(file_patterns, string_types):
        file_patterns = [file_patterns]

    if not test_file_path:
        test_file_path = ""
    elif test_file_path[-1:] == "/":
        test_file_path = test_file_path[:-1]

    if not root_path:
        root_path = os.path.abspath(os.curdir)

    matches = set()
    for dirpath, _, filenames in os.walk(root_path):
        # Substring test; the empty default matches every directory.
        if test_file_path not in dirpath:
            continue
        for filename in filenames:
            if any(fnmatch.fnmatch(filename, pattern) for pattern in file_patterns):
                matches.add(dirpath + '/' + filename)
    return frozenset(matches)
def assertMultiLineEqual(self, first, second, msg=None):
    """Assert that two multi-line strings are equal.

    Raises AssertionError when either argument is not a string, and
    ``self.failureException`` with a line-by-line diff (preceded by ``msg``
    when given) when the strings differ.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    # Raised explicitly so the checks are not stripped when running with -O.
    if not isinstance(first, string_types):
        raise AssertionError('First argument is not a string: %r' % (first,))
    if not isinstance(second, string_types):
        raise AssertionError('Second argument is not a string: %r' % (second,))

    if first == second:
        return

    failure_message = [msg, ':\n'] if msg else ['\n']
    for line in difflib.ndiff(first.splitlines(True), second.splitlines(True)):
        failure_message.append(line)
        # Make sure every diff line ends up on a line of its own.
        if not line.endswith('\n'):
            failure_message.append('\n')
    raise self.failureException(''.join(failure_message))
def assertMultiLineEqual(self, first, second, msg=None):
    """Assert that two multi-line strings are equal.

    Raises AssertionError when either argument is not a string, and
    ``self.failureException`` with a line-by-line diff (preceded by ``msg``
    when given) when the strings differ.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    # Raised explicitly so the checks are not stripped when running with -O.
    if not isinstance(first, string_types):
        raise AssertionError('First argument is not a string: %r' % (first,))
    if not isinstance(second, string_types):
        raise AssertionError('Second argument is not a string: %r' % (second,))

    if first == second:
        return

    failure_message = [msg, ':\n'] if msg else ['\n']
    for line in difflib.ndiff(first.splitlines(True), second.splitlines(True)):
        failure_message.append(line)
        # Make sure every diff line ends up on a line of its own.
        if not line.endswith('\n'):
            failure_message.append('\n')
    raise self.failureException(''.join(failure_message))
def validate_request(request):
    """Check the basic shape of an RPC request.

    Args:
        request: the decoded request; expected to be a dict. A missing
            'params' entry is set to ``[]`` on the dict itself.

    Returns:
        True when the request is acceptable, otherwise a ``Fault`` with code
        -32600 describing the problem: the request is not a dict,
        ``get_version`` reports no version, the method is missing or not a
        string, or params is not a list, dict or tuple.
    """
    if not isinstance(request, dict):
        return Fault(
            -32600, 'Request must be {}, not %s.' % type(request)
        )

    rpcid = request.get('id', None)
    version = get_version(request)
    if not version:
        return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)

    request.setdefault('params', [])
    method = request.get('method', None)
    params = request.get('params')

    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))
    param_types = (list, dict, tuple)
    if not method or not isinstance(method, string_types) or \
            not isinstance(params, param_types):
        return Fault(
            -32600, 'Invalid request parameters or method.', rpcid=rpcid
        )
    return True
def __init__(self, depends_on, encoder=ENC_BITS_DEFAULT, fuzzable=True, name=None):
    '''
    :param depends_on: (name of) field we depend on
    :type encoder: :class:`~kitty.model.low_level.encoder.BitsEncoder`
    :param encoder: encoder for the field
    :param fuzzable: is container fuzzable
    :param name: (unique) name of the container
    :raises: ``KittyException`` if depends_on is neither a string nor a BaseField
    '''
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    self._rendered_field = None
    self.dependency_type = Calculated.VALUE_BASED
    super(Calculated, self).__init__(value=self.__class__._default_value_, encoder=encoder, fuzzable=fuzzable, name=name)
    if isinstance(depends_on, string_types):
        # Only the name is known here; the field object itself stays unset.
        self._field_name = depends_on
        self._field = None
    elif isinstance(depends_on, BaseField):
        self._field_name = None
        self._field = depends_on
    else:
        raise KittyException('depends_on parameter (%s) is neither a string nor a valid field' % depends_on)
def __init__(self, depends_on, func, encoder=ENC_STR_DEFAULT, fuzzable=False, name=None):
    '''
    :param depends_on: (name of) field we depend on
    :param func: function for processing of the dependant data. func(str)->str
    :type encoder: :class:`~kitty.model.low_level.encoder.StrEncoder`
    :param encoder: encoder for the field (default: ENC_STR_DEFAULT)
    :param fuzzable: is container fuzzable
    :param name: (unique) name of the container
    :raises: ``KittyException`` if func('') fails or does not return a string
    '''
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    try:
        # Probe func with an empty string to verify its contract up front.
        res = func('')
        kassert.is_of_types(res, string_types)
        self._func = func
    except Exception:
        # 'Exception' (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        raise KittyException('func should be func(str)->str')
    super(CalculatedStr, self).__init__(depends_on=depends_on, encoder=encoder, fuzzable=fuzzable, name=name)
def __init__(self, value, num_bits=1, fuzzable=True, name=None):
    '''
    :param value: value to mutate (str)
    :param num_bits: number of consequtive bits to flip (invert)
    :param fuzzable: is field fuzzable (default: True)
    :param name: name of the object (default: None)

    :raises: ``KittyException`` if num_bits is bigger than the value length in bits
    :raises: ``KittyException`` if num_bits is not positive
    '''
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    kassert.is_of_types(value, string_types)
    value_bits = len(value) * 8
    if value_bits < num_bits:
        raise KittyException('len of value in bits(%d) < num_bits(%d)' % (value_bits, num_bits))
    if num_bits <= 0:
        raise KittyException('num_bits(%d) <= 0' % (num_bits))
    super(BitFlip, self).__init__(value=Bits(bytes=value), encoder=ENC_BITS_DEFAULT, fuzzable=fuzzable, name=name)
    self._data_len = value_bits
    self._num_bits = num_bits
    # One mutation per possible start position of the num_bits-wide window.
    self._num_mutations = self._data_len - (num_bits - 1)
def __init__(self, name, attribute, value, fuzz_attribute=False, fuzz_value=True):
    '''
    :param name: name of the block
    :param attribute: attribute
    :type value: str/unicode/int
    :param value: value of the attribute
    :param fuzz_attribute: should we fuzz the attribute field (default: False)
    :param fuzz_value: should we fuzz the value field (default: True)
    '''
    # Function-scope import keeps the block usable on Python 3, where
    # types.StringTypes / types.IntType do not exist.
    import types
    string_types = getattr(types, 'StringTypes', (str,))

    _check_type(attribute, string_types, 'attribute')
    _check_type(value, string_types + (int, ), 'value')

    value_name = _valuename(name)
    if isinstance(value, string_types):
        value_field = String(value, name=value_name, fuzzable=fuzz_value)
    else:
        value_field = SInt32(value, encoder=ENC_INT_DEC, fuzzable=fuzz_value, name=value_name)

    # Rendered layout: <attribute>="<value>"
    fields = [
        String(attribute, fuzzable=fuzz_attribute, name='%s_attribute' % name),
        Static('='),
        Static('"'),
        value_field,
        Static('"')
    ]
    super(XmlAttribute, self).__init__(fields, name=name)
def _PrepareCommand(command):
    """Transform a command to list format.

    A string is split with ``shlex.split`` (comments enabled); a list or
    tuple is returned as a new list.

    Raises error.SDKError for any other type.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    if isinstance(command, string_types):
        # Need to account for the fact that shlex escapes backslashes when parsing
        # in Posix mode.
        if _IsOnWindows():
            command = command.replace(os.sep, os.sep + os.sep)
        return shlex.split(command, comments=True)

    if isinstance(command, (tuple, list)):
        return list(command)

    raise error.SDKError(
        'Command [{cmd}] must be a string, list or tuple.'.format(cmd=command))
# TODO(magimaster): Windows.
# TODO(magimaster): Verify that things are cleaned up if something here fails.
def similarity(self, v1, v2):
    """Return cosine similarity of given words or vectors.

    If v1/v2 is a string, look up the corresponding word vector.
    This is not particularly efficient function. Instead of many
    invocations, consider word_similarity() or direct computation.
    """
    # Function-scope import keeps the block usable on Python 3, where
    # types.StringTypes does not exist.
    import types
    string_types = getattr(types, 'StringTypes', (str,))

    unit_vectors = []
    for v in (v1, v2):
        if isinstance(v, string_types):
            v = self.word_to_unit_vector(v)
        else:
            v = v/numpy.linalg.norm(v)    # costly but safe
        unit_vectors.append(v)
    return numpy.dot(unit_vectors[0], unit_vectors[1])
def similarity(self, v1, v2):
    """Return cosine similarity of given words or vectors.

    If v1/v2 is a string, look up the corresponding word vector.
    This is not particularly efficient function. Instead of many
    invocations, consider word_similarity() or direct computation.
    """
    # Function-scope import keeps the block usable on Python 3, where
    # types.StringTypes does not exist.
    import types
    string_types = getattr(types, 'StringTypes', (str,))

    unit_vectors = []
    for v in (v1, v2):
        if isinstance(v, string_types):
            v = self.word_to_unit_vector(v)
        else:
            v = v/numpy.linalg.norm(v)    # costly but safe
        unit_vectors.append(v)
    return numpy.dot(unit_vectors[0], unit_vectors[1])
def __init__(self, url, fileOrName,
             method='GET', postdata=None, headers=None,
             agent="Twisted client", supportPartial=0,
             timeout=0, cookies=None, followRedirect=1,
             redirectLimit=20, afterFoundGet=False):
    """Set up a download of ``url`` into ``fileOrName``.

    ``fileOrName`` is either a file name (string) or an already opened
    file-like object. When a file name is given, ``supportPartial`` is true
    and the file already exists with a non-zero size, a ``range`` header is
    added (to ``headers`` itself when one was passed in) asking for the
    bytes after the existing content.

    The remaining arguments are forwarded to ``HTTPClientFactory.__init__``.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    self.requestedPartial = 0
    if isinstance(fileOrName, string_types):
        self.fileName = fileOrName
        self.file = None
        if supportPartial and os.path.exists(self.fileName):
            fileLength = os.path.getsize(self.fileName)
            if fileLength:
                self.requestedPartial = fileLength
                if headers is None:
                    headers = {}
                headers["range"] = "bytes=%d-" % fileLength
    else:
        self.file = fileOrName
    HTTPClientFactory.__init__(
        self, url, method=method, postdata=postdata, headers=headers,
        agent=agent, timeout=timeout, cookies=cookies,
        followRedirect=followRedirect, redirectLimit=redirectLimit,
        afterFoundGet=afterFoundGet)
def scopes_to_string(scopes):
    """Converts scope value to a string.

    If scopes is a string then it is simply passed through. If scopes is an
    iterable then a string is returned that is all the individual scopes
    concatenated with spaces.

    Args:
        scopes: string or iterable of strings, the scopes.

    Returns:
        The scopes formatted as a single string.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))
    if isinstance(scopes, string_types):
        return scopes
    return ' '.join(scopes)
def scopes_to_string(scopes):
    """Converts scope value to a string.

    If scopes is a string then it is simply passed through. If scopes is an
    iterable then a string is returned that is all the individual scopes
    concatenated with spaces.

    Args:
        scopes: string or iterable of strings, the scopes.

    Returns:
        The scopes formatted as a single string.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))
    if isinstance(scopes, string_types):
        return scopes
    return ' '.join(scopes)
def to_xml(self):
    """Convert CORS object into XML string representation.

    Every entry of ``self.cors`` becomes one CORS element holding one child
    element per (collection, elements_or_value) pair. Values are inserted as
    they are.
    """
    # NOTE(review): names and values are not XML-escaped - confirm that
    # callers only store text that is safe to embed.
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    # Collect the pieces and join once instead of repeated '+='.
    parts = ['<' + CORS_CONFIG + '>']
    for collections in self.cors:
        parts.append('<' + CORS + '>')
        for (collection, elements_or_value) in collections:
            assert collection is not None
            parts.append('<' + collection + '>')
            # If collection elements has type string, append atomic value,
            # otherwise, append sequence of values in named tags.
            if isinstance(elements_or_value, string_types):
                parts.append(elements_or_value)
            else:
                for (name, value) in elements_or_value:
                    assert name is not None
                    assert value is not None
                    parts.append('<' + name + '>' + value + '</' + name + '>')
            parts.append('</' + collection + '>')
        parts.append('</' + CORS + '>')
    parts.append('</' + CORS_CONFIG + '>')
    return ''.join(parts)
def __sanity(self, query, sections):
    """Validate ``query`` and ``sections`` and store them on the instance.

    Args:
        query: an ``(addr, qtype, qclass)`` triple; ``addr`` must be a
            string, ``qtype`` a non-zero key of DNS_TYPE and ``qclass`` a
            non-zero key of DNS_CLASS.
        sections: optional mapping; after ``self.__normalize`` its
            'AUTHORITY' and 'ADDITIONAL' entries, when present, must be
            lists or tuples and are copied into ``self._sections``.

    Raises ValueError when any of these checks fails.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    # The return value is unused here; the call is kept as in the original.
    dbg = _debug('_dnsquery::__sanity')

    addr, qtype, qclass = query
    if not isinstance(addr, string_types):
        raise ValueError('Invalid name %s' % str(addr))
    # 'in' replaces dict.has_key(), which no longer exists on Python 3.
    if qtype == 0 or qtype not in DNS_TYPE:
        raise ValueError('Invalid type %u' % qtype)
    if qclass == 0 or qclass not in DNS_CLASS:
        raise ValueError('Invalid class %u' % qclass)
    self._query = query

    if not sections:
        return

    sections = self.__normalize(sections)
    for k in ['AUTHORITY', 'ADDITIONAL']:
        if k in sections:
            v = sections[k]
            if not isinstance(v, (list, tuple)):
                raise ValueError('%s format error' % k)
            self._sections[k] = v
def scopes_to_string(scopes):
    """Converts scope value to a string.

    If scopes is a string then it is simply passed through. If scopes is an
    iterable then a string is returned that is all the individual scopes
    concatenated with spaces.

    Args:
        scopes: string or iterable of strings, the scopes.

    Returns:
        The scopes formatted as a single string.
    """
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))
    if isinstance(scopes, string_types):
        return scopes
    return ' '.join(scopes)
def to_xml(self):
    """Convert CORS object into XML string representation.

    Every entry of ``self.cors`` becomes one CORS element holding one child
    element per (collection, elements_or_value) pair. Values are inserted as
    they are.
    """
    # NOTE(review): names and values are not XML-escaped - confirm that
    # callers only store text that is safe to embed.
    # types.StringTypes is Python 2 only; fall back to str elsewhere.
    string_types = getattr(types, 'StringTypes', (str,))

    # Collect the pieces and join once instead of repeated '+='.
    parts = ['<' + CORS_CONFIG + '>']
    for collections in self.cors:
        parts.append('<' + CORS + '>')
        for (collection, elements_or_value) in collections:
            assert collection is not None
            parts.append('<' + collection + '>')
            # If collection elements has type string, append atomic value,
            # otherwise, append sequence of values in named tags.
            if isinstance(elements_or_value, string_types):
                parts.append(elements_or_value)
            else:
                for (name, value) in elements_or_value:
                    assert name is not None
                    assert value is not None
                    parts.append('<' + name + '>' + value + '</' + name + '>')
            parts.append('</' + collection + '>')
        parts.append('</' + CORS + '>')
    parts.append('</' + CORS_CONFIG + '>')
    return ''.join(parts)