Python md5 模块,new() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 md5.new()。
def request(self, method, request_uri, headers, content):
    """Sign the outgoing request by adding an HMACDigest ``authorization`` header.

    The digest is an HMAC (keyed with ``self.key``, using ``self.hashmod``)
    over the method, the request URI, a fresh client nonce, the server
    nonce from the stored challenge and the concatenated values of the
    headers selected by ``_get_end2end_headers``.

    ``headers`` is modified in place; ``content`` is not used here.
    """
    keys = _get_end2end_headers(headers)
    keylist = "".join(["%s " % k for k in keys])
    headers_val = "".join([headers[k] for k in keys])
    # %S is the portable two-digit seconds directive; lowercase %s is a
    # platform-specific extension and is not a seconds-of-minute field.
    created = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    cnonce = _cnonce()
    # Five placeholders need five values: the request URI is part of the
    # signed material (it is also sent as uri="..." below).
    request_digest = "%s:%s:%s:%s:%s" % (
        method, request_uri, cnonce, self.challenge['snonce'], headers_val)
    request_digest = hmac.new(self.key, request_digest, self.hashmod).hexdigest().lower()
    headers['authorization'] = 'HMACDigest username="%s",realm="%s",snonce="%s",cnonce="%s",uri="%s",created="%s",response="%s",headers="%s"' % (
        self.credentials[0],
        self.challenge['realm'],
        self.challenge['snonce'],
        cnonce,
        request_uri,
        created,
        request_digest,
        keylist)
def __init__(self, oid=None):
    """Initialize a new ObjectId.

    With no argument (or ``None``) a new, unique ObjectId is generated.
    Any other value is handed to the validation routine: an instance of
    :class:`basestring` (:class:`str` or :class:`bytes` in python 3) or
    :class:`ObjectId` is validated and used; anything else raises
    :class:`TypeError`, and an invalid value raises
    :class:`~bson.errors.InvalidId`.

    :Parameters:
      - `oid` (optional): a valid ObjectId (12 byte binary or 24 character
        hex string)

    .. versionadded:: 1.2.1
       The `oid` parameter can be a ``unicode`` instance (that contains
       only hexadecimal digits).

    .. mongodoc:: objectids
    """
    if oid is not None:
        self.__validate(oid)
        return
    self.__generate()
def __generate(self):
    """Generate a new value for this ObjectId.

    The value is built by concatenating the packed current time, the
    class-level machine bytes, the packed process id and three bytes of a
    class-level counter that is advanced under ``ObjectId._inc_lock``.
    """
    oid = EMPTY

    # 4 bytes current time
    oid += struct.pack(">i", int(time.time()))

    # 3 bytes machine
    oid += ObjectId._machine_bytes

    # 2 bytes pid
    oid += struct.pack(">H", os.getpid() % 0xFFFF)

    # 3 bytes inc
    ObjectId._inc_lock.acquire()
    try:
        oid += struct.pack(">i", ObjectId._inc)[1:4]
        ObjectId._inc = (ObjectId._inc + 1) % 0xFFFFFF
    finally:
        # Release even if packing fails so other threads are never left
        # blocked on the counter lock.
        ObjectId._inc_lock.release()

    self.__id = oid
def __init__(self,
:class:`~bson.errors.InvalidId` is raised.
:Parameters:
- `oid` (optional): a valid ObjectId (12 byte binary or 24 character
hex string)
.. versionadded:: 1.2.1
The `oid` parameter can be a ``unicode`` instance (that contains
only hexadecimal digits).
.. mongodoc:: objectids
"""
if oid is None:
self.__generate()
else:
self.__validate(oid)
def __generate(self):
"""Generate a new value for this ObjectId.
"""
oid = EMPTY
# 4 bytes current time
oid += struct.pack(">i", ObjectId._inc)[1:4]
ObjectId._inc = (ObjectId._inc + 1) % 0xFFFFFF
ObjectId._inc_lock.release()
self.__id = oid
def __init__(self,
:class:`~bson.errors.InvalidId` is raised.
:Parameters:
- `oid` (optional): a valid ObjectId (12 byte binary or 24 character
hex string)
.. versionadded:: 1.2.1
The `oid` parameter can be a ``unicode`` instance (that contains
only hexadecimal digits).
.. mongodoc:: objectids
"""
if oid is None:
self.__generate()
else:
self.__validate(oid)
def __init__(self,
:class:`~bson.errors.InvalidId` is raised.
:Parameters:
- `oid` (optional): a valid ObjectId (12 byte binary or 24 character
hex string)
.. versionadded:: 1.2.1
The `oid` parameter can be a ``unicode`` instance (that contains
only hexadecimal digits).
.. mongodoc:: objectids
"""
if oid is None:
self.__generate()
else:
self.__validate(oid)
def __generate(self):
"""Generate a new value for this ObjectId.
"""
oid = EMPTY
# 4 bytes current time
oid += struct.pack(">i", ObjectId._inc)[1:4]
ObjectId._inc = (ObjectId._inc + 1) % 0xFFFFFF
ObjectId._inc_lock.release()
self.__id = oid
def verifySignature(self):
    """Verify the signature carried with ``self.rav_str``.

    The signed text is ``self.rav_str`` with its last two ``!``-separated
    fields removed.  It is hashed with SHA and checked with PKCS#1 v1.5
    against the public key stored in the file ``self.raven_public_key``.

    Returns True when the signature verifies, False otherwise.
    """
    # Read the key through a context manager so the file handle is closed
    # instead of being left to the garbage collector.
    with open(self.raven_public_key) as key_file:
        key = RSA.importKey(key_file.read())

    # Compile the parts to hash together
    parts = self.rav_str.split("!")
    parts.pop()  # Remove the last two items related to signing
    parts.pop()
    to_hash = "!".join(parts)

    # Now hash it and verify
    our_hash = SHA.new(to_hash)
    verifier = PKCS1_v1_5.new(key)

    # Obtain the correct form of the signature: undo the URL quoting, map
    # the substituted characters back to the base64 alphabet, then decode.
    signature = urllib.unquote(self.raven_signature)
    signature = signature.replace("-", "+")
    signature = signature.replace(".", "/")
    signature = signature.replace("_", "=")
    signature = base64.b64decode(signature)

    return bool(verifier.verify(our_hash, signature))
def insertwl():
    """Insert every line of the word list, with its MD5 hex digest, into the database.

    Reads the file named by the module-level ``wordlist``, and for each line
    runs an INSERT into ``<dbname>.data`` through a cursor obtained from
    ``dbconnect()``.  Database errors are counted as duplicates and skipped.
    Exits the process with status 1 if the word list cannot be opened.
    """
    try:
        words = open(wordlist, "r")
    except IOError:
        print("Error: check %s" % (wordlist,))
        sys.exit(1)
    try:
        entries = words.read().split('\n')
    finally:
        # The original never closed the word list.
        words.close()

    dupes = 0
    print("Inserting Wordlist,Skipping Dupes....may take ages")
    print("\nStart : %s" % (timer(),))

    # The schema name cannot be bound as a parameter, but the values can:
    # letting the driver quote them stops words containing quotes from
    # breaking (or injecting into) the statement.
    # NOTE(review): assumes dbconnect() returns a DB-API cursor using the
    # "format" paramstyle (%s), as MySQLdb does - confirm.
    statement = "INSERT INTO " + dbname + ".data (plain,md5) VALUES (%s,%s);"
    for word in entries:
        digest = md5.new(word).hexdigest()
        try:
            csr = dbconnect()
            csr.execute(statement, (word, digest))
        # NOTE(review): the name is spelled "MysqLdb" in this file; the
        # driver module is normally "MySQLdb" - confirm the import alias.
        except MysqLdb.Error:
            dupes = dupes + 1

    print("\nDupes : %s" % (dupes,))
    print("\nDone : %s" % (timer(),))
def compute_md5(prefix, filename, files_done, files_total, bytes_done,
                bytes_total, suffix):
    """Compute the MD5 of a file while reporting progress.

    The file is read in 4 KiB chunks.  After each chunk ``bytes_done`` is
    advanced and a status line is sent to ``print_timed_status``.

    Returns a ``(hexdigest, bytes_done)`` tuple, where ``bytes_done`` is the
    value passed in plus the number of bytes read from this file.
    """
    m = md5.new()
    # The context manager closes the file even if reading or the status
    # callback raises part-way through.
    with open(filename, "rb") as f:
        while True:
            data = f.read(4 * 1024)
            if not data:
                break
            m.update(data)
            bytes_done += len(data)
            print_timed_status(("%s: file %d of %d,%s of %s," +
                                "%d %% done%s") %
                               (prefix, files_done + 1,
                                files_total,
                                format_size(bytes_done),
                                format_size(bytes_total),
                                100.0 * float(bytes_done) / bytes_total,
                                suffix))
    return m.hexdigest(), bytes_done
# figure out the total size of a set of files.
def __init__(self, seed=None):
    """Set up an empty pool and optionally stir in an initial seed.

    The hash object is the first one available of ``hashlib.sha1``, the
    legacy ``sha`` module and the legacy ``md5`` module; ``hash_len`` is
    set to the matching digest size and the pool is that many zero bytes.

    ``seed``, when not None, is converted to a bytearray and passed to
    ``self.stir``; ``self.seeded`` records whether a seed was supplied.
    """
    self.pool_index = 0
    self.digest = None
    self.next_byte = 0
    self.lock = _threading.Lock()
    # Only a missing module should trigger the fallback.  A bare except
    # would also swallow KeyboardInterrupt and unrelated errors.
    try:
        import hashlib
        self.hash = hashlib.sha1()
        self.hash_len = 20
    except ImportError:
        try:
            import sha
            self.hash = sha.new()
            self.hash_len = 20
        except ImportError:
            import md5
            self.hash = md5.new()
            self.hash_len = 16
    self.pool = bytearray(b'\0' * self.hash_len)
    if seed is not None:
        self.stir(bytearray(seed))
        self.seeded = True
    else:
        self.seeded = False
def changePassphrase(options):
    """Re-encrypt a private key file with a new passphrase.

    ``options`` is a mapping read and updated in place; the keys used are
    ``'filename'``, ``'pass'`` (old passphrase) and ``'newpass'``.  Missing
    values are prompted for interactively.  The key file is overwritten
    with the key serialized under the new passphrase.

    Raises ``keys.BadKeyError`` when the key cannot be loaded for any reason
    other than needing a passphrase.
    """
    if not options['filename']:
        filename = os.path.expanduser('~/.ssh/id_rsa')
        # The prompt advertises a default in parentheses, so an empty
        # answer must select it rather than leave the filename blank.
        options['filename'] = raw_input(
            'Enter file in which the key is (%s): ' % filename) or filename
    try:
        key = keys.getPrivateKeyObject(options['filename'])
    except keys.BadKeyError as e:
        if e.args[0] != 'encrypted key with no passphrase':
            raise
        if not options['pass']:
            options['pass'] = getpass.getpass('Enter old passphrase: ')
        key = keys.getPrivateKeyObject(options['filename'], passphrase=options['pass'])
    if not options['newpass']:
        while 1:
            p1 = getpass.getpass('Enter new passphrase (empty for no passphrase): ')
            p2 = getpass.getpass('Enter same passphrase again: ')
            if p1 == p2:
                break
            print('Passphrases do not match. Try again.')
        options['newpass'] = p1
    # Serialize BEFORE opening the file for writing: opening with 'w'
    # truncates it, so a failure while serializing would otherwise destroy
    # the only copy of the key.
    key_text = keys.makePrivateKeyString(key, passphrase=options['newpass'])
    key_file = open(options['filename'], 'w')
    try:
        key_file.write(key_text)
    finally:
        key_file.close()
    print('Your identification has been saved with the new passphrase.')
def _continueGEX_REPLY(self, ignored, pubKey, f, signature):
    """Finish the group key exchange once the server's reply is available.

    Computes the shared secret from ``f``, builds the SHA exchange hash
    over the fields listed below (in that order) and checks ``signature``
    against it with the server key.  On success the keys are set up from
    the shared secret and exchange hash; otherwise a disconnect with the
    message 'bad signature' is sent.  ``ignored`` is not used.
    """
    serverKey = keys.getPublicKeyObject(pubKey)
    sharedSecret = _MPpow(f, self.x, self.p)

    # The order of these fields defines the exchange hash; do not reorder.
    hashed_fields = (
        NS(self.ourVersionString),
        NS(self.otherVersionString),
        NS(self.ourKexInitPayload),
        NS(self.serverKexInitPayload),
        NS(pubKey),
        '\x00\x00\x08\x00',
        MP(self.p),
        MP(self.g),
        MP(self.DHpubKey),
        MP(f),
        sharedSecret,
    )
    hasher = sha.new()
    for field in hashed_fields:
        hasher.update(field)
    exchangeHash = hasher.digest()

    if keys.verifySignature(serverKey, signature, exchangeHash):
        self._keySetup(sharedSecret, exchangeHash)
        return
    self.senddisconnect(disCONNECT_KEY_EXCHANGE_Failed, 'bad signature')
def __init__(self, service, root, postmaster=0):
    """Initialize the domain rooted at ``root``.

    ``root`` is where the Domain directory is rooted.  ``postmaster`` says
    whether non-existing addresses are simply forwarded to postmaster
    instead of being bounced outright.

    The directory structure is:

    /passwd <-- a dirdbm file
    /USER/{cur,new,del} <-- each user has these three directories

    The ``passwd`` directory is created if it does not exist yet.
    """
    # NOTE(review): ``service`` is accepted but not used in this body, and
    # is not forwarded to the base initializer - confirm that is intended.
    AbstractMaildirDomain.__init__(self, root)
    passwd_path = os.path.join(root, 'passwd')
    if not os.path.exists(passwd_path):
        os.makedirs(passwd_path)
    self.dbm = dirdbm.open(passwd_path)
    self.postmaster = postmaster
def testinitializer(self):
d = self.d
trash = os.path.join(d, '.Trash')
self.failUnless(os.path.exists(d) and os.path.isdir(d))
self.failUnless(os.path.exists(os.path.join(d, 'new')))
self.failUnless(os.path.exists(os.path.join(d, 'cur')))
self.failUnless(os.path.exists(os.path.join(d, 'tmp')))
self.failUnless(os.path.isdir(os.path.join(d, 'new')))
self.failUnless(os.path.isdir(os.path.join(d, 'cur')))
self.failUnless(os.path.isdir(os.path.join(d, 'tmp')))
self.failUnless(os.path.exists(os.path.join(trash, 'new')))
self.failUnless(os.path.exists(os.path.join(trash, 'cur')))
self.failUnless(os.path.exists(os.path.join(trash, 'tmp')))
self.failUnless(os.path.isdir(os.path.join(trash, 'new')))
self.failUnless(os.path.isdir(os.path.join(trash, 'cur')))
self.failUnless(os.path.isdir(os.path.join(trash, 'tmp')))
def __init__(self, seed=None):
    """Set up an empty pool and optionally stir in an initial seed.

    The hash object is the first one available of ``hashlib.sha1``, the
    legacy ``sha`` module and the legacy ``md5`` module; ``hash_len`` is
    set to the matching digest size and the pool is a string of that many
    NUL characters.

    ``seed``, when not None, is passed to ``self.stir``; ``self.seeded``
    records whether a seed was supplied.
    """
    self.pool_index = 0
    self.digest = None
    self.next_byte = 0
    self.lock = _threading.Lock()
    # Fall back only when the module is really missing; a bare except
    # would also hide KeyboardInterrupt and unrelated failures.
    try:
        import hashlib
        self.hash = hashlib.sha1()
        self.hash_len = 20
    except ImportError:
        try:
            import sha
            self.hash = sha.new()
            self.hash_len = 20
        except ImportError:
            import md5
            self.hash = md5.new()
            self.hash_len = 16
    self.pool = '\0' * self.hash_len
    if seed is not None:
        self.stir(seed)
        self.seeded = True
    else:
        self.seeded = False
def __init__(self, seed=None):
    """Create the pool state and, if a seed is given, stir it in.

    Picks the hash implementation in order of preference: ``hashlib.sha1``,
    then the legacy ``sha`` module, then the legacy ``md5`` module.
    ``hash_len`` holds the digest size of the one chosen, and the pool
    starts as ``hash_len`` NUL characters.

    ``self.seeded`` is True only when ``seed`` is not None, in which case
    ``seed`` has been passed to ``self.stir``.
    """
    self.pool_index = 0
    self.digest = None
    self.next_byte = 0
    self.lock = _threading.Lock()
    # ImportError is the only failure the fallback chain is meant to
    # handle; anything else should propagate instead of being swallowed.
    try:
        import hashlib
        self.hash = hashlib.sha1()
        self.hash_len = 20
    except ImportError:
        try:
            import sha
            self.hash = sha.new()
            self.hash_len = 20
        except ImportError:
            import md5
            self.hash = md5.new()
            self.hash_len = 16
    self.pool = '\0' * self.hash_len
    if seed is not None:
        self.stir(seed)
        self.seeded = True
    else:
        self.seeded = False
def __init__(self, seed=None):
    """Initialize pool bookkeeping, choose a hash, and apply an optional seed.

    The hash comes from ``hashlib.sha1`` when available, otherwise from
    the legacy ``sha`` module, otherwise from the legacy ``md5`` module;
    ``hash_len`` matches the chosen digest size.  The pool is initialised
    to ``hash_len`` NUL characters.

    A non-None ``seed`` is handed to ``self.stir`` and marks the pool as
    seeded.
    """
    self.pool_index = 0
    self.digest = None
    self.next_byte = 0
    self.lock = _threading.Lock()
    # Catch ImportError specifically so that unrelated exceptions (and
    # KeyboardInterrupt) are not silently turned into a weaker hash.
    try:
        import hashlib
        self.hash = hashlib.sha1()
        self.hash_len = 20
    except ImportError:
        try:
            import sha
            self.hash = sha.new()
            self.hash_len = 20
        except ImportError:
            import md5
            self.hash = md5.new()
            self.hash_len = 16
    self.pool = '\0' * self.hash_len
    if seed is not None:
        self.stir(seed)
        self.seeded = True
    else:
        self.seeded = False
def __init__(self, seed=None):
    """Initialize pool bookkeeping, choose a hash, and apply an optional seed.

    The hash comes from ``hashlib.sha1`` when available, otherwise from
    the legacy ``sha`` module, otherwise from the legacy ``md5`` module;
    ``hash_len`` matches the chosen digest size.  The pool is a bytearray
    of ``hash_len`` zero bytes.

    A non-None ``seed`` is converted to a bytearray, handed to
    ``self.stir`` and marks the pool as seeded.
    """
    self.pool_index = 0
    self.digest = None
    self.next_byte = 0
    self.lock = _threading.Lock()
    # Catch ImportError specifically so that unrelated exceptions (and
    # KeyboardInterrupt) are not silently turned into a weaker hash.
    try:
        import hashlib
        self.hash = hashlib.sha1()
        self.hash_len = 20
    except ImportError:
        try:
            import sha
            self.hash = sha.new()
            self.hash_len = 20
        except ImportError:
            import md5
            self.hash = md5.new()
            self.hash_len = 16
    self.pool = bytearray(b'\0' * self.hash_len)
    if seed is not None:
        self.stir(bytearray(seed))
        self.seeded = True
    else:
        self.seeded = False
def load_module(self, code_path):
    """Load the Python source file at ``code_path`` and return the module.

    The module is registered under the MD5 hex digest of ``code_path``.
    Any exception raised while opening or loading is printed to stderr
    with its traceback and then re-raised unchanged.
    """
    try:
        # Open before entering the try/finally so that a failed open is
        # reported as itself, not masked by a NameError on ``fin``.
        fin = open(code_path, 'rb')
        try:
            return imp.load_source(md5.new(code_path).hexdigest(), code_path, fin)
        finally:
            # Closing is best effort: a close failure must not replace the
            # loaded module or the original exception.
            try:
                fin.close()
            except (IOError, OSError):
                pass
    except:
        # Deliberately broad: the traceback is logged and the exception is
        # always re-raised, so nothing is swallowed here.
        traceback.print_exc(file=sys.stderr)
        raise
def audit(arg):
    """Report a flash XSS warning when the resource at ``arg`` matches a known digest.

    Fetches ``arg`` with ``curl.curl2``; when the status code is 200 and the
    MD5 hex digest of the body is one of the listed values, a security
    warning is raised for ``arg`` with the proof-of-concept query string
    appended.  Nothing is reported in any other case.
    """
    known_digests = [
        '3a1c6cc728dddc258091a601f28a9c12',
        '53fef78841c3fae1ee992ae324a51620',
        '4c2fc69dc91c885837ce55d03493a5f5',
    ]
    code, head, res, err, _ = curl.curl2(arg)
    if code != 200:
        return
    if md5.new(res).hexdigest() in known_digests:
        security_warning(arg + '?movieName=%22]%29}catch%28e%29{if%28!window.x%29{window.x=1;alert%28document.cookie%29}}// flash xss')
def response(self, response, content):
    """Hook for updating state from the last authorized response.

    Gives the scheme a chance to pick up new nonces or similar values
    returned by the server.  Override this in sub-classes if necessary.

    Return True if the request is to be retried (for example, Digest may
    return stale=true).  This implementation never asks for a retry.
    """
    retry_requested = False
    return retry_requested
def __init__(self, credentials, host, content, http):
    """Parse the server's ``hmacdigest`` challenge and derive the signing key.

    Fills in defaults for the challenge's ``reason``, ``salt``,
    ``algorithm`` and ``pw-algorithm`` fields, rejects unsupported values
    with ``UnimplementedHmacDigestAuthOptionError``, selects the hash
    modules for the HMAC and for the password, and stores the derived key
    in ``self.key``.

    NOTE(review): this body reads a name ``response`` and the attribute
    ``self.credentials``, neither of which is a parameter here or set in
    this method - confirm against the base class and the caller.
    """
    Authentication.__init__(self, http)
    # NOTE(review): ``response`` is not defined in this scope - verify.
    challenge = _parse_www_authenticate(response, 'www-authenticate')
    self.challenge = challenge['hmacdigest']
    # Todo: self.challenge['domain']
    # Any reason other than the two recognised values is treated as
    # 'unauthorized'.
    self.challenge['reason'] = self.challenge.get('reason', 'unauthorized')
    if self.challenge['reason'] not in ['unauthorized', 'integrity']:
        self.challenge['reason'] = 'unauthorized'
    self.challenge['salt'] = self.challenge.get('salt', '')
    # A missing or empty server nonce makes the challenge unusable.
    if not self.challenge.get('snonce'):
        raise UnimplementedHmacDigestAuthOptionError( _("The challenge doesn't contain a server nonce,or this one is empty."))
    self.challenge['algorithm'] = self.challenge.get('algorithm', 'HMAC-SHA-1')
    if self.challenge['algorithm'] not in ['HMAC-SHA-1', 'HMAC-MD5']:
        raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for algorithm: %s." % self.challenge['algorithm']))
    self.challenge['pw-algorithm'] = self.challenge.get('pw-algorithm', 'SHA-1')
    if self.challenge['pw-algorithm'] not in ['SHA-1', 'MD5']:
        raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for pw-algorithm: %s." % self.challenge['pw-algorithm']))
    # Hash module used for the request HMAC.
    if self.challenge['algorithm'] == 'HMAC-MD5':
        self.hashmod = _md5
    else:
        self.hashmod = _sha
    # Hash module used when hashing the password into the key.
    if self.challenge['pw-algorithm'] == 'MD5':
        self.pwhashmod = _md5
    else:
        self.pwhashmod = _sha
    # key = H( credentials[0] ":" H(credentials[1] + salt) ":" realm ),
    # with both digests taken as lowercase hex.
    self.key = "".join([self.credentials[0], ":",
                        self.pwhashmod.new("".join([self.credentials[1], self.challenge['salt']])).hexdigest().lower(),
                        ":", self.challenge['realm']])
    self.key = self.pwhashmod.new(self.key).hexdigest().lower()
def baidufanyi(strin):
    """Translate ``strin`` from Japanese to Chinese with the Baidu translate API.

    The request is signed with the MD5 hex digest of
    appId + text + salt + secretKey and is attempted up to 8 times with a
    3 second timeout per attempt.

    Returns the ``dst`` field of the first entry of ``trans_result``.
    If every attempt fails, the exception from the last attempt is raised.
    """
    appId = '*???*'
    secretKey = '*???*'
    fromLang = 'jp'
    toLang = 'zh'
    salt = random.randint(32768, 65536)
    sign = appId+strin+str(salt)+secretKey
    m1 = md5.new()
    m1.update(sign)
    sign = m1.hexdigest()
    url = "http://api.fanyi.baidu.com/api/trans/vip/translate?appid=%s&q=%s&from=%s&to=%s&salt=%s&sign=%s"%(appId,urllib.quote(strin),fromLang,toLang,str(salt),sign)
    request = urllib2.Request(url=url)

    response = None
    last_error = None
    for attempt in range(1, 9):
        try:
            response = urllib2.urlopen(request, timeout=3)
        except Exception as err:
            # Remember the failure so that running out of retries reports
            # the real cause instead of a NameError on ``response``.
            last_error = err
            print("retry %s" % (attempt,))
        else:
            break
    if response is None:
        raise last_error

    try:
        jsondata = response.read()
    finally:
        response.close()
    decodejson = json.loads(jsondata)
    strout = decodejson['trans_result'][0]['dst']
    return strout
def youdaofanyi(strin):
appId = '*???*'
appKey = '*???*'
fromLang = 'ja'
toLang = 'zh-CHS'
salt = random.randint(32768, 65536)
sign = appId+strin+str(salt)+appKey
m1 = md5.new()
m1.update(sign)
sign = m1.hexdigest()
url = 'http://openapi.youdao.com/api?q=%s&from=%s&to=%s&appKey=%s&salt=%s&sign=%s'%(urllib.quote(strin),appId,reconnect
else:
break
jsondata = response.read()
decodejson = json.loads(jsondata)
strout = decodejson["translation"][0]
return strout
def ExistsInCache(cacheLocation, url):
    """Return True when both cache files for ``url`` exist under ``cacheLocation``.

    Cache entries are named after the MD5 hex digest of the URL, with a
    ``.headers`` file and a ``.body`` file per entry.
    """
    entry = cacheLocation + "/" + md5.new(url).hexdigest()
    if not os.path.exists(entry + ".headers"):
        return False
    return os.path.exists(entry + ".body")
def StoreInCache(cacheLocation, url, response):
    """Write ``response`` into the cache under ``cacheLocation``.

    Two files named after the MD5 hex digest of ``url`` are written: the
    string form of ``response.info()`` goes to ``<digest>.headers`` and the
    result of ``response.read()`` goes to ``<digest>.body``.
    """
    entry = cacheLocation + "/" + md5.new(url).hexdigest()
    # Context managers close each file even when a write or the response
    # read fails part-way through.
    with open(entry + ".headers", "w") as header_file:
        header_file.write(str(response.info()))
    with open(entry + ".body", "w") as body_file:
        body_file.write(response.read())
def __init__(self, cacheLocation, url, setCacheHeader=True):
    """Build a response object from the cache entry for ``url``.

    The entry is found by the MD5 hex digest of ``url`` under
    ``cacheLocation``: its ``.body`` file becomes the content of this
    StringIO and its ``.headers`` file is parsed into ``self.headers``.
    ``code`` and ``msg`` are fixed to 200 / "OK".

    When ``setCacheHeader`` is true an ``x-cache`` header naming the cache
    location and digest is appended before the headers are parsed.
    """
    self.cacheLocation = cacheLocation
    digest = md5.new(url).hexdigest()
    entry = self.cacheLocation + "/" + digest
    # Read through context managers so neither file handle is leaked.
    with open(entry + ".body") as body_file:
        StringIO.StringIO.__init__(self, body_file.read())
    self.url = url
    self.code = 200
    self.msg = "OK"
    with open(entry + ".headers") as header_file:
        headerbuf = header_file.read()
    if setCacheHeader:
        headerbuf += "x-cache: %s/%s\r\n" % (self.cacheLocation, digest)
    self.headers = httplib.HTTPMessage(StringIO.StringIO(headerbuf))
def verify(self):
    """Compute the MD5 checksum for this log to see if the logfiles have been corrupted.

    The checksum covers the contents of the metadata file (``self.mdf``)
    followed by the log file (``self.ldf``) and is compared with the stored
    ``self.md5``.  Results are printed; nothing is returned.  When no
    stored checksum exists a warning is printed and no check is made.
    Extra DEBUG lines are printed when ``self.debug == 'on'``.
    """
    # If there is no self.md5, no checksum exists for this log yet...
    if not hasattr(self, 'md5'):
        print('WARNING: No MD5 checksum was found for log %s' % (self.name,))
        print('WARNING: Log %s may be newly created,or it may be corrupt!' % (self.name,))
        return

    # Otherwise, create the MD5 checksum from the log file and Metadata
    # file for verification
    import md5
    checksum = md5.new()
    # Metadata first, then the log: the order is part of the checksum.
    for path in (self.mdf, self.ldf):
        with open(path, 'r') as handle:
            for line in handle:
                checksum.update(line)
    cs = checksum.hexdigest()

    if self.debug == 'on':
        print('DEBUG: The MD5 digest of the Metadata and log files is %s' % (cs,))
    if self.md5 == cs:
        if self.debug == 'on':
            print('DEBUG: The calculated MD5 checksum %s matches the stored MD5 checksum %s' % (cs, self.md5))
    else:
        # Include the computed checksum and the log name, as the matching
        # and warning messages above already do.
        if self.debug == 'on':
            print('DEBUG: The calculated MD5 checksum %s does not match the stored MD5 checksum %s' % (cs, self.md5))
        print('ERROR: The MD5 checksum for log %s is inconsistent!' % (self.name,))
        print('ERROR: Log %s may be corrupt!' % (self.name,))
def passwordToKey(password,bits=64):
    """Derive a ``(public, private, n)`` key triple from ``password``.

    ``bits`` must be at least 64 and a multiple of 4.  The MD5 hex digest
    of the password seeds both the prime pair (via ``passwordToPrimePair``)
    and the private exponent candidate.
    """
    assert 64<=bits
    assert bits%4==0
    hex_digits = bits // 4
    digest = md5.new(password).hexdigest()
    p, q = passwordToPrimePair(digest, bits)
    n = p * q
    # Right-pad the digest with zeros up to the requested width, then force
    # the low bit so the candidate is odd.
    padded = digest + "0" * (hex_digits - len(digest))
    candidate = int(padded, 16) | 1
    # NOTE(review): the loop only advances while gcd(...) is falsy - confirm
    # what the project's gcd helper returns for this to be the intended test.
    while not gcd(candidate, n):
        candidate += 2  # keep it odd
    private = candidate
    public = modInverse(private, totient(p, q))
    return public, private, n
def md5(src):
    """Return the MD5 hex digest of ``src``."""
    hasher = imd5.new()
    hasher.update(src)
    return hasher.hexdigest()
def md5file(fobj):
    """Return the MD5 hex digest of everything readable from ``fobj``.

    The object is read in chunks of 8096 until ``read`` returns an empty
    value, and is then rewound to position 0.
    """
    hasher = imd5.new()
    chunk = fobj.read(8096)
    while chunk:
        hasher.update(chunk)
        chunk = fobj.read(8096)
    fobj.seek(0)
    return hasher.hexdigest()
def response(self,for
example Digest may return stale=true.
"""
return False
def __init__(self, self.challenge['realm']])
self.key = self.pwhashmod.new(self.key).hexdigest().lower()
def get_md5():
    """Return a new, empty MD5 hash object.

    Uses ``hashlib`` on Python 2.6 and later and the legacy ``md5`` module
    on older interpreters.
    """
    if sys.version_info < (2, 6):
        import md5
        return md5.new()
    import hashlib
    return hashlib.md5()
# LOG_LEVEL can be one of DEBUG INFO ERROR CRITICAL WARNING
def get_assign(secret_access_key, headers=None, resource="/", result=None, debug=DEBUG):
    '''
    Create the authorization for OSS based on header input.
    You should put it into "Authorization" parameter of header.

    The string to sign is the method, Content-MD5, Content-Type, Date, the
    sorted headers whose names start with SELF_DEFINE_HEADER_PREFIX, and
    the resource.  It is appended to ``result`` (when a list is supplied,
    so the caller can inspect it) and signed with HMAC-SHA using
    ``secret_access_key``.

    Returns the base64-encoded signature with surrounding whitespace
    stripped.
    '''
    global OSS_LOGGER_SET
    if not headers:
        headers = {}
    # Test against None, not truthiness: a caller passing an empty list as
    # the out-parameter must get the string-to-sign appended to THAT list.
    if result is None:
        result = []
    secret_access_key = convert_utf8(secret_access_key)
    if not OSS_LOGGER_SET:
        OSS_LOGGER_SET = Logger(debug, "log.txt", LOG_LEVEL, "oss_util").getlogger()
    # NOTE(review): this writes the secret key to the debug log - consider
    # removing or masking it.
    OSS_LOGGER_SET.debug("secret_access_key: %s" % secret_access_key)
    content_md5 = safe_get_element('Content-MD5', headers)
    content_type = safe_get_element('Content-Type', headers)
    date = safe_get_element('Date', headers)
    canonicalized_resource = resource
    tmp_headers = _format_header(headers)
    # Prefixed headers are emitted in sorted key order, one "key:value" per
    # line.
    canonicalized_oss_headers = "".join(
        "%s:%s\n" % (k, tmp_headers[k])
        for k in sorted(tmp_headers)
        if k.startswith(SELF_DEFINE_HEADER_PREFIX))
    # NOTE(review): ``method`` is not a parameter of this function and is
    # not assigned here; it must come from module scope - verify.
    string_to_sign = method + "\n" + content_md5.strip() + "\n" + content_type + "\n" + date + "\n" + canonicalized_oss_headers + canonicalized_resource
    result.append(string_to_sign)
    OSS_LOGGER_SET.debug("method:%s\n content_md5:%s\n content_type:%s\n data:%s\n canonicalized_oss_headers:%s\n canonicalized_resource:%s\n" % (method, content_md5, content_type, date, canonicalized_oss_headers, canonicalized_resource))
    OSS_LOGGER_SET.debug("string_to_sign:%s\n \nlength of string_to_sign:%d\n" % (string_to_sign, len(string_to_sign)))
    h = hmac.new(secret_access_key, string_to_sign, sha)
    sign_result = base64.encodestring(h.digest()).strip()
    OSS_LOGGER_SET.debug("sign result:%s" % sign_result)
    return sign_result
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。