我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 hmac.HMAC。
def hash_host(hostname, salt=None):
    """
    Return a "hashed" form of the hostname, as used by OpenSSH when storing
    hashed hostnames in the known_hosts file.

    :param str hostname: the hostname to hash
    :param str salt:
        optional salt to use when hashing (must be 20 bytes long once
        decoded); a value starting with ``|1|`` is treated as a full hashed
        entry and its salt field is extracted
    :return: the hashed hostname as a `str`
    """
    if salt is None:
        raw_salt = os.urandom(sha1().digest_size)
    else:
        # An entry looks like "|1|<salt>|<hash>"; field 2 is the salt.
        encoded_salt = salt.split('|')[2] if salt.startswith('|1|') else salt
        raw_salt = decodebytes(b(encoded_salt))
    assert len(raw_salt) == sha1().digest_size
    digest = HMAC(raw_salt, b(hostname), sha1).digest()
    entry = '|1|%s|%s' % (u(encodebytes(raw_salt)), u(encodebytes(digest)))
    # The base64 encoder appends newlines; strip them from the entry.
    return entry.replace('\n', '')
def _hi(data, salt, iterations):
    """A simple implementation of PBKDF2.

    Keys an HMAC-SHA1 with ``data``, computes the first block over
    ``salt + b'\\x00\\x00\\x00\\x01'`` and XORs together ``iterations``
    chained digests, returning the result as 20 big-endian bytes.
    """
    keyed = hmac.HMAC(data, None, sha1)

    def _prf(chunk, template=keyed):
        """Return the HMAC of chunk using a copy of the keyed template."""
        worker = template.copy()
        worker.update(chunk)
        return worker.digest()

    decode = _from_bytes
    encode = _to_bytes

    block = _prf(salt + b'\x00\x00\x00\x01')
    folded = decode(block, 'big')
    for _ in range(iterations - 1):
        block = _prf(block)
        folded ^= decode(block, 'big')
    return encode(folded, 20, 'big')
def _authenticate_cram_md5(credentials, sock_info):
    """Authenticate using CRAM-MD5 (RFC 2195).

    Sends a ``saslStart`` command, answers the returned payload with
    ``<username> <hex HMAC-MD5 of the payload>`` and sends it back in a
    ``saslContinue`` command.
    """
    source = credentials.source
    username = credentials.username
    password = credentials.password
    # The MAC key is the same password digest that MONGODB-CR uses.
    mac_key = _password_digest(username, password)

    start_cmd = SON([('saslStart', 1),
                     ('mechanism', 'CRAM-MD5'),
                     ('payload', Binary(b'')),
                     ('autoAuthorize', 1)])
    reply = sock_info.command(source, start_cmd)

    # Name the digest explicitly: relying on MD5 as the implicit default
    # digestmod is deprecated as of Python 3.4.
    signer = hmac.HMAC(key=mac_key.encode('utf-8'), digestmod=md5)
    signer.update(reply['payload'])
    answer = username.encode('utf-8') + b' ' + b(signer.hexdigest())

    continue_cmd = SON([('saslContinue', 1),
                        ('conversationId', reply['conversationId']),
                        ('payload', Binary(answer))])
    sock_info.command(source, continue_cmd)
def get_disqus_sso_payload(user):
    """Return remote_auth_s3 and api_key for user.

    Returns the tuple ``(message, timestamp, signature, public_key)``, or
    four ``None`` values when either Disqus key is missing from the
    application config.
    """
    public_key = current_app.config.get('DISQUS_PUBLIC_KEY')
    secret_key = current_app.config.get('DISQUS_SECRET_KEY')
    if not (public_key and secret_key):
        return None, None, None, None

    if user:
        data = simplejson.dumps({
            'id': user.id,
            'username': user.name,
            'email': user.email_addr,
        })
    else:
        # No user: sign an empty JSON object.
        data = simplejson.dumps({})

    # The message is the base64 form of the JSON document.
    message = base64.b64encode(data)
    # The timestamp is part of the signed text.
    timestamp = int(time.time())
    to_sign = '%s %s' % (message, timestamp)
    sig = hmac.HMAC(secret_key, to_sign, hashlib.sha1).hexdigest()
    return message, timestamp, sig, public_key
def _authenticate_cram_md5(credentials, sock_info, cmd_func):
    """Authenticate using CRAM-MD5 (RFC 2195).

    ``credentials`` unpacks to ``(source, username, password)``. Commands
    are issued through ``cmd_func(sock_info, source, command)``.
    """
    source, username, password = credentials
    # The MAC key is the same password digest that MONGODB-CR uses.
    mac_key = _password_digest(username, password)

    start_cmd = SON([('saslStart', 1),
                     ('mechanism', 'CRAM-MD5'),
                     ('payload', Binary(b(''))),
                     ('autoAuthorize', 1)])
    reply, _ = cmd_func(sock_info, source, start_cmd)

    # Name the digest explicitly: relying on MD5 as the implicit default
    # digestmod is deprecated as of Python 3.4.
    signer = hmac.HMAC(key=mac_key.encode('utf-8'), digestmod=_DMOD)
    signer.update(reply['payload'])
    answer = username.encode('utf-8') + b(' ') + b(signer.hexdigest())

    continue_cmd = SON([('saslContinue', 1),
                        ('conversationId', reply['conversationId']),
                        ('payload', Binary(answer))])
    cmd_func(sock_info, source, continue_cmd)
def testValidLogin(self):
    """A correct CRAM-MD5 response to the server challenge logs the user in."""
    server = pop3.POP3()
    server.factory = TestServerFactory()
    server.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials}
    server.portal = cred.portal.Portal(TestRealm())
    checker = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
    checker.addUser('testuser', 'testpassword')
    server.portal.registerChecker(checker)

    # Capture everything the protocol writes so the test can inspect it.
    output = StringIO.StringIO()
    server.transport = internet.protocol.FileWrapper(output)
    server.connectionMade()

    server.lineReceived("CAPA")
    self.failUnless(output.getvalue().find("SASL CRAM-MD5") >= 0)

    server.lineReceived("AUTH CRAM-MD5")
    # The challenge is the last line written, minus its 2-character prefix.
    last_line = output.getvalue().splitlines()[-1]
    challenge = base64.decodestring(last_line[2:])
    digest = hmac.HMAC('testpassword', challenge).hexdigest()
    reply = base64.encodestring('testuser ' + digest).rstrip('\n')
    server.lineReceived(reply)

    self.failUnless(server.mbox)
    self.failUnless(output.getvalue().splitlines()[-1].find("+OK") >= 0)
    server.connectionLost(failure.Failure(Exception("Test harness disconnect")))
def test_legacy_block_size_warnings(self):
    """HMAC warns when the digest has no block_size, or a too-small one."""

    class MockCrazyHash(object):
        """Ain't no block_size attribute here."""

        def __init__(self, *args):
            self._wrapped = hashlib.sha1(*args)
            self.digest_size = self._wrapped.digest_size

        def update(self, chunk):
            self._wrapped.update(chunk)

        def digest(self):
            return self._wrapped.digest()

    def expect_runtime_warning(failure_message):
        # Constructing the HMAC must raise; reaching self.fail means it did not.
        with self.assertRaises(RuntimeWarning):
            hmac.HMAC(b'a', b'b', digestmod=MockCrazyHash)
            self.fail(failure_message)

    with warnings.catch_warnings():
        # Turn RuntimeWarning into an exception so assertRaises can see it.
        warnings.simplefilter('error', RuntimeWarning)
        expect_runtime_warning('Expected warning about missing block_size')

        MockCrazyHash.block_size = 1
        expect_runtime_warning('Expected warning about small block_size')
def _buildIoTHubSasToken(self, deviceId, usage): if usage==self.USAGE_CREATE_DEVICE: keyValue=self.initialKeyValue elif usage==self.USAGE_DEVICE_SENDS_MESSAGE: keyValue=self.currentDeviceKey else: keyVale resourceUri = '%s/devices/%s' % (self.iotHost, deviceId) targetUri = resourceUri.lower() expiryTime = self._buildExpiryOn() toSign = '%s\n%s' % (targetUri, expiryTime) key = base64.b64decode(keyValue.encode('utf-8')) signature = urllib.request.pathname2url( base64.b64encode( hmac.HMAC(key, toSign.encode('utf-8'), hashlib.sha256).digest() ) ).replace('/', '%2F') if usage==self.USAGE_CREATE_DEVICE: return self.TOKEN_FORMAT_WITH_POLICY % (signature, expiryTime, self.initialKeyName, targetUri) elif usage==self.USAGE_DEVICE_SENDS_MESSAGE: return self.TOKEN_FORMAT_NO_POLICY % (signature, expiryTime, targetUri) else: return None
def test_legacy_block_size_warnings(self):
    """HMAC warns when the digest has no block_size, or a too-small one.

    The key and message are bytes literals: on Python 3 a str key raises
    TypeError before any warning is issued, while ``b'...'`` is the same
    object type as ``'...'`` on Python 2.6+.
    """
    class MockCrazyHash(object):
        """Ain't no block_size attribute here."""
        def __init__(self, *args):
            self._x = hashlib.sha1(*args)
            self.digest_size = self._x.digest_size

        def update(self, v):
            self._x.update(v)

        def digest(self):
            return self._x.digest()

    with warnings.catch_warnings():
        # Turn RuntimeWarning into an exception so assertRaises can see it.
        warnings.simplefilter('error', RuntimeWarning)
        with self.assertRaises(RuntimeWarning):
            hmac.HMAC(b'a', b'b', digestmod=MockCrazyHash)
            self.fail('Expected warning about missing block_size')

        MockCrazyHash.block_size = 1
        with self.assertRaises(RuntimeWarning):
            hmac.HMAC(b'a', b'b', digestmod=MockCrazyHash)
            self.fail('Expected warning about small block_size')
def time_code(self, moment: int=None):
    """
    Return the authentication token that is valid at the given moment.

    The moment is divided into 30-second steps; the step counter is packed
    as a big-endian 64-bit integer and signed with HMAC-SHA1 using
    ``self._secret``. The digest is then dynamically truncated to 6 digits.

    :param moment: A time value, defaulting to now.
    :type moment: int
    :return: A 6-digit authentication token
    :rtype: str
    """
    if moment is None:
        moment = time.time()
    step_counter = int(moment // 30)
    mac = hmac.HMAC(self._secret, struct.pack('>q', step_counter), hashlib.sha1).digest()
    # The low nibble of the last byte selects where the 4-byte window starts.
    start = mac[-1] & 0x0F
    (window,) = struct.unpack('>L', mac[start:start + 4])
    # Drop the top bit, then keep the six least-significant decimal digits.
    return '%06d' % ((window & 0x7FFFFFFF) % 1000000)
def valid_tokens(domain, secret, validity=86400):
    """Return the hex tokens currently accepted for ``domain``.

    Each token is the HMAC-SHA256 (keyed with ``secret``) of the domain
    followed by the decimal index of a time bucket. Buckets are half of
    ``validity`` wide, and one token is produced for each of the three
    half-validity steps before now.

    ``secret`` given as str is UTF-8 encoded. ``domain`` given as str is
    IDNA encoded.
    """
    key = secret.encode() if isinstance(secret, str) else secret
    host = domain.encode('idna') if isinstance(domain, str) else domain

    # We're not totally strict on validity, but want to avoid the worst case
    # where we provided a token that immediately expires. Allow up to three
    # half-validity intervals in the past.
    current = int(time.time())
    half = int(validity / 2)
    oldest = current - 3 * half

    tokens = []
    for instant in range(oldest, current, half):
        mac = hmac.HMAC(key, digestmod=hashlib.sha256)
        mac.update(host)
        # The bucket index uses the halved interval, as in the original closure.
        mac.update(str(int(instant / half)).encode())
        tokens.append(mac.hexdigest())
    return tokens
def _iotHubSasToken(self, uri): """Create the Azure IOT Hub SAS token Args: uri (str): Resource URI Returns: Token string """ expiry = str(int((time.time() + self.TOKEN_VALID_SECS))) key = base64.b64decode(self.keyvalue.encode('utf-8')) sig = '{}\n{}'.format(uri, expiry).encode('utf-8') signature = urllib.quote( base64.b64encode(hmac.HMAC(key, sig, hashlib.sha256).digest()) ).replace('/', '%2F') token = 'SharedAccessSignature sig={}&se={}&skn={}&sr={}'.format( signature, expiry, self.keyname, uri.lower()) return token
def _CRAM_MD5_AUTH(self, challenge): """ Authobject to use with CRAM-MD5 authentication. """ import hmac return self.user + " " + hmac.HMAC(self.password, challenge).hexdigest()
def test_gensignature():
    """gensignature returns whatever the (patched) HMAC digest produces."""
    stubbed_digest = 'digest_output'
    with patch.object(hmac.HMAC, 'digest', return_value=stubbed_digest):
        signature = api_utils.gensignature(
            ID_WITH_VALID_LENGTH,  # api_key
            'date',                # date
            'content_type',        # content_type
            'method',              # request_method
            'body',                # request_body
            'path',                # query_path
        )
        assert signature == stubbed_digest
def _hash_internal(method, salt, password): """Internal password hash helper. Supports plaintext without salt, unsalted and salted passwords. In case salted passwords are used hmac is used. """ if method == 'plain': return password, method if isinstance(password, text_type): password = password.encode('utf-8') if method.startswith('pbkdf2:'): args = method[7:].split(':') if len(args) not in (1, 2): raise ValueError('Invalid number of arguments for PBKDF2') method = args.pop(0) iterations = args and int(args[0] or 0) or DEFAULT_PBKDF2_ITERATIONS is_pbkdf2 = True actual_method = 'pbkdf2:%s:%d' % (method, iterations) else: is_pbkdf2 = False actual_method = method hash_func = _hash_funcs.get(method) if hash_func is None: raise TypeError('invalid method %r' % method) if is_pbkdf2: if not salt: raise ValueError('Salt is required for PBKDF2') rv = pbkdf2_hex(password, salt, iterations, hashfunc=hash_func) elif salt: if isinstance(salt, text_type): salt = salt.encode('utf-8') rv = hmac.HMAC(salt, password, hash_func).hexdigest() else: h = hash_func() h.update(password) rv = h.hexdigest() return rv, actual_method
def _CRAM_MD5_AUTH(self, challenge): """Authobject to use with CRAM-MD5 authentication.""" import hmac return self.user + " " + hmac.HMAC(self.password, challenge).hexdigest()
def keyed_md5(secret, challenge):
    """Create the keyed MD5 string for the given secret and challenge.

    Deprecated: emits a DeprecationWarning on every call. Returns the hex
    HMAC-MD5 of ``challenge`` keyed with ``secret``.
    """
    import hashlib
    import warnings
    warnings.warn(
        "keyed_md5() is deprecated. Use the stdlib module hmac instead.",
        DeprecationWarning, stacklevel=2
    )
    # Name MD5 explicitly: hmac.HMAC has no default digest on Python 3.8+.
    return hmac.HMAC(secret, challenge, digestmod=hashlib.md5).hexdigest()
def testCheckPassword(self):
    """checkPassword accepts the HMAC-MD5 response computed from the challenge.

    The secret is a bytes literal and MD5 is named explicitly so the
    expected response can be computed on Python 3.8+ as well. Both are
    no-ops on Python 2, where ``b'secret'`` is ``'secret'`` and MD5 was
    the implicit default.
    """
    import hashlib
    c = credentials.CramMD5Credentials()
    chal = c.getChallenge()
    c.response = hmac.HMAC(b'secret', chal, digestmod=hashlib.md5).hexdigest()
    self.failUnless(c.checkPassword(b'secret'))
def checkPassword(self, password):
    """Return whether ``self.response`` matches the expected CRAM-MD5 digest.

    The expected value is the hex HMAC-MD5 of ``self.challenge`` keyed with
    ``password``.
    """
    import hashlib
    # Name MD5 explicitly: hmac.HMAC has no default digest on Python 3.8+.
    verify = hmac.HMAC(password, self.challenge, digestmod=hashlib.md5).hexdigest()
    try:
        # Constant-time comparison: the response comes from the peer.
        return hmac.compare_digest(verify, self.response)
    except TypeError:
        # compare_digest rejects mismatched types (e.g. None); fall back to
        # plain equality so those cases give the same result as before.
        return verify == self.response
def challengeResponse(self, secret, chal):
    """Return ``"<user> <hex digest>"`` answering a CRAM-MD5 challenge.

    The digest is the HMAC-MD5 of ``chal`` keyed with ``secret``.
    """
    import hashlib
    # Name MD5 explicitly: hmac.HMAC has no default digest on Python 3.8+.
    response = hmac.HMAC(secret, chal, digestmod=hashlib.md5).hexdigest()
    return '%s %s' % (self.user, response)
def P_hash(hashModule, secret, seed, length):
    """Expand ``secret`` and ``seed`` into ``length`` bytes of output.

    Repeatedly computes ``A = HMAC(secret, A)`` (starting from the seed) and
    appends ``HMAC(secret, A + seed)`` to the output until ``length`` values
    have been written. The result is the array made by
    ``createByteArrayZeros(length)``, filled in place.
    """
    output = createByteArrayZeros(length)
    key = bytesToString(secret)
    seed_text = bytesToString(seed)

    chain = seed_text
    written = 0
    while True:
        chain = hmac.HMAC(key, chain, hashModule).digest()
        block = hmac.HMAC(key, chain + seed_text, hashModule).digest()
        for char in block:
            # The length check happens before each write, so a full array
            # is only noticed on the next character.
            if written >= length:
                return output
            output[written] = ord(char)
            written += 1
def pbkdf2_bin(data, salt, iterations=DEFAULT_PBKDF2_ITERATIONS,
               keylen=None, hashfunc=None):
    """Returns a binary digest for the PBKDF2 hash algorithm of `data`
    with the given `salt`. It iterates `iterations` times and produces a
    key of `keylen` bytes. By default SHA-1 is used as hash function; a
    different hashlib `hashfunc` can be provided.

    .. versionadded:: 0.9

    :param data: the data to derive.
    :param salt: the salt for the derivation.
    :param iterations: the number of iterations.
    :param keylen: the length of the resulting key. If not provided
                   the digest size will be used.
    :param hashfunc: the hash function to use. This can either be the
                     string name of a known hash function or a function
                     from the hashlib module. Defaults to sha1.
    """
    if isinstance(hashfunc, string_types):
        hashfunc = _hash_funcs[hashfunc]
    elif not hashfunc:
        hashfunc = hashlib.sha1

    salt = to_bytes(salt)
    keyed = hmac.HMAC(to_bytes(data), None, hashfunc)
    if not keylen:
        keylen = keyed.digest_size

    def _prf(message, template=keyed):
        """HMAC of message, using a copy of the keyed template."""
        ctx = template.copy()
        ctx.update(message)
        return bytearray(ctx.digest())

    derived = bytearray()
    # Ceiling division: number of digest-sized blocks needed for keylen.
    block_count = -(-keylen // keyed.digest_size)
    for block_index in range_type(1, block_count + 1):
        current = _prf(salt + _pack_int(block_index))
        folded = current
        for _ in range_type(iterations - 1):
            current = _prf(bytes(current))
            folded = bytearray(a ^ b for a, b in izip(folded, current))
        derived.extend(folded)
    return bytes(derived[:keylen])
def get_rest_token(self):
    """
    Returns an auth token for making calls to eventhub REST API.

    The token signs ``"<url-quoted resource URI>\\n<expiry>"`` with
    HMAC-SHA256 keyed by the UTF-8 bytes of ``self.sas_key``. The expiry is
    10000 seconds from now.
    """
    endpoint = "https://{}.servicebus.windows.net/{}".format(
        self.sb_name, self.eh_name)
    uri = urllib.parse.quote_plus(endpoint)
    key = self.sas_key.encode('utf-8')
    expiry = str(int(time.time() + 10000))

    payload = '{}\n{}'.format(uri, expiry).encode('utf-8')
    digest = hmac.HMAC(key, payload, hashlib.sha256).digest()
    signature = urllib.parse.quote(base64.b64encode(digest))

    template = 'SharedAccessSignature sr={}&sig={}&se={}&skn={}'
    return template.format(uri, signature, expiry, self.policy)
def from_master_secret(class_, master_secret, netcode='BTC'):
    """Generate a Wallet from a master password.

    Computes HMAC-SHA512 of ``master_secret`` keyed with ``b"Bitcoin seed"``.
    The first 32 bytes become the secret exponent and the remaining bytes
    the chain code.
    """
    mac = hmac.HMAC(key=b"Bitcoin seed", msg=master_secret,
                    digestmod=hashlib.sha512)
    digest = mac.digest()
    left_half, right_half = digest[:32], digest[32:]
    return class_(netcode=netcode,
                  chain_code=right_half,
                  secret_exponent=from_bytes_32(left_half))
def subkey_secret_exponent_chain_code_pair(
        secret_exponent, chain_code_bytes, i, is_hardened, public_pair=None):
    """
    Derive the child secret exponent and chain code for index ``i``.

    secret_exponent:
        base secret exponent
    chain_code_bytes:
        base chain code
    i:
        the index for this node, packed as an unsigned 32-bit big-endian
        integer
    is_hardened:
        use "hardened key derivation": the secret exponent itself, rather
        than the public key, is fed into the HMAC.
    public_pair:
        the public_pair for the given secret exponent. If left as None it
        is calculated here (only needed when not hardened).

    Returns a pair (new_secret_exponent, new_chain_code).
    Raises DerivationError if the left half of the HMAC output is >= ORDER
    or the resulting exponent is zero.
    """
    index_bytes = struct.pack(">L", i)
    if is_hardened:
        payload = b'\0' + to_bytes_32(secret_exponent) + index_bytes
    else:
        if public_pair is None:
            public_pair = ecdsa.public_pair_for_secret_exponent(
                ecdsa.generator_secp256k1, secret_exponent)
        payload = public_pair_to_sec(public_pair, compressed=True) + index_bytes

    digest = hmac.HMAC(key=chain_code_bytes, msg=payload,
                       digestmod=hashlib.sha512).digest()
    left_half, right_half = digest[:32], digest[32:]

    left_as_exponent = from_bytes_32(left_half)
    if left_as_exponent >= ORDER:
        logger.critical(_SUBKEY_VALIDATION_LOG_ERR_FMT)
        raise DerivationError('I_L >= {}'.format(ORDER))

    child_exponent = (left_as_exponent + secret_exponent) % ORDER
    if child_exponent == 0:
        logger.critical(_SUBKEY_VALIDATION_LOG_ERR_FMT)
        raise DerivationError('k_{} == 0'.format(i))

    return child_exponent, right_half
def subkey_public_pair_chain_code_pair(public_pair, chain_code_bytes, i):
    """
    Derive the child public pair and chain code for index ``i``.

    public_pair:
        base public pair
    chain_code_bytes:
        base chain code
    i:
        the index for this node, packed as a *signed* 32-bit big-endian
        integer

    Returns a pair (new_public_pair, new_chain_code).
    Raises DerivationError if the derived point is INFINITY or the left half
    of the HMAC output is >= ORDER.
    """
    index_bytes = struct.pack(">l", i)
    payload = public_pair_to_sec(public_pair, compressed=True) + index_bytes
    digest = hmac.HMAC(key=chain_code_bytes, msg=payload,
                       digestmod=hashlib.sha512).digest()
    left_half, right_half = digest[:32], digest[32:]

    left_as_exponent = from_bytes_32(left_half)
    base_x, base_y = public_pair
    parent_point = ecdsa.Point(
        ecdsa.generator_secp256k1.curve(), base_x, base_y, ORDER)
    child_point = left_as_exponent * ecdsa.generator_secp256k1 + parent_point
    if child_point == INFINITY:
        logger.critical(_SUBKEY_VALIDATION_LOG_ERR_FMT)
        raise DerivationError('K_{} == {}'.format(i, child_point))

    # NOTE(review): the left half is decoded a second time, as the original
    # did; this looks redundant if from_bytes_32 is pure -- confirm.
    left_as_exponent = from_bytes_32(left_half)
    if left_as_exponent >= ORDER:
        logger.critical(_SUBKEY_VALIDATION_LOG_ERR_FMT)
        raise DerivationError('I_L >= {}'.format(ORDER))

    return child_point.pair(), right_half