Python OpenSSL.crypto 模块:X509Store() 实例源码
我们从 Python 开源项目中提取了以下 10 个代码示例,用于说明如何使用 OpenSSL.crypto.X509Store()。
def verify_cert_chain(chain_pem, trusted_certs):
    """Verify a PEM certificate against a set of trusted certificates.

    :param chain_pem: PEM data as bytes; decoded as UTF-8 and passed to
        ``crypto.load_certificate``.
    :param trusted_certs: iterable of PEM-encoded certificates (bytes); each
        one is loaded and added to the trust store.
    :return: ``True`` if ``verify_certificate()`` succeeds, ``False`` if it
        raises ``crypto.X509StoreContextError`` (the error is logged).
    """
    # NOTE(review): despite the parameter name, a single certificate object
    # is loaded from chain_pem here -- confirm whether intermediates in the
    # chain need separate handling.
    cert = crypto.load_certificate(crypto.FILETYPE_PEM, chain_pem.decode('utf-8'))

    # Build store of trusted certificates
    store = crypto.X509Store()
    for _cert in trusted_certs:
        trusted = crypto.load_certificate(crypto.FILETYPE_PEM, _cert.decode('utf-8'))
        store.add_cert(trusted)

    # Prepare context
    ctx = crypto.X509StoreContext(store, cert)

    # Start validation; only the verification call is guarded.
    try:
        ctx.verify_certificate()
    except crypto.X509StoreContextError as e:
        # Pass the exception as a logging argument so the message is only
        # formatted when the record is actually emitted.
        logging.error("Certificate validation failed: %s", e)
        return False
    return True
def _verify_ca(self):
    """
    (internal use only)
    Verify that the current x509 certificate is signed by the associated CA.

    A store containing only ``self.ca.x509`` is built and ``self.x509`` is
    verified against it.

    :raises ValidationError: if pyOpenSSL rejects the certificate; the
        message embeds the error text reported by pyOpenSSL.
    """
    store = crypto.X509Store()
    store.add_cert(self.ca.x509)
    store_ctx = crypto.X509StoreContext(store, self.x509)
    try:
        store_ctx.verify_certificate()
    except crypto.X509StoreContextError as e:
        # The previous code indexed e.args[0][2], i.e. it expected args[0]
        # to be a sequence whose third item is the message. If args[0] is
        # a plain string that indexing yields a single character, and if
        # args is empty or short it raises IndexError, so fall back to the
        # whole value in those cases.
        detail = e.args[0] if e.args else e
        if isinstance(detail, (list, tuple)) and len(detail) > 2:
            detail = detail[2]
        # Wrap in a 1-tuple so a short list/tuple cannot be mistaken for
        # multiple %-format arguments.
        raise ValidationError(_("CA doesn't match,got the "
                                "following error from pyOpenSSL: \"%s\"") % (detail,))
def registerOrdererAdminTuple(self, userName, ordererName, organizationName):
    """Assign the user as orderer admin.

    Creates a certificate for the user via the named organization, verifies
    it against that organization's ``signedCert`` and records it in
    ``self.ordererAdminTuples``.

    :return: the ``NodeAdminTuple`` that was registered.
    :raises AssertionError: if the tuple is already registered or the
        organization is not in ``self.organizations``.
    """
    adminTuple = NodeAdminTuple(user=userName, nodeName=ordererName, organization=organizationName)
    assert adminTuple not in self.ordererAdminTuples, \
        "Orderer admin tuple already registered {0}".format(adminTuple)
    assert organizationName in self.organizations, \
        "Orderer Organization not defined {0}".format(organizationName)

    adminUser = self.getUser(userName, shouldCreate=True)

    # Extensions come from the IP-SANs helper (original note: a
    # subjectAlternativeName is added when the entity is a signer and the
    # node name contains peer or orderer).
    sanExtensions = self._get_cert_extensions_ip_sans(userName, ordererName)
    request = adminUser.createCertRequest(adminTuple.nodeName, extensions=sanExtensions)
    signedUserCert = self.getOrganization(organizationName).createCertificate(
        request, extensions=sanExtensions)

    # Verify the new certificate against the organization's own
    # certificate, the only trusted entry in the store.
    trustStore = crypto.X509Store()
    trustStore.add_cert(self.getOrganization(organizationName).signedCert)
    # verify_certificate() raises on failure; its return value is unused.
    crypto.X509StoreContext(trustStore, signedUserCert).verify_certificate()

    self.ordererAdminTuples[adminTuple] = signedUserCert
    return adminTuple
def test_get_cert_store(self):
    """
    The object returned by `Context.get_cert_store` is an `X509Store`.
    """
    cert_store = Context(TLSv1_METHOD).get_cert_store()
    assert isinstance(cert_store, X509Store)
def test_new(self):
    """
    A newly created certificate has non-empty PEM fields, subject and
    issuer names matching the model data, a signature that verifies
    against its CA, version 3 and a basicConstraints first extension.
    """
    # (X509Name attribute, model attribute) pairs, checked in this order
    name_fields = (
        ('countryName', 'country_code'),
        ('stateOrProvinceName', 'state'),
        ('localityName', 'city'),
        ('organizationName', 'organization_name'),
        ('emailAddress', 'email'),
        ('commonName', 'common_name'),
    )

    cert = self._create_cert()
    for pem_field in ('certificate', 'private_key'):
        self.assertNotEqual(getattr(cert, pem_field), '')

    x509 = cert.x509
    self.assertEqual(x509.get_serial_number(), cert.serial_number)

    # check subject
    subject = x509.get_subject()
    for name_attr, model_attr in name_fields:
        self.assertEqual(getattr(subject, name_attr), getattr(cert, model_attr))

    # check issuer
    issuer = x509.get_issuer()
    ca = cert.ca
    for name_attr, model_attr in name_fields:
        self.assertEqual(getattr(issuer, name_attr), getattr(ca, model_attr))

    # check signature: verify_certificate() raises if the CA did not sign it
    trust_store = crypto.X509Store()
    trust_store.add_cert(ca.x509)
    crypto.X509StoreContext(trust_store, cert.x509).verify_certificate()

    # ensure version is 3 (indexed 0 based counting)
    self.assertEqual(x509.get_version(), 2)

    # basic constraints
    first_ext = cert.x509.get_extension(0)
    self.assertEqual(first_ext.get_critical(), 0)
    self.assertEqual(first_ext.get_short_name().decode(), 'basicConstraints')
    self.assertEqual(first_ext.get_data(), b'0\x00')
def verify_certificate_chain(ca_pem_data, cert_pem_data):
    """Verify a PEM certificate against a single PEM CA certificate.

    Returns ``None`` when verification succeeds.

    :param ca_pem_data: PEM data of the trusted CA certificate
    :param cert_pem_data: PEM data of the certificate to verify
    :raises InvalidCertificate: ``'Broken certificate'`` when a
        ``crypto.Error`` is raised, or ``'Invalid certificate chain: ...'``
        when a ``crypto.X509StoreContextError`` is raised.
    """
    try:
        trusted_ca = crypto.load_certificate(crypto.FILETYPE_PEM, ca_pem_data)
        candidate = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem_data)

        trust_store = crypto.X509Store()
        trust_store.add_cert(trusted_ca)
        crypto.X509StoreContext(trust_store, candidate).verify_certificate()
    except crypto.Error as e:
        raise InvalidCertificate('Broken certificate') from e
    except crypto.X509StoreContextError as e:
        reason = str(e)
        raise InvalidCertificate('Invalid certificate chain: ' + reason) from e
def test_get_cert_store(self):
    """
    The object returned by :py:obj:`Context.get_cert_store` is an
    :py:obj:`X509Store`.
    """
    cert_store = Context(TLSv1_METHOD).get_cert_store()
    self.assertIsInstance(cert_store, X509Store)
def __init__(self, local_cert, priv_key, ca_cert, controller_name_re):
    """Initialize JWTUtils.

    Reads the node private key, the local node certificate and the CA
    certificate from files, preparing for both signing and validation of
    key-value pairs.

    Signing uses the local certificate, whose public half is added to
    signed objects. Validation uses the CA certificate, along with other
    checks that the signing matches the payload.

    :param local_cert: file containing public half of the local key
    :param priv_key: file containing private half of the local key
    :param ca_cert: file containing CA root certificate
    :param controller_name_re: stored unchanged as
        ``self.controller_name_re``
    :raises IOError: if the files cannot be read.
    """
    # Private key: loaded from PEM with no password supplied.
    self.private_key = serialization.load_pem_private_key(
        self._get_crypto_material(priv_key),
        password=None,
        backend=default_backend())

    # Node certificate: keep the raw material, the parsed object and its
    # PEM re-encoding.
    self.node_certificate = self._get_crypto_material(local_cert)
    self.node_cert_obj = load_pem_x509_certificate(
        self.node_certificate, default_backend())
    self.node_cert_pem = self.node_cert_obj.public_bytes(
        serialization.Encoding.PEM)

    # CA certificate: loaded through pyOpenSSL and placed in an X509Store.
    ca_pem = self._get_crypto_material(ca_cert)
    trusted_root = crypto.load_certificate(crypto.FILETYPE_PEM, ca_pem)
    self.store = crypto.X509Store()
    self.store.add_cert(trusted_root)

    self.controller_name_re = controller_name_re
def test_get_cert_store(self):
    """
    Calling :py:obj:`Context.get_cert_store` yields an
    :py:obj:`X509Store` instance.
    """
    ctx = Context(TLSv1_METHOD)
    self.assertIsInstance(ctx.get_cert_store(), X509Store)
def verify_certificate(ca, cert):
    """Report whether ``cert`` verifies against the single trusted ``ca``.

    :param ca: certificate object added to a fresh ``crypto.X509Store``
    :param cert: certificate object checked through
        ``crypto.X509StoreContext``
    :return: ``True`` when ``verify_certificate()`` completes, ``False``
        when building the context or verifying raises an ``Exception``.
    """
    store = crypto.X509Store()
    store.add_cert(ca)
    try:
        crypto.X509StoreContext(store, cert).verify_certificate()
    except Exception:
        # Fail closed on any verification error. Unlike the previous bare
        # ``except:``, this lets KeyboardInterrupt and SystemExit propagate
        # instead of being reported as an invalid certificate.
        return False
    return True
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。