Python types 模块,ListType() 实例源码
我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用types.ListType()。
def _buildItemList(self, values, typehint=None):
if isinstance(values, OSCMessage):
items = values.items()
elif type(values) == types.ListType:
items = []
for val in values:
if type(val) == types.TupleType:
items.append(val[:2])
else:
items.append((typehint, val))
elif type(values) == types.TupleType:
items = [values[:2]]
else:
items = [(typehint, values)]
return items
def check_headers(headers):
assert_(type(headers) is ListType,
"Headers (%r) must be of type list: %r"
% (headers, type(headers)))
header_names = {}
for item in headers:
assert_(type(item) is TupleType,
"Individual headers (%r) must be of type tuple: %r"
% (item, type(item)))
assert_(len(item) == 2)
name, value = item
assert_(name.lower() != 'status',
"The Status header cannot be used; it conflicts with CGI "
"script, and HTTP status is not given through headers "
"(value: %r)." % value)
header_names[name.lower()] = None
assert_('\n' not in name and ':' not in name,
"Header names may not contain ':' or '\\n': %r" % name)
assert_(header_re.search(name), "Bad header name: %r" % name)
assert_(not name.endswith('-') and not name.endswith('_'),
"Names may not end in '-' or '_': %r" % name)
if bad_header_value_re.search(value):
assert_(0, "Bad header value: %r (bad char: %r)"
% (value, bad_header_value_re.search(value).group(0)))
def bytes2int(bytes):
"""Converts a list of bytes or a string to an integer
>>> (128*256 + 64)*256 + + 15
8405007
>>> l = [128, 64, 15]
>>> bytes2int(l)
8405007
"""
if not (type(bytes) is types.ListType or type(bytes) is types.StringType):
raise TypeError("You must pass a string or a list")
# Convert byte stream to integer
integer = 0
for byte in bytes:
integer *= 256
if type(byte) is types.StringType: byte = ord(byte)
integer += byte
return integer
def str642int(string):
"""Converts a base64 encoded string into an integer.
The chars of this string in in the range '0'-'9','A'-'Z','a'-'z','-','_'
>>> str642int('7MyqL')
123456789
"""
if not (type(string) is types.ListType or type(string) is types.StringType):
raise TypeError("You must pass a string or a list")
integer = 0
for byte in string:
integer *= 64
if type(byte) is types.StringType: byte = ord(byte)
integer += from64(byte)
return integer
def new_looper(a, arg=None):
"""Helper function for nest()
determines what sort of looper to make given a's type"""
if isinstance(a,types.TupleType):
if len(a) == 2:
return RangeLooper(a[0],a[1])
elif len(a) == 3:
return RangeLooper(a[0],a[1],a[2])
elif isinstance(a, types.BooleanType):
return BooleanLooper(a)
elif isinstance(a,types.IntType) or isinstance(a, types.LongType):
return RangeLooper(a)
elif isinstance(a, types.StringType) or isinstance(a, types.ListType):
return ListLooper(a)
elif isinstance(a, Looper):
return a
elif isinstance(a, types.LambdaType):
return CalcField(a, arg)
def __init__(self,rowset,description):
# save the description as is
self.description = fRow(description)
self.description.__Field2Index__ = self.__fieldToIndex
# Create the list and dict of fields
self.fields = []
self.__fieldDict = {}
for f in range(len(description)):
if type(description[f]) == types.TupleType or type(description[f]) == types.ListType:
self.__fieldDict[description[f][0].lower()] = f
self.fields.append( description[f][0].lower())
else:
self.__fieldDict[description[f].lower()] = f
self.fields.append( description[f].lower())
# Add all the rows
for r in rowset:
self.append(r)
def get_value(my, path):
parts = path.split("/")
current = my.package
for part in parts:
current = current.get(part)
# explict None comparison: empty string should go through
if current == None:
raise HandlerException("Part [%s] does not exist in package" % part)
# if this is still a dictionary and it has __VALUE__ in it, the
# get that value
if type(current) == types.DictionaryType and current.has_key("__VALUE__"):
current = current.get("__VALUE__")
if type(current) == types.ListType:
return current[0]
else:
return current
def rle(iterable):
"""Run length encode a list"""
iterable = iter(iterable)
runlen = 1
result = []
try:
previous = iterable.next()
except StopIteration:
return []
for element in iterable:
if element == previous:
runlen = runlen + 1
continue
else:
if isinstance(previous, (types.ListType, types.TupleType)):
previous = rle(previous)
result.append([previous, runlen])
previous = element
runlen = 1
if isinstance(previous, (types.ListType, types.TupleType)):
previous = rle(previous)
result.append([previous, runlen])
return result
def finish(self, lastLine, unusedCallback):
send = []
unuse = []
for L in self.lines:
names = parseNestedParens(L)
N = len(names)
if (N >= 1 and names[0] in self._1_RESPONSES or
N >= 2 and names[0] == 'OK' and isinstance(names[1], types.ListType) and names[1][0] in self._OK_RESPONSES):
send.append(L)
elif N >= 3 and names[1] in self._2_RESPONSES:
if isinstance(names[2], list) and len(names[2]) >= 1 and names[2][0] == 'FLAGS' and 'FLAGS' not in self.args:
unuse.append(L)
else:
send.append(L)
elif N >= 2 and names[1] in self._2_RESPONSES:
send.append(L)
else:
unuse.append(L)
d, self.defer = self.defer, None
d.callback((send, lastLine))
if unuse:
unusedCallback(unuse)
def check_headers(headers):
assert_(type(headers) is ListType,
"Headers (%r) must be of type list: %r"
% (headers, type(headers)))
header_names = {}
for item in headers:
assert_(type(item) is TupleType,
"Individual headers (%r) must be of type tuple: %r"
% (item, type(item)))
assert_(len(item) == 2)
name, value = item
assert_(name.lower() != 'status',
"The Status header cannot be used; it conflicts with CGI "
"script, and HTTP status is not given through headers "
"(value: %r)." % value)
header_names[name.lower()] = None
assert_('\n' not in name and ':' not in name,
"Header names may not contain ':' or '\\n': %r" % name)
assert_(header_re.search(name), "Bad header name: %r" % name)
assert_(not name.endswith('-') and not name.endswith('_'),
"Names may not end in '-' or '_': %r" % name)
if bad_header_value_re.search(value):
assert_(0, "Bad header value: %r (bad char: %r)"
% (value, bad_header_value_re.search(value).group(0)))
def __init__(self, path, tipo, compile=False, formatlist=None):
"""Constructor"""
if tipo == FRAGMENT or tipo == VERTEX:
self.shader = None
self.file = str(self.load(path))
self.tipo = tipo
self.path = path
self.compiled = False
if formatlist is not None:
if isinstance(formatlist, types.ListType):
num = 0
for f in formatlist:
key = '{' + str(num) + '}'
if key in self.file:
self.file = self.file.replace(key, str(f))
num += 1
else:
raise Exception("el tipo de formatlist debe ser del tipo list")
else:
raise Exception("tipo de shader incorrecto, debe ser FRAGMENT o VERTEX")
if compile:
self.compile()
def __iadd__(self, other):
"""Suma un vector con otro"""
if isinstance(other, Vector3):
self.x += other.get_x()
self.y += other.get_y()
self.z += other.get_z()
return self
elif isinstance(other, types.TupleType) or isinstance(other, types.ListType):
if len(other) == 3:
self.x += other[0]
self.y += other[1]
self.z += other[2]
return self
else:
self.throwError(2, "__iadd__")
return self
def __imul__(self, other):
"""Producto punto con otro"""
if isinstance(other, Vector3):
self.x *= other.get_x()
self.y *= other.get_y()
self.z *= other.get_z()
return self
else:
if isinstance(other, types.ListType) or isinstance(other, types.TupleType):
self.x *= other[0]
self.y *= other[1]
self.z *= other[2]
return self
elif isinstance(other, types.IntType) or isinstance(other, types.FloatType):
self.x *= other
self.y *= other
self.z *= other
return self
else:
self.throwError(2, "__imul__")
return self
def __idiv__(self, other):
"""Division con otro vector por valor"""
if isinstance(other, Vector3):
self.x /= other.get_x()
self.y /= other.get_y()
self.z /= other.get_z()
return self
else:
if isinstance(other, types.ListType) or isinstance(other, types.TupleType):
self.x /= other[0]
self.y /= other[1]
self.z /= other[2]
return self
elif isinstance(other, types.IntType) or isinstance(other, types.FloatType):
self.x /= other
self.y /= other
self.z /= other
return self
else:
self.throwError(2, "__idiv__")
return self
def normal3points(a, b, c):
"""Retorna el vector normal dado tres puntos a, b, c"""
if isinstance(a, types.ListType) or isinstance(a, types.TupleType):
a = Vector3(*a)
b = Vector3(*b)
c = Vector3(*c)
elif isinstance(a, Point3):
a = Vector3(*a.export_to_list())
b = Vector3(*b.export_to_list())
c = Vector3(*c.export_to_list())
cross_result = (a - c).cross(b - c).get_normalized()
if cross_result.get_x() == -0.0:
cross_result.set_x(0.0)
if cross_result.get_y() == -0.0:
cross_result.set_y(0.0)
if cross_result.get_z() == -0.0:
cross_result.set_z(0.0)
return cross_result
def __iadd__(self, other):
"""Suma un vector con otro"""
if isinstance(other, Vector3):
self.x += other.get_x()
self.y += other.get_y()
self.z += other.get_z()
return self
elif isinstance(other, types.TupleType) or isinstance(other, types.ListType):
if len(other) == 3:
self.x += other[0]
self.y += other[1]
self.z += other[2]
return self
else:
self.throwError(2, "__iadd__")
return self
def __isub__(self, other):
"""Resta un vector con otro"""
if isinstance(other, Vector3):
self.x -= other.get_x()
self.y -= other.get_y()
self.z -= other.get_z()
return self
elif isinstance(other, types.TupleType) or isinstance(other, types.ListType):
if len(other) == 3:
self.x -= other[0]
self.y -= other[1]
self.z -= other[2]
return self
else:
self.throwError(2, "__isub__")
return self
def __imul__(self, other):
"""Producto punto con otro"""
if isinstance(other, Vector3):
self.x *= other.get_x()
self.y *= other.get_y()
self.z *= other.get_z()
return self
else:
if isinstance(other, types.ListType) or isinstance(other, types.TupleType):
self.x *= other[0]
self.y *= other[1]
self.z *= other[2]
return self
elif isinstance(other, types.IntType) or isinstance(other, types.FloatType):
self.x *= other
self.y *= other
self.z *= other
return self
else:
self.throwError(2, "__imul__")
return self
def normal3points(a, b, c):
"""Retorna el vector normal dado tres puntos a, b, c"""
if isinstance(a, types.ListType) or isinstance(a, types.TupleType):
a = Vector3(*a)
b = Vector3(*b)
c = Vector3(*c)
elif isinstance(a, Point3):
a = Vector3(*a.export_to_list())
b = Vector3(*b.export_to_list())
c = Vector3(*c.export_to_list())
cross_result = (a - c).cross(b - c).get_normalized()
if cross_result.get_x() == -0.0:
cross_result.set_x(0.0)
if cross_result.get_y() == -0.0:
cross_result.set_y(0.0)
if cross_result.get_z() == -0.0:
cross_result.set_z(0.0)
return cross_result
def bytes2int(bytes):
"""Converts a list of bytes or a string to an integer
>>> (128*256 + 64)*256 + + 15
8405007
>>> l = [128, 64, 15]
>>> bytes2int(l)
8405007
"""
if not (type(bytes) is types.ListType or type(bytes) is types.StringType):
raise TypeError("You must pass a string or a list")
# Convert byte stream to integer
integer = 0
for byte in bytes:
integer *= 256
if type(byte) is types.StringType: byte = ord(byte)
integer += byte
return integer
def str642int(string):
"""Converts a base64 encoded string into an integer.
The chars of this string in in the range '0'-'9','A'-'Z','a'-'z','-','_'
>>> str642int('7MyqL')
123456789
"""
if not (type(string) is types.ListType or type(string) is types.StringType):
raise TypeError("You must pass a string or a list")
integer = 0
for byte in string:
integer *= 64
if type(byte) is types.StringType: byte = ord(byte)
integer += from64(byte)
return integer
def unwrap(value):
t = type(value)
if t is types.InstanceType and isinstance(value, DynamicProxy):
if pyro_daemon:
try:
return pyro_daemon.getLocalObject(value.objectID)
except KeyError:
pass
return value
elif t is types.ListType:
for i in range(len(value)):
value[i] = unwrap(value[i])
elif t is types.TupleType:
value = list(value)
for i in range(len(value)):
value[i] = unwrap(value[i])
return tuple(value)
elif t is types.DictType:
for k, v in value.items():
value[k] = unwrap(v)
return value
def _json_object_to_object(self, json_object, entity_class, context={}):
if type(json_object) == types.DictionaryType:
if hasattr(entity_class, '_is_paginated') and entity_class._is_paginated:
try:
if hasattr(self, '_process_page'):
return self._process_page(json_object, entity_class, context)
else:
raise AttributeError('Class %s does not have _process_page() method' % entity_class.__name__)
except ValueError:
return map_to_object(json_object, entity_class)
else:
return map_to_object(json_object, entity_class)
elif type(json_object) == types.ListType:
objects = []
for obj in json_object:
objects.append(map_to_object(obj, entity_class))
# if auto_page is set, return a page with the items
if 'typeId' in json_object and json_object['typeId'] == 'com.tintri.api.rest.v310.dto.Page' and self.__auto_page:
return TintriPage(paginated=entity_class._is_paginated, items=objects, context=context)
else:
return objects
else:
return json_object # Return any other type as is, such as None, string, number, boolean
def isOlderThan(self, arg):
if not self.timestamp:
return True
if isinstance(arg, types.IntType) or isinstance(arg, types.LongType) or isinstance(arg, types.FloatType):
return self.timestamp < arg
if isinstance(arg, TimeStampFile):
if arg.timestamp is None:
return False
else:
return arg.timestamp > self.timestamp
elif isinstance(arg, types.ListType):
files = arg
else:
files = [arg]
for f in files:
if os.path.getmtime(f) > self.timestamp:
return True
return False
def isNewerThan(self, arg):
if not self.timestamp:
return False
if isinstance(arg, types.IntType) or isinstance(arg, types.LongType) or isinstance(arg, types.FloatType):
return self.timestamp > arg
if isinstance(arg, TimeStampFile):
if arg.timestamp is None:
return False
else:
return arg.timestamp < self.timestamp
elif isinstance(arg, types.ListType):
files = arg
else:
files = [arg]
for f in files:
if os.path.getmtime(f) < self.timestamp:
return True
return False
def check_headers(headers):
assert_(type(headers) is ListType,
"Headers (%r) must be of type list: %r"
% (headers, type(headers)))
header_names = {}
for item in headers:
assert_(type(item) is TupleType,
"Individual headers (%r) must be of type tuple: %r"
% (item, type(item)))
assert_(len(item) == 2)
name, value = item
assert_(name.lower() != 'status',
"The Status header cannot be used; it conflicts with CGI "
"script, and HTTP status is not given through headers "
"(value: %r)." % value)
header_names[name.lower()] = None
assert_('\n' not in name and ':' not in name,
"Header names may not contain ':' or '\\n': %r" % name)
assert_(header_re.search(name), "Bad header name: %r" % name)
assert_(not name.endswith('-') and not name.endswith('_'),
"Names may not end in '-' or '_': %r" % name)
if bad_header_value_re.search(value):
assert_(0, "Bad header value: %r (bad char: %r)"
% (value, bad_header_value_re.search(value).group(0)))
def _generateSignature(self, params, secret):
if not params or not secret:
return ""
hash_string = ""
if not params.get('t'):
# note, this uses a utc timestamp not a local timestamp
params['t'] = str(int(time.mktime(time.gmtime())))
keys = params.keys()
keys.sort()
for k in keys:
if type(params[k]) in [types.ListType, types.TupleType]:
for v in params[k]:
hash_string += v
else:
hash_string += params[k]
hash_string += secret
signature = hashlib.md5(hash_string).hexdigest()[:10]
return signature
def check_headers(headers):
assert_(type(headers) is ListType,
"Headers (%r) must be of type list: %r"
% (headers, type(headers)))
header_names = {}
for item in headers:
assert_(type(item) is TupleType,
"Individual headers (%r) must be of type tuple: %r"
% (item, type(item)))
assert_(len(item) == 2)
name, value = item
assert_(name.lower() != 'status',
"The Status header cannot be used; it conflicts with CGI "
"script, and HTTP status is not given through headers "
"(value: %r)." % value)
header_names[name.lower()] = None
assert_('\n' not in name and ':' not in name,
"Header names may not contain ':' or '\\n': %r" % name)
assert_(header_re.search(name), "Bad header name: %r" % name)
assert_(not name.endswith('-') and not name.endswith('_'),
"Names may not end in '-' or '_': %r" % name)
if bad_header_value_re.search(value):
assert_(0, "Bad header value: %r (bad char: %r)"
% (value, bad_header_value_re.search(value).group(0)))
def iterate(self):
self.check_file()
self.handler.start()
dirpath, filename = os.path.split(self.filename)
modulename = filename[:-3]
if dirpath in sys.path:
sys.path.remove(dirpath)
sys.path.insert(0, dirpath)
module = __import__(modulename)
member_names = dir(module)
for name in member_names:
attr = getattr(module, name)
if isinstance(attr, Template):
self.handler.handle(attr)
elif isinstance(attr, types.ListType):
for mem in attr:
if isinstance(attr, Template):
self.handler.handle(attr)
elif isinstance(attr, types.DictionaryType):
for k in attr:
if isinstance(attr, Template):
self.handler.handle(attr[k])
def check_headers(headers):
assert_(type(headers) is ListType,
"Headers (%r) must be of type list: %r"
% (headers, type(headers)))
header_names = {}
for item in headers:
assert_(type(item) is TupleType,
"Individual headers (%r) must be of type tuple: %r"
% (item, type(item)))
assert_(len(item) == 2)
name, value = item
assert_(name.lower() != 'status',
"The Status header cannot be used; it conflicts with CGI "
"script, and HTTP status is not given through headers "
"(value: %r)." % value)
header_names[name.lower()] = None
assert_('\n' not in name and ':' not in name,
"Header names may not contain ':' or '\\n': %r" % name)
assert_(header_re.search(name), "Bad header name: %r" % name)
assert_(not name.endswith('-') and not name.endswith('_'),
"Names may not end in '-' or '_': %r" % name)
if bad_header_value_re.search(value):
assert_(0, "Bad header value: %r (bad char: %r)"
% (value, bad_header_value_re.search(value).group(0)))
def type_check(tags):
"""Perform a type check on a list of tags"""
if type(tags) in (types.ListType, types.TupleType):
single = False
elif tags == None:
tags = []
single = False
else:
tags = [tags]
single = True
if len([t for t in tags if type(t) not in types.StringTypes]) == 0:
valid = True
else:
valid = False
return tags, single, valid
def __getitem__(self, key):
if key == 'category':
return UserDict.__getitem__(self, 'tags')[0]['term']
if key == 'enclosures':
norel = lambda link: FeedParserDict([(name,value) for (name,value) in link.items() if name!='rel'])
return [norel(link) for link in UserDict.__getitem__(self, 'links') if link['rel']=='enclosure']
if key == 'license':
for link in UserDict.__getitem__(self, 'links'):
if link['rel']=='license' and link.has_key('href'):
return link['href']
if key == 'categories':
return [(tag['scheme'], tag['term']) for tag in UserDict.__getitem__(self, 'tags')]
realkey = self.keymap.get(key, key)
if type(realkey) == types.ListType:
for k in realkey:
if UserDict.__contains__(self, k):
return UserDict.__getitem__(self, k)
if UserDict.__contains__(self, key):
return UserDict.__getitem__(self, key)
return UserDict.__getitem__(self, realkey)
def rle(iterable):
"""Run length encode a list"""
iterable = iter(iterable)
runlen = 1
result = []
try:
previous = iterable.next()
except StopIteration:
return []
for element in iterable:
if element == previous:
runlen = runlen + 1
continue
else:
if isinstance(previous, (types.ListType, types.TupleType)):
previous = rle(previous)
result.append([previous, runlen])
previous = element
runlen = 1
if isinstance(previous, (types.ListType, types.TupleType)):
previous = rle(previous)
result.append([previous, runlen])
return result
def finish(self, lastLine, unusedCallback):
send = []
unuse = []
for L in self.lines:
names = parseNestedParens(L)
N = len(names)
if (N >= 1 and names[0] in self._1_RESPONSES or
N >= 2 and names[0] == 'OK' and isinstance(names[1], types.ListType) and names[1][0] in self._OK_RESPONSES):
send.append(L)
elif N >= 3 and names[1] in self._2_RESPONSES:
if isinstance(names[2], list) and len(names[2]) >= 1 and names[2][0] == 'FLAGS' and 'FLAGS' not in self.args:
unuse.append(L)
else:
send.append(L)
elif N >= 2 and names[1] in self._2_RESPONSES:
send.append(L)
else:
unuse.append(L)
d, self.defer = self.defer, None
d.callback((send, lastLine))
if unuse:
unusedCallback(unuse)
def check_headers(headers):
assert_(type(headers) is ListType,
"Headers (%r) must be of type list: %r"
% (headers, type(headers)))
header_names = {}
for item in headers:
assert_(type(item) is TupleType,
"Individual headers (%r) must be of type tuple: %r"
% (item, type(item)))
assert_(len(item) == 2)
name, value = item
assert_(name.lower() != 'status',
"The Status header cannot be used; it conflicts with CGI "
"script, and HTTP status is not given through headers "
"(value: %r)." % value)
header_names[name.lower()] = None
assert_('\n' not in name and ':' not in name,
"Header names may not contain ':' or '\\n': %r" % name)
assert_(header_re.search(name), "Bad header name: %r" % name)
assert_(not name.endswith('-') and not name.endswith('_'),
"Names may not end in '-' or '_': %r" % name)
if bad_header_value_re.search(value):
assert_(0, "Bad header value: %r (bad char: %r)"
% (value, bad_header_value_re.search(value).group(0)))
def unparse(self,dn,record):
"""
dn
string-representation of distinguished name
record
Either a dictionary holding the LDAP entry {attrtype:record}
or a list with a modify list like for LDAPObject.modify().
"""
if not record:
# Simply ignore empty records
return
# Start with line containing the distinguished name
self._unparseAttrTypeandValue('dn',dn)
# Dispatch to record type specific writers
if isinstance(record,types.DictType):
self._unparseEntryRecord(record)
elif isinstance(record,types.ListType):
self._unparseChangeRecord(record)
else:
raise ValueError, "Argument record must be dictionary or list"
# Write empty line separating the records
self._output_file.write(self._line_sep)
# Count records written
self.records_written = self.records_written+1
return # unparse()
def check_headers(headers):
assert_(type(headers) is ListType,
"Headers (%r) must be of type list: %r"
% (headers, type(headers)))
header_names = {}
for item in headers:
assert_(type(item) is TupleType,
"Individual headers (%r) must be of type tuple: %r"
% (item, type(item)))
assert_(len(item) == 2)
name, value = item
assert_(name.lower() != 'status',
"The Status header cannot be used; it conflicts with CGI "
"script, and HTTP status is not given through headers "
"(value: %r)." % value)
header_names[name.lower()] = None
assert_('\n' not in name and ':' not in name,
"Header names may not contain ':' or '\\n': %r" % name)
assert_(header_re.search(name), "Bad header name: %r" % name)
assert_(not name.endswith('-') and not name.endswith('_'),
"Names may not end in '-' or '_': %r" % name)
if bad_header_value_re.search(value):
assert_(0, "Bad header value: %r (bad char: %r)"
% (value, bad_header_value_re.search(value).group(0)))
def check_headers(headers):
assert_(type(headers) is ListType,
"Headers (%r) must be of type list: %r"
% (headers, type(headers)))
header_names = {}
for item in headers:
assert_(type(item) is TupleType,
"Individual headers (%r) must be of type tuple: %r"
% (item, type(item)))
assert_(len(item) == 2)
name, value = item
assert_(name.lower() != 'status',
"The Status header cannot be used; it conflicts with CGI "
"script, and HTTP status is not given through headers "
"(value: %r)." % value)
header_names[name.lower()] = None
assert_('\n' not in name and ':' not in name,
"Header names may not contain ':' or '\\n': %r" % name)
assert_(header_re.search(name), "Bad header name: %r" % name)
assert_(not name.endswith('-') and not name.endswith('_'),
"Names may not end in '-' or '_': %r" % name)
if bad_header_value_re.search(value):
assert_(0, "Bad header value: %r (bad char: %r)"
% (value, bad_header_value_re.search(value).group(0)))
def _genFileKey(fileInfo):
"""
Generate a dictionary key from information in the File Info object,
which is either a list with information from ngas_files, or an
ngamsFileInfo object.
fileInfo: File Info as read from the ngas_files table or
an instance of ngamsFileInfo (list|ngamsFileInfo).
Returns: File key (string).
"""
if ((type(fileInfo) == types.ListType) or
(type(fileInfo) == types.TupleType)):
fileId = fileInfo[ngamsDbCore.NGAS_FILES_FILE_ID]
fileVer = fileInfo[ngamsDbCore.NGAS_FILES_FILE_VER]
else:
fileId = fileInfo.getFileId()
fileVer = fileInfo.getFileVersion()
return ngamsLib.genFileKey(None, fileId, fileVer)
def validate_request(request):
if not isinstance(request, dict):
fault = Fault(
-32600, 'Request must be {}, not %s.' % type(request)
)
return fault
rpcid = request.get('id', None)
version = get_version(request)
if not version:
fault = Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)
return fault
request.setdefault('params', [])
method = request.get('method', None)
params = request.get('params')
param_types = (types.ListType, types.DictType, types.TupleType)
if not method or type(method) not in types.StringTypes or \
type(params) not in param_types:
fault = Fault(
-32600, 'Invalid request parameters or method.', rpcid=rpcid
)
return fault
return True
def isbatch(result):
if type(result) not in (types.ListType, types.TupleType):
return False
if len(result) < 1:
return False
if not isinstance(result[0], dict):
return False
if 'jsonrpc' not in result[0].keys():
return False
try:
version = float(result[0]['jsonrpc'])
except ValueError:
raise ProtocolError('"jsonrpc" key must be a float(able) value.')
if version < 2:
return False
return True
def iterate(self):
self.check_file()
self.handler.start()
dirpath, filename = os.path.split(self.filename)
modulename = filename[:-3]
if dirpath in sys.path:
sys.path.remove(dirpath)
sys.path.insert(0, dirpath)
module = __import__(modulename)
member_names = dir(module)
for name in member_names:
attr = getattr(module, name)
if isinstance(attr, Template):
self.handler.handle(attr)
elif isinstance(attr, types.ListType):
for mem in attr:
if isinstance(attr, Template):
self.handler.handle(attr)
elif isinstance(attr, types.DictionaryType):
for k in attr:
if isinstance(attr, Template):
self.handler.handle(attr[k])
def PnmToPng(pnmstring, reporterror = Utils.Error):
'''Convert a pnm file into a png file by piping through "pnmtopng".
"pnmstring" can either be a string or a list of strings. Accordingly,
one png in a string or a list of pngs in strings is returned.'''
if type(pnmstring) == types.ListType:
res = []
for p in pnmstring:
res.append(PnmToPng(p,reporterror))
return res
if type(pnmstring) != types.StringType:
return pnmstring
try:
inp,out = os.popen2('pnmtopng -compression 9 -background white '+
'-transparent white', bufsize=len(pnmstring)+1024)
inp.write(pnmstring)
inp.close()
res = out.read()
out.close()
except:
msg = 'Problems during call to "pnmtopng".'
reporterror(msg)
raise Utils.UtilsError, msg
return res
def bytes2int(bytes):
"""Converts a list of bytes or a string to an integer
>>> (128*256 + 64)*256 + + 15
8405007
>>> l = [128, 64, 15]
>>> bytes2int(l)
8405007
"""
if not (type(bytes) is types.ListType or type(bytes) is types.StringType):
raise TypeError("You must pass a string or a list")
# Convert byte stream to integer
integer = 0
for byte in bytes:
integer *= 256
if type(byte) is types.StringType: byte = ord(byte)
integer += byte
return integer
def str642int(string):
"""Converts a base64 encoded string into an integer.
The chars of this string in in the range '0'-'9','A'-'Z','a'-'z','-','_'
>>> str642int('7MyqL')
123456789
"""
if not (type(string) is types.ListType or type(string) is types.StringType):
raise TypeError("You must pass a string or a list")
integer = 0
for byte in string:
integer *= 64
if type(byte) is types.StringType: byte = ord(byte)
integer += from64(byte)
return integer
def send_resp_header(self, cont_type, cont_length, range=False): # @ReservedAssignment
if range:
self.send_response(206, 'Partial Content')
else:
self.send_response(200, 'OK')
self.send_header('Content-Type', cont_type)
self.send_header('transferMode.dlna.org', 'Streaming')
self.send_header('contentFeatures.dlna.org', 'DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=01700000000000000000000000000000')
self.send_header('Accept-Ranges', 'bytes')
if range:
if isinstance(range, (types.TupleType, types.ListType)) and len(range)==3:
self.send_header('Content-Range', 'bytes %d-%d/%d' % range)
self.send_header('Content-Length', range[1]-range[0]+1)
else:
raise ValueError('Invalid range value')
else:
self.send_header('Content-Length', cont_length)
self.finish_header()
def send_resp_header(self, cont_type, size, range=False):
if range:
self.send_response(206, 'Partial Content')
else:
self.send_response(200, 'OK')
self.send_header('Content-Type', cont_type)
self.send_header('Accept-Ranges', 'bytes')
if range:
if isinstance(range, (types.TupleType, types.ListType)) and len(range)==3:
self.send_header('Content-Range', 'bytes %d-%d/%d' % range)
self.send_header('Content-Length', range[1]-range[0]+1)
else:
raise ValueError('Invalid range value')
else:
self.send_header('Content-Length', size)
self.send_header('Connection', 'close')
self.end_headers()