Python string 模块,encode() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用string.encode()。
def send_content(self, connection, request_body):
    """Send the XML-RPC request headers and body over ``connection``.

    The body is gzip-compressed first when ``self.encode_threshold`` is set,
    the body is longer than that threshold, and the module-level ``gzip``
    name is truthy.

    @param connection Object exposing putheader() and endheaders().
    @param request_body The serialized request payload.
    """
    connection.putheader("Content-Type", "text/xml")

    # Compression is optional; all three conditions must hold.
    threshold = self.encode_threshold
    should_compress = (threshold is not None
                       and threshold < len(request_body)
                       and gzip)
    if should_compress:
        connection.putheader("Content-Encoding", "gzip")
        request_body = gzip_encode(request_body)

    # Content-Length must describe the body actually sent (possibly gzipped).
    body_length = str(len(request_body))
    connection.putheader("Content-Length", body_length)
    connection.endheaders(request_body)
##
# Parse response.
#
# @param file Stream.
# @return Response tuple and target method.
def send_content(self, connection, request_body):
    """Send the XML-RPC request headers and body over ``connection``.

    The body is gzip-compressed first when ``self.encode_threshold`` is set,
    the body is longer than that threshold, and the module-level ``gzip``
    name is truthy.

    @param connection Object exposing putheader() and endheaders().
    @param request_body The serialized request payload.
    """
    connection.putheader("Content-Type", "text/xml")

    # Compression is optional; all three conditions must hold.
    threshold = self.encode_threshold
    should_compress = (threshold is not None
                       and threshold < len(request_body)
                       and gzip)
    if should_compress:
        connection.putheader("Content-Encoding", "gzip")
        request_body = gzip_encode(request_body)

    # Content-Length must describe the body actually sent (possibly gzipped).
    body_length = str(len(request_body))
    connection.putheader("Content-Length", body_length)
    connection.endheaders(request_body)
##
# Parse response.
#
# @param file Stream.
# @return Response tuple and target method.
def serve_amfrpc(self, version=0):
    """Serve an AMF-RPC request through PyAMF and return the encoded reply.

    Decodes ``current.request.body``, runs every message of the request
    through a PyAMF ``BaseGateway`` built from the registered procedures,
    sets the response Content-Type header and returns the encoded envelope.

    :param version: ``3`` uses ``self.amfrpc3_procedures`` and decodes/encodes
        without an explicit context; any other value uses
        ``self.amfrpc_procedures`` with an AMF0 context.
    :returns: the value of the encoded response buffer, or an explanatory
        string when PyAMF cannot be imported.
    """
    # Only a failed import means "not installed". A bare ``except`` would also
    # swallow KeyboardInterrupt/SystemExit and hide unrelated errors.
    try:
        import pyamf
        import pyamf.remoting.gateway
    except ImportError:
        return "pyamf not installed or not in Python sys.path"
    request = current.request
    response = current.response
    if version == 3:
        services = self.amfrpc3_procedures
        base_gateway = pyamf.remoting.gateway.BaseGateway(services)
        pyamf_request = pyamf.remoting.decode(request.body)
    else:
        services = self.amfrpc_procedures
        base_gateway = pyamf.remoting.gateway.BaseGateway(services)
        # The AMF0 context is reused below when encoding the response.
        context = pyamf.get_context(pyamf.AMF0)
        pyamf_request = pyamf.remoting.decode(request.body, context)
    pyamf_response = pyamf.remoting.Envelope(pyamf_request.amfVersion)
    for name, message in pyamf_request:
        pyamf_response[name] = base_gateway.getProcessor(message)(message)
    response.headers['Content-Type'] = pyamf.remoting.CONTENT_TYPE
    if version == 3:
        return pyamf.remoting.encode(pyamf_response).getvalue()
    else:
        return pyamf.remoting.encode(pyamf_response, context).getvalue()
def serve_amfrpc(self, version=0):
    """Serve an AMF-RPC request through PyAMF and return the encoded reply.

    Decodes ``current.request.body``, runs every message of the request
    through a PyAMF ``BaseGateway`` built from the registered procedures,
    sets the response Content-Type header and returns the encoded envelope.

    :param version: ``3`` uses ``self.amfrpc3_procedures`` and decodes/encodes
        without an explicit context; any other value uses
        ``self.amfrpc_procedures`` with an AMF0 context.
    :returns: the value of the encoded response buffer, or an explanatory
        string when PyAMF cannot be imported.
    """
    # Only a failed import means "not installed". A bare ``except`` would also
    # swallow KeyboardInterrupt/SystemExit and hide unrelated errors.
    try:
        import pyamf
        import pyamf.remoting.gateway
    except ImportError:
        return "pyamf not installed or not in Python sys.path"
    request = current.request
    response = current.response
    if version == 3:
        services = self.amfrpc3_procedures
        base_gateway = pyamf.remoting.gateway.BaseGateway(services)
        pyamf_request = pyamf.remoting.decode(request.body)
    else:
        services = self.amfrpc_procedures
        base_gateway = pyamf.remoting.gateway.BaseGateway(services)
        # The AMF0 context is reused below when encoding the response.
        context = pyamf.get_context(pyamf.AMF0)
        pyamf_request = pyamf.remoting.decode(request.body, context)
    pyamf_response = pyamf.remoting.Envelope(pyamf_request.amfVersion)
    for name, message in pyamf_request:
        pyamf_response[name] = base_gateway.getProcessor(message)(message)
    response.headers['Content-Type'] = pyamf.remoting.CONTENT_TYPE
    if version == 3:
        return pyamf.remoting.encode(pyamf_response).getvalue()
    else:
        return pyamf.remoting.encode(pyamf_response, context).getvalue()
def send_content(self, connection, request_body):
    """Send the XML-RPC request headers and body over ``connection``.

    The body is gzip-compressed first when ``self.encode_threshold`` is set,
    the body is longer than that threshold, and the module-level ``gzip``
    name is truthy.

    @param connection Object exposing putheader() and endheaders().
    @param request_body The serialized request payload.
    """
    connection.putheader("Content-Type", "text/xml")

    # Compression is optional; all three conditions must hold.
    threshold = self.encode_threshold
    should_compress = (threshold is not None
                       and threshold < len(request_body)
                       and gzip)
    if should_compress:
        connection.putheader("Content-Encoding", "gzip")
        request_body = gzip_encode(request_body)

    # Content-Length must describe the body actually sent (possibly gzipped).
    body_length = str(len(request_body))
    connection.putheader("Content-Length", body_length)
    connection.endheaders(request_body)
##
# Parse response.
#
# @param file Stream.
# @return Response tuple and target method.
def send_content(self, connection, request_body):
    """Send the XML-RPC request headers and body over ``connection``.

    The body is gzip-compressed first when ``self.encode_threshold`` is set,
    the body is longer than that threshold, and the module-level ``gzip``
    name is truthy.

    @param connection Object exposing putheader() and endheaders().
    @param request_body The serialized request payload.
    """
    connection.putheader("Content-Type", "text/xml")

    # Compression is optional; all three conditions must hold.
    threshold = self.encode_threshold
    should_compress = (threshold is not None
                       and threshold < len(request_body)
                       and gzip)
    if should_compress:
        connection.putheader("Content-Encoding", "gzip")
        request_body = gzip_encode(request_body)

    # Content-Length must describe the body actually sent (possibly gzipped).
    body_length = str(len(request_body))
    connection.putheader("Content-Length", body_length)
    connection.endheaders(request_body)
##
# Parse response.
#
# @param file Stream.
# @return Response tuple and target method.
def send_content(self, connection, request_body):
    """Send the XML-RPC request headers and body over ``connection``.

    The body is gzip-compressed first when ``self.encode_threshold`` is set,
    the body is longer than that threshold, and the module-level ``gzip``
    name is truthy.

    @param connection Object exposing putheader() and endheaders().
    @param request_body The serialized request payload.
    """
    connection.putheader("Content-Type", "text/xml")

    # Compression is optional; all three conditions must hold.
    threshold = self.encode_threshold
    should_compress = (threshold is not None
                       and threshold < len(request_body)
                       and gzip)
    if should_compress:
        connection.putheader("Content-Encoding", "gzip")
        request_body = gzip_encode(request_body)

    # Content-Length must describe the body actually sent (possibly gzipped).
    body_length = str(len(request_body))
    connection.putheader("Content-Length", body_length)
    connection.endheaders(request_body)
##
# Parse response.
#
# @param file Stream.
# @return Response tuple and target method.
def sign(self):
    """Debugging stub for the WeChat payment signature; returns None.

    The signature computation is currently disabled. The code that was
    commented out here joined the sorted ``key=value`` pairs of ``self.ret``
    with '&', appended ``&key=<mch_key>``, and stored the upper-cased MD5 hex
    digest in ``self.other_parameters['sign']``.
    """
    logger.debug('Enter sign...')
    logger.debug('self.ret:%s' % (self.ret))
    # Placeholder payload used while the real string-to-sign is disabled.
    string = 'nima'
    logger.debug('string:' + str(string))
    logger.debug('string:%s' % (string))
    mch_key = System_Config.objects.get(name='wechat_mch_key').val
    # Never write the merchant secret itself to the log; record only whether
    # a non-empty key was found.
    logger.debug('mch_key loaded:%s' % (bool(mch_key)))
def playTrack(asin):
    """Fetch the HLS manifest for ``asin``, store it in a temp file and play it.

    The manifest is extracted from the page returned by
    ``trackPostUnicodeGetHLSPage`` and written to ``temp.mp4`` (when
    ``forceDVDPlayer`` is set) or ``temp.m3u8`` inside ``addonUserDataFolder``;
    the file is then handed to the player as the resolved URL. When no
    manifest is found the file is left empty.
    """
    content = trackPostUnicodeGetHLSPage('https://music.amazon.de/dmls/', asin)
    temp_file_path = addonUserDataFolder
    if forceDVDPlayer:
        temp_file_path += "/temp.mp4"
    else:
        temp_file_path += "/temp.m3u8"
    # Start from a clean file so stale manifest data is never replayed.
    if xbmcvfs.exists(temp_file_path):
        xbmcvfs.delete(temp_file_path)
    m3u_temp_file = xbmcvfs.File(temp_file_path, 'w')
    try:
        manifest_match = re.compile('manifest":"(.+?)"', re.DOTALL).findall(content)
        if manifest_match:
            m3u_string = manifest_match[0]
            # The manifest arrives with literal "\n" sequences; turn them
            # into real line breaks.
            m3u_string = m3u_string.replace("\\n", os.linesep)
            m3u_temp_file.write(m3u_string.encode("ascii"))
    finally:
        # Close the handle even if encoding or writing fails.
        m3u_temp_file.close()
    play_item = xbmcgui.ListItem(path=temp_file_path)
    play_item = setPlayItemInfo(play_item)
    xbmcplugin.setResolvedUrl(pluginhandle, True, listitem=play_item)
def search(type):
    """Prompt for a search term and list matching albums or songs.

    Opens the on-screen keyboard; when the user confirms a non-empty text,
    the term is URL-encoded and passed to the listing function that matches
    ``type`` ("albums" or "songs") for the German site version.
    """
    keyboard = xbmc.Keyboard('', translation(30015))
    keyboard.doModal()
    if not (keyboard.isConfirmed() and keyboard.getText()):
        return
    # quote_plus() already turns spaces into "+". Replacing them beforehand
    # made quote_plus escape the plus sign itself ("%2B"), so multi-word
    # queries were sent with literal plus characters.
    search_string = unicode(keyboard.getText(), "utf-8")
    search_string = urllib.quote_plus(search_string.encode("utf8"))
    if siteVersion=="de":
        if type=="albums":
            listAlbums(urlMain+"/s?rh=n%3A5686557031%2Ck%2Cp_n_format_browse-bin%3A180848031&keywords="+search_string+"&ie=UTF8")
        elif type=="songs":
            listSearchedSongs(urlMain+"/s/ref=sr_nr_p_n_format_browse-bi_2?fst=as%3Aoff&rh=n%3A5686557031%2Ck%2Cp_n_format_browse-bin%3A180849031&bbn=5686557031&keywords="+search_string+"&ie=UTF8")
    # elif siteVersion=="com":
    #     if type=="movies":
    #         listMovies(urlMain+"/mn/search/ajax/?_encoding=UTF8&url=node%3D7613704011&field-keywords="+search_string)
    #     elif type=="tv":
    #         listShows(urlMain+"/mn/search/ajax/?_encoding=UTF8&url=node%3D2858778011&field-keywords="+search_string)
    # elif siteVersion=="co.uk":
    #     if type=="movies":
    #         listMovies(urlMain+"/mn/search/ajax/?_encoding=UTF8&url=node%3D3356010031&field-keywords="+search_string)
    #     elif type=="tv":
    #         listShows(urlMain+"/mn/search/ajax/?_encoding=UTF8&url=node%3D3356011031&field-keywords="+search_string)
def __unquote_to_bytes(string):
    """Percent-decode ``string`` and return the result as a byte string.

    The input is UTF-8 encoded first. Every ``%XX`` escape whose two
    characters are found in the lookup table built from ``_hexdig`` is
    replaced by the corresponding character; any other ``%`` sequence is
    copied through unchanged.
    """
    global _hextobyte

    raw = string.encode('utf-8')
    pieces = raw.split(b'%')
    if len(pieces) == 1:
        # No '%' at all: nothing to decode.
        return raw

    # Build the table lazily so no memory is spent on it unless this
    # function is actually called.
    # NOTE(review): keys are text while the slices below are byte strings;
    # that only matches on Python 2 - confirm before running on Python 3.
    if _hextobyte is None:
        _hextobyte = {hi + lo: chr(int(hi + lo, 16))
                      for hi in _hexdig for lo in _hexdig}

    decoded = [pieces[0]]
    for piece in pieces[1:]:
        char = _hextobyte.get(piece[:2])
        if char is None:
            # Not a valid escape: put the '%' back and keep the text as is.
            decoded.extend((b'%', piece))
        else:
            decoded.extend((char, piece[2:]))
    return b''.join(decoded)
def send_content(self, connection, request_body):
    """Send the XML-RPC request headers and body over ``connection``.

    The body is gzip-compressed first when ``self.encode_threshold`` is set,
    the body is longer than that threshold, and the module-level ``gzip``
    name is truthy.

    @param connection Object exposing putheader() and endheaders().
    @param request_body The serialized request payload.
    """
    connection.putheader("Content-Type", "text/xml")

    # Compression is optional; all three conditions must hold.
    threshold = self.encode_threshold
    should_compress = (threshold is not None
                       and threshold < len(request_body)
                       and gzip)
    if should_compress:
        connection.putheader("Content-Encoding", "gzip")
        request_body = gzip_encode(request_body)

    # Content-Length must describe the body actually sent (possibly gzipped).
    body_length = str(len(request_body))
    connection.putheader("Content-Length", body_length)
    connection.endheaders(request_body)
##
# Parse response.
#
# @param file Stream.
# @return Response tuple and target method.
def serve_amfrpc(self, version=0):
    """Serve an AMF-RPC request through PyAMF and return the encoded reply.

    Decodes ``current.request.body``, runs every message of the request
    through a PyAMF ``BaseGateway`` built from the registered procedures,
    sets the response Content-Type header and returns the encoded envelope.

    :param version: ``3`` uses ``self.amfrpc3_procedures`` and decodes/encodes
        without an explicit context; any other value uses
        ``self.amfrpc_procedures`` with an AMF0 context.
    :returns: the value of the encoded response buffer, or an explanatory
        string when PyAMF cannot be imported.
    """
    # Only a failed import means "not installed". A bare ``except`` would also
    # swallow KeyboardInterrupt/SystemExit and hide unrelated errors.
    try:
        import pyamf
        import pyamf.remoting.gateway
    except ImportError:
        return "pyamf not installed or not in Python sys.path"
    request = current.request
    response = current.response
    if version == 3:
        services = self.amfrpc3_procedures
        base_gateway = pyamf.remoting.gateway.BaseGateway(services)
        pyamf_request = pyamf.remoting.decode(request.body)
    else:
        services = self.amfrpc_procedures
        base_gateway = pyamf.remoting.gateway.BaseGateway(services)
        # The AMF0 context is reused below when encoding the response.
        context = pyamf.get_context(pyamf.AMF0)
        pyamf_request = pyamf.remoting.decode(request.body, context)
    pyamf_response = pyamf.remoting.Envelope(pyamf_request.amfVersion)
    for name, message in pyamf_request:
        pyamf_response[name] = base_gateway.getProcessor(message)(message)
    response.headers['Content-Type'] = pyamf.remoting.CONTENT_TYPE
    if version == 3:
        return pyamf.remoting.encode(pyamf_response).getvalue()
    else:
        return pyamf.remoting.encode(pyamf_response, context).getvalue()
def send_content(self, connection, request_body):
    """Send the XML-RPC request headers and body over ``connection``.

    The body is gzip-compressed first when ``self.encode_threshold`` is set,
    the body is longer than that threshold, and the module-level ``gzip``
    name is truthy.

    @param connection Object exposing putheader() and endheaders().
    @param request_body The serialized request payload.
    """
    connection.putheader("Content-Type", "text/xml")

    # Compression is optional; all three conditions must hold.
    threshold = self.encode_threshold
    should_compress = (threshold is not None
                       and threshold < len(request_body)
                       and gzip)
    if should_compress:
        connection.putheader("Content-Encoding", "gzip")
        request_body = gzip_encode(request_body)

    # Content-Length must describe the body actually sent (possibly gzipped).
    body_length = str(len(request_body))
    connection.putheader("Content-Length", body_length)
    connection.endheaders(request_body)
##
# Parse response.
#
# @param file Stream.
# @return Response tuple and target method.
def send_content(self, connection, request_body):
    """Send the XML-RPC request headers and body over ``connection``.

    The body is gzip-compressed first when ``self.encode_threshold`` is set,
    the body is longer than that threshold, and the module-level ``gzip``
    name is truthy.

    @param connection Object exposing putheader() and endheaders().
    @param request_body The serialized request payload.
    """
    connection.putheader("Content-Type", "text/xml")

    # Compression is optional; all three conditions must hold.
    threshold = self.encode_threshold
    should_compress = (threshold is not None
                       and threshold < len(request_body)
                       and gzip)
    if should_compress:
        connection.putheader("Content-Encoding", "gzip")
        request_body = gzip_encode(request_body)

    # Content-Length must describe the body actually sent (possibly gzipped).
    body_length = str(len(request_body))
    connection.putheader("Content-Length", body_length)
    connection.endheaders(request_body)
##
# Parse response.
#
# @param file Stream.
# @return Response tuple and target method.
def send_content(self, connection, request_body):
    """Send the XML-RPC request headers and body over ``connection``.

    The body is gzip-compressed first when ``self.encode_threshold`` is set,
    the body is longer than that threshold, and the module-level ``gzip``
    name is truthy.

    @param connection Object exposing putheader() and endheaders().
    @param request_body The serialized request payload.
    """
    connection.putheader("Content-Type", "text/xml")

    # Compression is optional; all three conditions must hold.
    threshold = self.encode_threshold
    should_compress = (threshold is not None
                       and threshold < len(request_body)
                       and gzip)
    if should_compress:
        connection.putheader("Content-Encoding", "gzip")
        request_body = gzip_encode(request_body)

    # Content-Length must describe the body actually sent (possibly gzipped).
    body_length = str(len(request_body))
    connection.putheader("Content-Length", body_length)
    connection.endheaders(request_body)
##
# Parse response.
#
# @param file Stream.
# @return Response tuple and target method.
def send_content(self, connection, request_body):
    """Send the XML-RPC request headers and body over ``connection``.

    The body is gzip-compressed first when ``self.encode_threshold`` is set,
    the body is longer than that threshold, and the module-level ``gzip``
    name is truthy.

    @param connection Object exposing putheader() and endheaders().
    @param request_body The serialized request payload.
    """
    connection.putheader("Content-Type", "text/xml")

    # Compression is optional; all three conditions must hold.
    threshold = self.encode_threshold
    should_compress = (threshold is not None
                       and threshold < len(request_body)
                       and gzip)
    if should_compress:
        connection.putheader("Content-Encoding", "gzip")
        request_body = gzip_encode(request_body)

    # Content-Length must describe the body actually sent (possibly gzipped).
    body_length = str(len(request_body))
    connection.putheader("Content-Length", body_length)
    connection.endheaders(request_body)
##
# Parse response.
#
# @param file Stream.
# @return Response tuple and target method.
def deunicodise(string, encoding = None, errors = "replace"):
    """
    Convert unicode 'string' to <type str>, by default replacing
    all invalid characters with '?' or raise an exception.

    'encoding' falls back to Config.Config().encoding when not given.
    Values that are not unicode are returned as str(string).
    Raises UnicodeEncodeError when the conversion fails under the
    given 'errors' policy.
    """
    if not encoding:
        encoding = Config.Config().encoding
    if not isinstance(string, unicode):
        return str(string)
    debug("DeUnicodising %r using %s" % (string, encoding))
    try:
        return string.encode(encoding, errors)
    except UnicodeEncodeError as exc:
        # UnicodeEncodeError requires five constructor arguments; building it
        # from a single message raised TypeError and hid the real failure.
        raise UnicodeEncodeError(
            exc.encoding, exc.object, exc.start, exc.end,
            "Conversion from unicode failed: %r" % string)
def _stringify(string):
# convert to 7-bit ascii if possible
try:
return string.encode("ascii")
except UnicodeError:
return string
def encode(self, out):
    """Write this boolean as an XML-RPC <boolean> value element to ``out``."""
    element = "<value><boolean>%d</boolean></value>\n" % self.value
    out.write(element)
def encode(self, out):
    """Write this value as an XML-RPC <dateTime.iso8601> element to ``out``."""
    emit = out.write
    emit("<value><dateTime.iso8601>")
    emit(self.value)
    emit("</dateTime.iso8601></value>\n")
def encode(self, out):
    """Write ``self.data`` as an XML-RPC <base64> value element to ``out``."""
    emit = out.write
    emit("<value><base64>\n")
    # base64.encode() streams from a file-like source into ``out``.
    source = StringIO.StringIO(self.data)
    base64.encode(source, out)
    emit("</base64></value>\n")
def dump_unicode(self, value, write, escape=escape):
    """Emit ``value`` as an XML-RPC <string> element.

    The text is encoded with ``self.encoding`` and passed through ``escape``
    before being handed to ``write``.
    """
    encoded = value.encode(self.encoding)
    write("<value><string>")
    escaped = escape(encoded)
    write(escaped)
    write("</string></value>\n")
def dump_instance(self, value, write):
    """Marshal an instance: wrapper types encode themselves, others as a struct."""
    if value.__class__ not in WRAPPERS:
        # store instance attributes as a struct (really?)
        self.dump_struct(value.__dict__, write)
        return
    # Special wrappers call back into self.write, so expose the writer only
    # for the duration of the call.
    self.write = write
    value.encode(self)
    del self.write
def _stringify(string):
# convert to 7-bit ascii if possible
try:
return string.encode("ascii")
except UnicodeError:
return string
def encode(self, out):
    """Write this boolean as an XML-RPC <boolean> value element to ``out``."""
    element = "<value><boolean>%d</boolean></value>\n" % self.value
    out.write(element)
def encode(self, out):
    """Write this value as an XML-RPC <dateTime.iso8601> element to ``out``."""
    emit = out.write
    emit("<value><dateTime.iso8601>")
    emit(self.value)
    emit("</dateTime.iso8601></value>\n")
def encode(self, out):
    """Write ``self.data`` as an XML-RPC <base64> value element to ``out``."""
    emit = out.write
    emit("<value><base64>\n")
    # base64.encode() streams from a file-like source into ``out``.
    source = StringIO.StringIO(self.data)
    base64.encode(source, out)
    emit("</base64></value>\n")
def dump_unicode(self, value, write, escape=escape):
    """Emit ``value`` as an XML-RPC <string> element.

    The text is encoded with ``self.encoding`` and passed through ``escape``
    before being handed to ``write``.
    """
    encoded = value.encode(self.encoding)
    write("<value><string>")
    escaped = escape(encoded)
    write(escaped)
    write("</string></value>\n")
def dump_instance(self, value, write):
    """Marshal an instance: wrapper types encode themselves, others as a struct."""
    if value.__class__ not in WRAPPERS:
        # store instance attributes as a struct (really?)
        self.dump_struct(value.__dict__, write)
        return
    # Special wrappers call back into self.write, so expose the writer only
    # for the duration of the call.
    self.write = write
    value.encode(self)
    del self.write
def __init__(self, uri, transport=None, encoding=None, verbose=0,
             allow_none=0, use_datetime=0, context=None):
    """Set up a "logical" connection to the XML-RPC server at ``uri``.

    Only ``http`` and ``https`` URIs are accepted; anything else raises
    IOError. When no ``transport`` is supplied, a SafeTransport (https) or
    Transport (http) is created. An empty handler path defaults to "/RPC2".
    """
    # establish a "logical" server connection
    if isinstance(uri, unicode):
        uri = uri.encode('ISO-8859-1')

    # get the url
    import urllib
    scheme, remainder = urllib.splittype(uri)
    if scheme not in ("http", "https"):
        raise IOError("unsupported XML-RPC protocol")
    host, handler = urllib.splithost(remainder)
    self.__host = host
    self.__handler = handler or "/RPC2"

    if transport is None:
        if scheme == "https":
            transport = SafeTransport(use_datetime=use_datetime, context=context)
        else:
            transport = Transport(use_datetime=use_datetime)
    self.__transport = transport

    self.__encoding = encoding
    self.__verbose = verbose
    self.__allow_none = allow_none
def wx_signature(data):
    """Return the SHA-1 hex signature of the non-empty entries of ``data``.

    Entries whose value is None or the empty string are skipped. The rest are
    rendered as ``key=value`` (key lower-cased), ordered by the original key,
    joined with '&', UTF-8 encoded and hashed.
    """
    # Compare with != rather than ``is not``: identity against a string
    # literal is implementation-dependent and can let empty values through.
    pairs = ['%s=%s' % (key.lower(), data[key])
             for key in sorted(data)
             if data[key] is not None and data[key] != '']
    string = '&'.join(pairs)
    return hashlib.sha1(string.encode('utf-8')).hexdigest()
def wx_sign_for_pay(params):
    """Return the upper-cased MD5 hex signature for a payment request.

    Entries of ``params`` whose value is None or the empty string are skipped.
    The rest are rendered as ``key=value`` in sorted key order and joined with
    '&'; ``&key=<settings.WEIXIN_KEY>`` is appended before hashing.
    """
    # Compare with != rather than ``is not``: identity against a string
    # literal is implementation-dependent and can let empty values through.
    pairs = ['%s=%s' % (key, params[key])
             for key in sorted(params)
             if params[key] is not None and params[key] != '']
    content = '&'.join(pairs)
    content += '&key=' + settings.WEIXIN_KEY
    return hashlib.md5(content.encode('utf-8')).hexdigest().upper()
def _stringify(string):
# convert to 7-bit ascii if possible
try:
return string.encode("ascii")
except UnicodeError:
return string
def encode(self, out):
    """Write this boolean as an XML-RPC <boolean> value element to ``out``."""
    element = "<value><boolean>%d</boolean></value>\n" % self.value
    out.write(element)
def __init__(
        self,
        payload,
        filename=None,
        content_id=None,
        content_type=None,
        encoding='utf-8'):
    """Build a base64-encoded MIME attachment.

    ``payload`` is either a path (a ``str``), whose file content is read, or
    an object with a ``read()`` method, in which case ``filename`` is
    required. ``content_type`` defaults to the result of
    ``contenttype(filename)``. ``content_id``, when given, sets the
    Content-Id header.

    Raises Exception when a stream payload comes without a filename.
    """
    payload_is_path = isinstance(payload, str)
    if filename is None:
        if not payload_is_path:
            raise Exception('Missing attachment name')
        # A path payload names itself.
        filename = os.path.basename(payload)
    if payload_is_path:
        payload = read_file(payload, 'rb')
    else:
        payload = payload.read()

    filename = filename.encode(encoding)
    if content_type is None:
        content_type = contenttype(filename)
    self.my_filename = filename
    self.my_payload = payload

    # Split "main/sub" into the two positional arguments MIMEBase expects.
    type_parts = content_type.split('/', 1)
    MIMEBase.MIMEBase.__init__(self, *type_parts)
    self.set_payload(payload)
    self['Content-Disposition'] = 'attachment; filename="%s"' % filename
    if content_id is not None:
        self['Content-Id'] = '<%s>' % content_id.encode(encoding)
    Encoders.encode_base64(self)
def jwt_b64e(string):
    """URL-safe base64-encode ``string`` and strip '=' from both ends.

    Unicode input is UTF-8 encoded (strict) before encoding.
    """
    raw = string.encode('utf-8', 'strict') if isinstance(string, unicode) else string
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.strip(b'=')
def jwt_b64d(string):
    """URL-safe base64-decode a single bytestring.

    Unicode input is tolerated: it is converted to ASCII first, dropping
    characters that cannot be encoded. The result is also a bytestring.
    """
    if isinstance(string, unicode):
        string = string.encode('ascii', 'ignore')
    # Restore the '=' padding up to a multiple of four characters.
    padding = '=' * (-len(string) % 4)
    return base64.urlsafe_b64decode(string + padding)
def generate_token(self, payload):
    """Return a signed token string for ``payload``.

    The token is ``<header>.<payload>.<signature>``: the cached base64
    header, the base64 of the JSON-serialized payload, and the base64 of the
    HMAC digest over the first two parts. When ``self.salt`` is set, the
    secret becomes ``"<secret_key>$<salt>"``; a callable salt is invoked
    with the payload.
    """
    secret = self.secret_key
    salt = self.salt
    if salt:
        salt_value = salt(payload) if callable(salt) else salt
        secret = "%s$%s" % (secret, salt_value)
        if isinstance(secret, unicode):
            secret = secret.encode('ascii', 'ignore')
    header_b64 = self.cached_b64h
    payload_b64 = self.jwt_b64e(serializers.json(payload))
    signing_input = header_b64 + '.' + payload_b64
    mac = hmac.new(key=secret, msg=signing_input, digestmod=self.digestmod)
    signature_b64 = self.jwt_b64e(mac.digest())
    return signing_input + '.' + signature_b64
def load_token(self, token):
    """Validate ``token`` and return its decoded payload.

    Checks, in order: the header matches the cached one, the signature
    verifies against the (optionally salted) secret, and - when
    ``self.verify_expiration`` is set - the ``exp`` claim plus
    ``self.leeway`` has not passed. ``self.before_authorization`` is called
    with the payload when it is callable.

    Raises HTTP(400) for a header mismatch, a bad signature or an expired
    token.
    """
    if isinstance(token, unicode):
        token = token.encode('utf-8', 'strict')
    signed_part, signature = token.rsplit('.', 1)
    header_b64, payload_b64 = signed_part.split('.', 1)
    if header_b64 != self.cached_b64h:
        # header not the same
        raise HTTP(400, u'Invalid JWT Header')
    secret = self.secret_key
    # The payload is decoded before verification because a callable salt
    # needs it to derive the secret.
    tokend = serializers.loads_json(self.jwt_b64d(payload_b64))
    salt = self.salt
    if salt:
        salt_value = salt(tokend) if callable(salt) else salt
        secret = "%s$%s" % (secret, salt_value)
        if isinstance(secret, unicode):
            secret = secret.encode('ascii', 'ignore')
    if not self.verify_signature(signed_part, signature, secret):
        # signature verification failed
        raise HTTP(400, u'Token signature is invalid')
    if self.verify_expiration:
        now = time.mktime(datetime.datetime.utcnow().timetuple())
        if tokend['exp'] + self.leeway < now:
            raise HTTP(400, u'Token is expired')
    hook = self.before_authorization
    if callable(hook):
        hook(tokend)
    return tokend
def getHash(self, string):
    '''Return the SHA-1 hex digest of ``string`` (UTF-8 encoded); used as a stable hash of the website url.'''
    digest = hashlib.sha1()
    digest.update(string.encode('utf-8'))
    return digest.hexdigest()
def getHash(url):
    '''Return the SHA-1 hex digest of ``url`` (UTF-8 encoded) as a stable hash value.'''
    digest = hashlib.sha1()
    digest.update(url.encode('utf-8'))
    return digest.hexdigest()
def sign(self):
    """Add a SHA-1 'signature' entry to ``self.ret`` and return the dict.

    The signed text is every ``key=value`` pair of ``self.ret`` (key
    lower-cased), ordered by the original key and joined with '&'.
    """
    pairs = []
    for key in sorted(self.ret):
        pairs.append('%s=%s' % (key.lower(), self.ret[key]))
    string = '&'.join(pairs)
    digest = hashlib.sha1(string.encode('utf-8'))
    self.ret['signature'] = digest.hexdigest()
    return self.ret
def __init__(
        self,
        payload,
        filename=None,
        content_id=None,
        content_type=None,
        encoding='utf-8'):
    """Build a base64-encoded MIME attachment.

    ``payload`` is either a path (a ``str``), whose file content is read, or
    an object with a ``read()`` method, in which case ``filename`` is
    required. ``content_type`` defaults to the result of
    ``contenttype(filename)``. ``content_id``, when given, sets the
    Content-Id header.

    Raises Exception when a stream payload comes without a filename.
    """
    payload_is_path = isinstance(payload, str)
    if filename is None:
        if not payload_is_path:
            raise Exception('Missing attachment name')
        # A path payload names itself.
        filename = os.path.basename(payload)
    if payload_is_path:
        payload = read_file(payload, 'rb')
    else:
        payload = payload.read()

    filename = filename.encode(encoding)
    if content_type is None:
        content_type = contenttype(filename)
    self.my_filename = filename
    self.my_payload = payload

    # Split "main/sub" into the two positional arguments MIMEBase expects.
    type_parts = content_type.split('/', 1)
    MIMEBase.MIMEBase.__init__(self, *type_parts)
    self.set_payload(payload)
    self['Content-Disposition'] = 'attachment; filename="%s"' % filename
    if content_id is not None:
        self['Content-Id'] = '<%s>' % content_id.encode(encoding)
    Encoders.encode_base64(self)
def jwt_b64e(string):
    """URL-safe base64-encode ``string`` and strip '=' from both ends.

    Unicode input is UTF-8 encoded (strict) before encoding.
    """
    raw = string.encode('utf-8', 'strict') if isinstance(string, unicode) else string
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.strip(b'=')
def jwt_b64d(string):
    """URL-safe base64-decode a single bytestring.

    Unicode input is tolerated: it is converted to ASCII first, dropping
    characters that cannot be encoded. The result is also a bytestring.
    """
    if isinstance(string, unicode):
        string = string.encode('ascii', 'ignore')
    # Restore the '=' padding up to a multiple of four characters.
    padding = '=' * (-len(string) % 4)
    return base64.urlsafe_b64decode(string + padding)
def generate_token(self, payload):
    """Return a signed token string for ``payload``.

    The token is ``<header>.<payload>.<signature>``: the cached base64
    header, the base64 of the JSON-serialized payload, and the base64 of the
    HMAC digest over the first two parts. When ``self.salt`` is set, the
    secret becomes ``"<secret_key>$<salt>"``; a callable salt is invoked
    with the payload.
    """
    secret = self.secret_key
    salt = self.salt
    if salt:
        salt_value = salt(payload) if callable(salt) else salt
        secret = "%s$%s" % (secret, salt_value)
        if isinstance(secret, unicode):
            secret = secret.encode('ascii', 'ignore')
    header_b64 = self.cached_b64h
    payload_b64 = self.jwt_b64e(serializers.json(payload))
    signing_input = header_b64 + '.' + payload_b64
    mac = hmac.new(key=secret, msg=signing_input, digestmod=self.digestmod)
    signature_b64 = self.jwt_b64e(mac.digest())
    return signing_input + '.' + signature_b64
def load_token(self, token):
    """Validate ``token`` and return its decoded payload.

    Checks, in order: the header matches the cached one, the signature
    verifies against the (optionally salted) secret, and - when
    ``self.verify_expiration`` is set - the ``exp`` claim plus
    ``self.leeway`` has not passed. ``self.before_authorization`` is called
    with the payload when it is callable.

    Raises HTTP(400) for a header mismatch, a bad signature or an expired
    token.
    """
    if isinstance(token, unicode):
        token = token.encode('utf-8', 'strict')
    signed_part, signature = token.rsplit('.', 1)
    header_b64, payload_b64 = signed_part.split('.', 1)
    if header_b64 != self.cached_b64h:
        # header not the same
        raise HTTP(400, u'Invalid JWT Header')
    secret = self.secret_key
    # The payload is decoded before verification because a callable salt
    # needs it to derive the secret.
    tokend = serializers.loads_json(self.jwt_b64d(payload_b64))
    salt = self.salt
    if salt:
        salt_value = salt(tokend) if callable(salt) else salt
        secret = "%s$%s" % (secret, salt_value)
        if isinstance(secret, unicode):
            secret = secret.encode('ascii', 'ignore')
    if not self.verify_signature(signed_part, signature, secret):
        # signature verification failed
        raise HTTP(400, u'Token signature is invalid')
    if self.verify_expiration:
        now = time.mktime(datetime.datetime.utcnow().timetuple())
        if tokend['exp'] + self.leeway < now:
            raise HTTP(400, u'Token is expired')
    hook = self.before_authorization
    if callable(hook):
        hook(tokend)
    return tokend
def _stringify(string):
# convert to 7-bit ascii if possible
try:
return string.encode("ascii")
except UnicodeError:
return string