Python hashlib 模块,new() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用hashlib.new()。
def check_against_chunks(self, chunks):
    """Check the allowed hashes against digests built from chunks of data.

    One hasher is created for every algorithm name in ``self._allowed``.
    Each chunk is fed to all hashers, and the method returns as soon as one
    resulting hex digest is listed under its algorithm in ``self._allowed``.

    :param chunks: iterable of byte strings that make up the data to verify.
    :raises InstallationError: if ``hashlib.new`` rejects an algorithm name.

    When none of the digests match, ``self._raise`` is called with the
    mapping of algorithm name to hasher (Raise HashMismatch if none match).
    """
    gots = {}
    for hash_name in self._allowed:
        try:
            gots[hash_name] = hashlib.new(hash_name)
        except (ValueError, TypeError):
            raise InstallationError('Unknown hash name: %s' % hash_name)

    # Single pass over the data: every hasher sees every chunk.
    for chunk in chunks:
        for hasher in gots.values():
            hasher.update(chunk)

    for hash_name, got in gots.items():
        if got.hexdigest() in self._allowed[hash_name]:
            return
    self._raise(gots)
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def jumpahead(self, n):
    """Move the generator to a state that is likely far from the current one.

    This method will not be in Py3.x, so it is better to simply reseed.
    """
    # The parent jumpahead() shuffles to change state and needs a large,
    # "interesting" n; hashing n together with the current state gives one.
    seed_text = repr(n) + repr(self.getstate())
    digest_hex = _hashlib.new('sha512', seed_text).hexdigest()
    super(Random, self).jumpahead(int(digest_hex, 16))
## ---- Methods below this point do not need to be overridden when
## ---- subclassing for the purpose of using a different core generator.
## -------------------- pickle support -------------------
def __process(l, inFile, stateDir):
    """Resolve one spec line, dispatching on its first character.

    ``=``  the rest is a hex string, returned as bytes.
    ``<``  the rest is a file name, whose raw content is returned.
    ``{``  the rest names a hashlib algorithm; a new hash object and
           ``stateDir`` are handed to ``__processBlock``.
    ``#``  the rest is a path passed to ``hashPath``; when ``stateDir`` is
           set, a state file inside it (path separators replaced by ``_``)
           is passed along, otherwise ``None``.
    ``g``  the rest is passed to ``GitScm.processLiveBuildIdSpec`` and the
           returned hex string is converted to bytes.

    Any other line prints "Malformed spec:" to stderr and exits with
    status 1. ``inFile`` is not used by this function as written.
    """
    payload = l[1:]

    if l.startswith("="):
        return bytes.fromhex(payload)

    if l.startswith("<"):
        with open(payload, "rb") as f:
            return f.read()

    if l.startswith("{"):
        import hashlib
        return __processBlock(hashlib.new(payload), stateDir)

    if l.startswith("#"):
        import os.path
        stateFile = None
        if stateDir:
            stateFile = os.path.join(stateDir, payload.replace(os.sep, "_"))
        return hashPath(payload, stateFile)

    if l.startswith("g"):
        from .scm.git import GitScm
        return bytes.fromhex(GitScm.processLiveBuildIdSpec(payload))

    print("Malformed spec:", l, file=sys.stderr)
    sys.exit(1)
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def hashex(method,     # type: HashMethod
           key,        # type: KeyType
           **options   # type: typing.Any
           ):
    # type: (...) -> int
    """Hash ``key`` with the algorithm selected by ``method``.

    Text keys are encoded as UTF-8 first. If the lower-cased method name is
    one of ``hashlib.algorithms_guaranteed``, the hashlib hex digest is
    returned as an integer. Otherwise the MMH3 members dispatch to
    ``hash_murmur3`` with size 32/64/128 and SIPHASH to ``hash_siphash``;
    ``options`` are forwarded to those helpers only.

    Falls through and returns ``None`` when ``method`` matches none of them.
    """
    if isinstance(key, six.text_type):
        key = key.encode('utf-8')

    algorithm = method.name.lower()
    if algorithm in hashlib.algorithms_guaranteed:
        digest_hex = hashlib.new(algorithm, key).hexdigest()
        return int(digest_hex, 16)

    if method == HashMethod.MMH3_32:
        return hash_murmur3(key, size=32, **options)
    if method == HashMethod.MMH3_64:
        return hash_murmur3(key, size=64, **options)
    if method == HashMethod.MMH3_128:
        return hash_murmur3(key, size=128, **options)
    if method == HashMethod.SIPHASH:
        return hash_siphash(key, **options)
    return None
def encrypt(self, passwd=None, length=32):
    """Encrypt a password with AES and return the hex-encoded ciphertext.

    When ``passwd`` is empty or None, one is obtained from
    ``self.gen_rand_pass()``. The password is right-padded with NUL
    characters to the next multiple of ``length``; a full extra ``length``
    of padding is added when it is already aligned.

    :raises ServerError: if ``len(passwd)`` raises TypeError.
    """
    if not passwd:
        passwd = self.gen_rand_pass()

    # NOTE(review): the third argument is a fixed literal (presumably the
    # IV) - confirm that reusing it for every call is intended.
    cryptor = AES.new(self.key, self.mode, b'8122ca7d906ad5e1')

    try:
        count = len(passwd)
    except TypeError:
        raise ServerError('Encrypt password error,TYpe error.')

    pad_len = length - (count % length)
    padded = passwd + ('\0' * pad_len)
    return b2a_hex(cryptor.encrypt(padded))
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def file_digest(self, file_path, digest_type=None):
    """Return the hex digest of the file at ``file_path``.

    :param file_path: path of the file to hash; it is read in binary mode
        in 64 KiB blocks so large files are not loaded at once.
    :param digest_type: one of md5, sha1, sha224, sha256, sha384, sha512.
        Defaults to sha256 when omitted or empty.
    :returns: the hexadecimal digest string.
    :raises ValueError: if ``digest_type`` is not one of the names above.
        The file is not opened in that case.
    """
    valid_digests = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
    block_size = 65536

    if not digest_type:
        digest_type = 'sha256'

    if digest_type not in valid_digests:
        # ValueError is a subclass of Exception, so callers that caught the
        # previous bare Exception keep working.
        raise ValueError('%s not a valid digest - choose from %s' %
                         (digest_type, valid_digests))

    h = hashlib.new(digest_type)
    with open(file_path, 'rb') as f:
        for block in iter(lambda: f.read(block_size), b''):
            h.update(block)
    return h.hexdigest()
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def jumpahead(self, self).jumpahead(n)
## ---- Methods below this point do not need to be overridden when
## ---- subclassing for the purpose of using a different core generator.
## -------------------- pickle support -------------------
def setUp(self):
    """Prepare the test so that passlib's own md4 implementation is used.

    If a native md4 is available, ``hashlib.new`` is patched to reject
    "md4" and the ``lookup_hash`` cache is flushed now and again at cleanup.
    Finally the md4 constructor is asserted to come from
    ``passlib.crypto._md4``.
    """
    super(MD4_Builtin_Test, self).setUp()

    if has_native_md4():
        # Temporarily make lookup_hash() use the builtin pure-python
        # implementation: a patched hashlib.new() refuses md4, so the lookup
        # falls back to passlib's md4 class.
        real_new = hashlib.new

        def wrapper(name, *args):
            if name == "md4":
                raise ValueError("md4 disabled for testing")
            return real_new(name, *args)

        self.patchAttr(hashlib, "new", wrapper)

        # Flush the cache before & after the test, since we're mucking with it.
        lookup_hash.clear_cache()
        self.addCleanup(lookup_hash.clear_cache)

    # Make sure we're using the right constructor.
    md4_const = self.get_md4_const()
    self.assertEqual(md4_const.__module__, "passlib.crypto._md4")
#=============================================================================
# eof
#=============================================================================
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def SignECDSA(self,m):
    """Sign message ``m`` with the private key ``self.d``.

    The message is hashed with SHA256 and the digest used as integer ``z``.
    A fresh nonce ``k`` in [1, self.n - 1] is drawn from the system random
    source until both signature components are non-zero.

    :returns: the tuple ``(r, s)``.
    """
    z = int(hashlib.new("SHA256", m).hexdigest(), 16)

    # SystemRandom instead of the default generator for the nonce.
    rng = random.SystemRandom()
    r = s = 0
    while not (r and s):
        k = rng.randint(1, self.n - 1)
        R = self * k
        R.normalize()
        r = R.x[0] % self.n
        s = (InvMod(k, self.n) * (z + r * self.d)) % self.n
    return (r, s)
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def leaves(self, value):
    """Validate ``value`` and store it as ``self._leaves``.

    ``value`` must be a list or a generator. When ``self.prehashed`` is
    true the items are copied as they are; otherwise each item is passed
    through ``ShardManager.hash``. The stored list must be non-empty and
    contain only strings.

    Note that ``self._leaves`` is assigned before the emptiness and string
    checks run, so it stays set even when those checks raise.

    :raises ValueError: for None, a wrong container type, an empty result,
        or a non-string leaf.
    """
    if value is None:
        raise ValueError('Leaves should be a list.')
    if not isinstance(value, (list, types.GeneratorType)):
        raise ValueError('Leaves should be a list or a generator (%s).' % type(value))

    # list(...) copies a list or drains a generator into a new list.
    if self.prehashed:
        self._leaves = list(value)
    else:
        self._leaves = [ShardManager.hash(leaf) for leaf in value]

    if not self._leaves:
        raise ValueError('Leaves must contain at least one entry.')

    if any(not isinstance(leaf, six.string_types) for leaf in self._leaves):
        raise ValueError('Leaves should only contain strings.')
def ComputeSessionKeyStrongKey(sharedSecret, clientChallenge, serverChallenge, sharedSecretHash = None):
    """Compute the session key from the shared secret and both challenges.

    The key is HMAC-MD5, keyed with the secret hash, over
    ``MD5(4 zero bytes + clientChallenge + serverChallenge)``.

    :param sharedSecret: secret passed to ``ntlm.NTOWFv1`` when no hash is
        supplied.
    :param clientChallenge: client challenge bytes.
    :param serverChallenge: server challenge bytes.
    :param sharedSecretHash: already-computed hash; when given it is used as
        the HMAC key and ``sharedSecret`` is ignored.
    :returns: the raw HMAC digest.
    """
    # added the ability to receive hashes already
    if sharedSecretHash is None:
        M4SS = ntlm.NTOWFv1(sharedSecret)
    else:
        M4SS = sharedSecretHash

    md5 = hashlib.new('md5')
    # Bytes literal: identical on Python 2, and accepted by update() on
    # Python 3 where a text literal would raise TypeError.
    md5.update(b'\x00' * 4)
    md5.update(clientChallenge)
    md5.update(serverChallenge)
    finalMD5 = md5.digest()

    # digestmod must be explicit: the implicit MD5 default was removed from
    # hmac.new() in Python 3.8.
    hm = hmac.new(M4SS, digestmod=hashlib.md5)
    hm.update(finalMD5)
    return hm.digest()
def DecryptAttributeValue(dce, attribute):
    """Decrypt an encrypted attribute using the session key of ``dce``.

    The RC4 key is ``MD5(session key + salt)``, where the salt is the
    ``'Salt'`` field of ``ENCRYPTED_PAYLOAD(attribute)``. Everything from
    byte 16 of ``attribute`` is decrypted and the first four bytes of the
    plaintext are dropped from the result.
    """
    sessionKey = dce.get_session_key()
    # A Kerberos session key is wrapped in crypto.Key; unwrap its contents.
    if isinstance(sessionKey, crypto.Key):
        sessionKey = sessionKey.contents

    encryptedPayload = ENCRYPTED_PAYLOAD(attribute)

    key_hash = hashlib.new('md5', sessionKey)
    key_hash.update(encryptedPayload['Salt'])
    rc4_key = key_hash.digest()

    plainText = ARC4.new(rc4_key).decrypt(attribute[16:])
    # NOTE(review): the leading 4 bytes look like a CRC32 of plainText[4:]
    # (per the disabled check that used to sit here); it is not verified.
    return plainText[4:]
# 5.16.4 ATTRTYP-to-OID Conversion
def __decryptSecret(self, key, value):
    """Decrypt an LSA secret blob with DES-ECB ([MS-LSAD] Section 5.1.2).

    The first four bytes of ``value`` hold the little-endian size of the
    encrypted secret, which is taken from the tail of ``value``. Each
    8-byte block is decrypted with a DES key derived from the next 7 bytes
    of ``key`` via ``transformKey``.

    :returns: the ``'Secret'`` field of ``LSA_SECRET_XP`` built from the
        concatenated plaintext.
    """
    plainText = ''
    encryptedSecretSize = unpack('<I', value[:4])[0]
    payload = value[len(value) - encryptedSecretSize:]

    remaining_key = key
    for offset in range(0, len(payload), 8):
        des_key = self.__cryptoCommon.transformKey(remaining_key[:7])
        cipher = DES.new(des_key, DES.MODE_ECB)
        plainText += cipher.decrypt(payload[offset:offset + 8])

        # AdvanceKey: consume 7 key bytes; when fewer than 7 remain, restart
        # from the original key skipping as many bytes as were left over.
        remaining_key = remaining_key[7:]
        if len(remaining_key) < 7:
            remaining_key = key[len(remaining_key):]

    return LSA_SECRET_XP(plainText)['Secret']
def __decryptLSA(self, value):
    """Derive the LSA key from ``value`` and store it in ``self.__LSAKey``.

    Vista-style: a key from ``self.__sha256`` over the boot key and the
    first 32 bytes of the encrypted data AES-decrypts the rest; the LSA key
    is 32 bytes starting at offset 52 of the decrypted secret.

    Otherwise: the RC4 key is MD5 over the boot key followed by 1000 repeats
    of ``value[60:76]``; ``value[12:60]`` is decrypted and bytes 0x10-0x20
    of the plaintext are the LSA key.
    """
    if self.__vistaStyle is True:
        # Todo: There could be more than one LSA key
        record = LSA_SECRET(value)
        encrypted = record['EncryptedData']
        tmpKey = self.__sha256(self.__bootKey, encrypted[:32])
        plainText = self.__cryptoCommon.decryptAES(tmpKey, encrypted[32:])
        record = LSA_SECRET_BLOB(plainText)
        self.__LSAKey = record['Secret'][52:][:32]
        return

    md5 = hashlib.new('md5')
    md5.update(self.__bootKey)
    salt = value[60:76]
    for _ in range(1000):
        md5.update(salt)
    rc4 = ARC4.new(md5.digest())
    plainText = rc4.decrypt(value[12:60])
    self.__LSAKey = plainText[0x10:0x20]
def computeResponseNTLMv1(flags, serverName, domain, user, password, lmhash='',
                          nthash='', use_ntlmv2=USE_NTLMv2):
    """Compute the NTLMv1 NT and LM responses and the session base key.

    Empty ``user`` and ``password`` is the anonymous case: both responses
    are empty strings. Otherwise the LM and NT hashes are derived and the
    responses depend on ``flags``:

    * LM key negotiated: empty NT response, LM response from the LM hash.
    * Extended session security: NT response is the DES encryption of the
      first 8 bytes of ``MD5(serverChallenge + clientChallenge)``; the LM
      response is the client challenge padded with 16 NUL characters.
    * Otherwise: both responses are computed from the server challenge.

    ``serverName``, ``domain`` and ``use_ntlmv2`` are not used here.

    :returns: ``(ntResponse, lmResponse, sessionBaseKey)``.
    """
    # NOTE(review): serverChallenge and clientChallenge are not parameters
    # of this function as written; they must resolve from an enclosing
    # scope - confirm against the upstream signature.
    anonymous = user == '' and password == ''
    if anonymous:
        # Special case for anonymous authentication
        ntResponse = lmResponse = ''
    else:
        lmhash = LMOWFv1(password, lmhash, nthash)
        nthash = NTOWFv1(password, nthash)
        if flags & NTLMssp_NEGOTIATE_LM_KEY:
            ntResponse = ''
            lmResponse = get_ntlmv1_response(lmhash, serverChallenge)
        elif flags & NTLMssp_NEGOTIATE_EXTENDED_SESSIONSecurity:
            md5 = hashlib.new('md5')
            md5.update(serverChallenge + clientChallenge)
            ntResponse = ntlmssp_DES_encrypt(nthash, md5.digest()[:8])
            lmResponse = clientChallenge + '\x00'*16
        else:
            ntResponse = get_ntlmv1_response(nthash,serverChallenge)
            lmResponse = get_ntlmv1_response(lmhash, serverChallenge)

    sessionBaseKey = generateSessionkeyv1(password, nthash)
    return ntResponse, lmResponse, sessionBaseKey
def SEALKEY(flags, randomSessionKey, mode = 'Client'):
    """Derive the sealing key from the random session key.

    Without extended session security the key is a truncated session key
    plus fixed padding: 7 bytes + ``\\xa0`` for 56-bit, else 5 bytes +
    ``\\xe5\\x38\\xb0``.

    With extended session security the session key is kept whole (128-bit),
    cut to 7 bytes (56-bit) or to 5 bytes, then MD5-hashed together with a
    direction-specific magic constant: client-to-server when ``mode`` is
    ``'Client'``, server-to-client for any other value.
    """
    if not flags & NTLMssp_NEGOTIATE_EXTENDED_SESSIONSecurity:
        if flags & NTLMssp_NEGOTIATE_56:
            return randomSessionKey[:7] + '\xa0'
        return randomSessionKey[:5] + '\xe5\x38\xb0'

    if flags & NTLMssp_NEGOTIATE_128:
        base = randomSessionKey
    elif flags & NTLMssp_NEGOTIATE_56:
        base = randomSessionKey[:7]
    else:
        base = randomSessionKey[:5]

    if mode == 'Client':
        magic = 'session key to client-to-server sealing key magic constant\x00'
    else:
        magic = 'session key to server-to-client sealing key magic constant\x00'

    return hashlib.new('md5', base + magic).digest()
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def uuid_from_file(fn, block_size=1 << 20):
    """
    Returns an arbitrary sized unique ASCII string based on the file contents.
    (exact hashing method may change).

    The string is the file size in hex (without the '0x' prefix) followed
    by the SHA-512 hex digest of the content, read ``block_size`` bytes at
    a time.
    """
    import hashlib
    import os

    with open(fn, 'rb') as f:
        # Size first: seek to the end, note the offset, rewind.
        f.seek(0, os.SEEK_END)
        size = f.tell()
        f.seek(0, os.SEEK_SET)

        digest = hashlib.new('sha512')
        for data in iter(lambda: f.read(block_size), b''):
            digest.update(data)

    # [2:] skips the '0x'
    return hex(size)[2:] + digest.hexdigest()
def _uuid_from_file(fn, block_size=1 << 20):
    """Return a bytes identifier derived from the content of file ``fn``.

    The result is the file size in hex (without the '0x' prefix) followed
    by the SHA-512 hex digest of the content, encoded to bytes. The file is
    read ``block_size`` bytes at a time.
    """
    import hashlib
    import os

    with open(fn, 'rb') as f:
        # first get the size: the previous body used `size` without ever
        # computing it, so measure it by seeking to the end and rewinding.
        f.seek(0, os.SEEK_END)
        size = f.tell()
        f.seek(0, os.SEEK_SET)
        # done!

        digest = hashlib.new('sha512')
        while True:
            data = f.read(block_size)
            if not data:
                break
            digest.update(data)
    # [2:] skips the '0x'
    return (hex(size)[2:] + digest.hexdigest()).encode()
def uuid_from_file(fn, os.SEEK_SET)
del os
# done!
import hashlib
sha1 = hashlib.new('sha512')
while True:
data = f.read(block_size)
if not data:
break
sha1.update(data)
# skip the '0x'
return hex(size)[2:] + sha1.hexdigest()
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def make_hash(self):
    ''' hash the Metadata in a sane way

    Concatenates the string form of the tenant, mac, segment and ip-address
    entries of self.endpoint_data (using 'missing' for absent keys) and
    returns the ripemd160 hex digest of the UTF-8 encoded result.
    '''
    # 'name' is intentionally not part of the hashed fields: the same
    # endpoint seen without DHCP (name None) and later with a DHCP-supplied
    # name would otherwise produce different hashes.
    fields = ('tenant', 'mac', 'segment', 'ip-address')
    pre_h = ''.join(
        str(self.endpoint_data.get(field, 'missing')) for field in fields
    )
    return hashlib.new('ripemd160', pre_h.encode('utf-8')).hexdigest()
def check_against_chunks(self, got in iteritems(gots):
if got.hexdigest() in self._allowed[hash_name]:
return
self._raise(gots)
def rehash(path, algo='sha256', blocksize=1 << 20):
    """Return (hash, length) for path using hashlib.new(algo).

    :param path: file to hash, opened in binary mode.
    :param algo: hashlib algorithm name; also used as the label in front of
        the digest.
    :param blocksize: chunk size handed to ``read_chunks``.
    :returns: ``('<algo>=<digest>', length)`` where the digest is the
        urlsafe base64 encoding of the raw hash with the ``=`` padding
        stripped, and length is the number of bytes read.
    """
    h = hashlib.new(algo)
    length = 0
    with open(path, 'rb') as f:
        for block in read_chunks(f, size=blocksize):
            length += len(block)
            h.update(block)
    encoded = urlsafe_b64encode(h.digest()).decode('latin1').rstrip('=')
    # Label with the algorithm actually used; this was hard-coded to
    # 'sha256=' and therefore wrong for any other algo.
    digest = '%s=%s' % (algo, encoded)
    return (digest, length)
def _hash_of_file(path, algorithm):
    """Return the hex digest of the file at ``path`` using ``algorithm``.

    The file is opened in binary mode and fed to the hasher chunk by chunk
    via ``read_chunks``.
    """
    with open(path, 'rb') as archive:
        hasher = hashlib.new(algorithm)
        for chunk in read_chunks(archive):
            hasher.update(chunk)
    return hasher.hexdigest()
def __init__(self, fd, hashtype='sha256'):
    """Keep ``fd`` and start a fresh hash of the requested type.

    :param fd: object stored unchanged as ``self.fd``.
    :param hashtype: hashlib algorithm name for ``self.hash``.

    ``self.length`` starts at 0.
    """
    self.fd, self.hashtype = fd, hashtype
    self.hash = hashlib.new(hashtype)
    self.length = 0
def __init__(self, hash_name, expected):
    """Create a hasher for ``hash_name`` and remember the expected value.

    :param hash_name: hashlib algorithm name, kept as ``self.hash_name``.
    :param expected: stored unchanged as ``self.expected``.
    """
    hasher = hashlib.new(hash_name)
    self.hash_name = hash_name
    self.hash = hasher
    self.expected = expected
def new(key, msg = None, digestmod = None):
    """Create a new hashing object and return it.

    key: The starting key for the hash.
    msg: if available, will immediately be hashed into the object's starting
    state.

    You can now feed arbitrary strings into the object using its update()
    method, and can ask for the hash value at any time by calling its digest()
    method.
    """
    hmac_object = HMAC(key, msg, digestmod)
    return hmac_object
def __init__(self, url, chunksize, hash_algo=None, verify=True,
             certs=None):
    """Open a streaming GET request to ``url``.

    :param url: address to download from.
    :param chunksize: stored as ``self.chunksize``.
    :param hash_algo: optional hashlib algorithm name; when given,
        ``self.hasher`` is a new hash object, otherwise ``None``.
    :param verify: forwarded to ``requests.get`` as ``verify``.
    :param certs: client certificate, forwarded to ``requests.get``.
    :raises Exception: if the response status code is not 200.
    """
    if hash_algo is not None:
        self.hasher = hashlib.new(hash_algo)
    else:
        self.hasher = None
    self.chunksize = chunksize
    # requests names the client-certificate argument `cert`; passing
    # `certs=` raised TypeError (unexpected keyword argument).
    resp = requests.get(url, stream=True, verify=verify, cert=certs)
    if resp.status_code != 200:
        raise Exception('Invalid response code: %s' % resp.status_code)
    self._request = resp
def _hash_of_file(path, 'rb') as archive:
hash = hashlib.new(algorithm)
for chunk in read_chunks(archive):
hash.update(chunk)
return hash.hexdigest()
def __init__(self, path, mode, hashtype='sha256'):
    """Open ``path`` and start a fresh hash of the requested type.

    :param path: file to open; the handle is kept as ``self.fd``.
    :param mode: mode string passed to ``open``.
    :param hashtype: hashlib algorithm name for ``self.hash``.

    ``self.length`` starts at 0. The file is opened before the hash object
    is created.
    """
    handle = open(path, mode)
    self.fd = handle
    self.hashtype = hashtype
    self.hash = hashlib.new(hashtype)
    self.length = 0
def __init__(self, hash_name, expected):
    """Create a hasher for ``hash_name`` and remember the expected value.

    The body reads ``hash_name``, which was missing from the parameter
    list, so the constructor could only fail with NameError; the parameter
    is restored here.

    :param hash_name: hashlib algorithm name, kept as ``self.hash_name``.
    :param expected: stored unchanged as ``self.expected``.
    """
    self.hash_name = hash_name
    self.hash = hashlib.new(hash_name)
    self.expected = expected
def _randbelow(self, n, _log=_log, int=int, _maxwidth=1L<<BPF,
               _Method=_MethodType, _BuiltinMethod=_BuiltinMethodType):
    """Return a random int in the range [0,n)

    Handles the case where n has more bits than returned
    by a single call to the underlying generator.

    Only ``n`` is a real argument; the remaining parameters default to
    module-level names (presumably callers are not expected to pass them -
    confirm). A warning is issued through ``_warn`` when the fallback path
    is used with ``n >= _maxwidth``.
    """
    try:
        getrandbits = self.getrandbits
    except AttributeError:
        # No getrandbits() available: fall through to the random() path.
        pass
    else:
        # Only call self.getrandbits if the original random() builtin method
        # has not been overridden or if a new getrandbits() was supplied.
        # This assures that the two methods correspond.
        if type(self.random) is _BuiltinMethod or type(getrandbits) is _Method:
            k = int(1.00001 + _log(n-1, 2.0)) # 2**k > n-1 > 2**(k-2)
            r = getrandbits(k)
            # Redraw until the value falls below n.
            while r >= n:
                r = getrandbits(k)
            return r
    # Fallback: scale a single random() result.
    if n >= _maxwidth:
        _warn("Underlying random() generator does not supply \n"
              "enough bits to choose from a population range this large")
    return int(self.random() * n)
## -------------------- sequence methods -------------------
def evpKDF(passwd, salt, key_size=8, iv_size=4, iterations=1, hash_algorithm="md5"):
    """Derive a key and IV from a password and salt by chained hashing.

    Each round hashes ``previous block + passwd + salt`` (the first round
    has no previous block) and then re-hashes the result ``iterations - 1``
    more times. Blocks are concatenated until ``key_size + iv_size`` 32-bit
    words are available.

    :param passwd: password bytes.
    :param salt: salt bytes.
    :param key_size: key length in 32-bit words.
    :param iv_size: IV length in 32-bit words.
    :param iterations: number of hash applications per block.
    :param hash_algorithm: hashlib algorithm name.
    :returns: ``{"key": <key bytes>, "iv": <iv bytes>}``.
    """
    target_key_size = key_size + iv_size
    derived_parts = []
    number_of_derived_words = 0
    block = None

    while number_of_derived_words < target_key_size:
        hasher = hashlib.new(hash_algorithm)
        if block is not None:
            hasher.update(block)
        hasher.update(passwd)
        hasher.update(salt)
        block = hasher.digest()

        for _ in range(1, iterations):
            block = hashlib.new(hash_algorithm, block).digest()

        # Take no more than is still missing (4 bytes per word).
        wanted = (target_key_size - number_of_derived_words) * 4
        derived_parts.append(block[:min(len(block), wanted)])
        # Floor division keeps the word counter an integer on Python 3.
        number_of_derived_words += len(block) // 4

    # Digests are bytes, so they are joined as bytes; the previous text
    # accumulator ("") raised TypeError on Python 3.
    derived_bytes = b"".join(derived_parts)
    return {
        "key": derived_bytes[0: key_size * 4],
        "iv": derived_bytes[key_size * 4:]
    }
def _initial_hasher(self, stream, num_chunks, buffer):
    """Fill initial hash values.

    Reads ``num_chunks`` pieces of ``self._chunk_len`` bytes from
    ``stream`` and stores the raw ``self._hash_func`` digest of piece ``i``
    at ``buffer[i]``.
    """
    for index in range(num_chunks):
        piece = stream.read(self._chunk_len)
        buffer[index] = hashlib.new(self._hash_func, piece).digest()
def create_address(pubkey):
    """Return the raw ripemd160 digest of ``pubkey``.

    A key for which ``is_hex`` is true is first converted with ``from_hex``.
    """
    raw_key = from_hex(pubkey) if is_hex(pubkey) else pubkey
    return hashlib.new('ripemd160', raw_key).digest()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。