Python ftplib 模块，FTP_TLS 实例源码
我们从 Python 开源项目中提取了以下 42 个代码示例，用于说明如何使用 ftplib.FTP_TLS。
def test_context(self):
    """Check that FTP_TLS validates and then uses an explicit SSLContext.

    The test first asserts that combining ``context`` with ``keyfile`` /
    ``certfile`` raises ValueError, then reconnects with the context and
    verifies that both the control socket (after ``auth()``) and a data
    socket (after ``prot_p()``) are SSL sockets bound to that context.
    """
    self.client.quit()
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                      context=ctx)
    # The callable must be passed explicitly; without it assertRaises has
    # nothing to invoke and the keyword arguments are not forwarded.
    self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                      keyfile=CERTFILE, context=ctx)

    self.client = ftplib.FTP_TLS(context=ctx, timeout=10)
    self.client.connect(self.server.host, self.server.port)
    # Before auth() the control connection is still a plain socket.
    self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
    self.client.auth()
    self.assertIs(self.client.sock.context, ctx)
    self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    self.client.prot_p()
    with self.client.transfercmd('list') as sock:
        self.assertIs(sock.context, ctx)
        self.assertIsInstance(sock, ssl.SSLSocket)
def __init__(self, hostname, root_path, base_url,
             username=None, password=None, passive=True, secure=False, **kwargs):
    """Set up the FTP client used by this object.

    :param hostname: Either a host name to connect to, or an already
                     constructed ``FTP`` instance which is used as-is.
    :param root_path: Stored unchanged on ``self.root_path``.
    :param base_url: Stored on ``self.base_url`` with trailing ``/`` removed.
    :param username: User name handed to the FTP client constructor.
    :param password: Password handed to the FTP client constructor.
    :param passive: Forwarded to ``set_pasv`` on a newly created client.
    :param secure: When true an ``FTP_TLS`` client is created and
                   ``prot_p()`` is called on it.
    :param kwargs: Extra keyword arguments for the FTP client constructor.
    """
    if isinstance(hostname, FTP):
        self.ftp_client = hostname
    else:  # pragma: nocover
        if secure:
            self.ftp_client = FTP_TLS(host=hostname, user=username, passwd=password, **kwargs)
            # noinspection PyUnresolvedReferences
            self.ftp_client.prot_p()
        else:
            # Pass the credentials here too; previously the plain-FTP branch
            # dropped them, so the client connected but never logged in.
            self.ftp_client = FTP(host=hostname, user=username, passwd=password, **kwargs)
        # NOTE(review): nesting reconstructed from de-indented source; the
        # passive flag is assumed to apply only to clients created here.
        self.ftp_client.set_pasv(passive)
    self.root_path = root_path
    self.base_url = base_url.rstrip('/')
def test_context(self):
    """Check that FTP_TLS validates and then uses an explicit SSLContext.

    NOTE(review): this snippet was damaged by extraction (several statements
    were merged into one). It is restored here after the complete variant of
    the same test in this file, keeping the ``TIMEOUT`` constant and the
    try/finally socket handling that survived -- confirm against upstream.
    """
    self.client.quit()
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                      context=ctx)
    self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                      keyfile=CERTFILE, context=ctx)

    self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
    self.client.connect(self.server.host, self.server.port)
    # Before auth() the control connection is still a plain socket.
    self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
    self.client.auth()
    self.assertIs(self.client.sock.context, ctx)
    self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    self.client.prot_p()
    sock = self.client.transfercmd('list')
    try:
        self.assertIs(sock.context, ctx)
        self.assertIsInstance(sock, ssl.SSLSocket)
    finally:
        sock.close()
# NOTE(review): this copy of test_context appears truncated by extraction --
# it contains a `finally:` with no matching `try:` and closes `sock`, which
# is never assigned here. Presumably it should match the complete
# test_context variant in this file; TODO restore from the upstream suite.
def test_context(self):
self.client.quit()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertRaises(ValueError, ssl.SSLSocket)
finally:
sock.close()
# NOTE(review): this copy of test_context appears truncated by extraction --
# assertRaises is given no callable, and connect() receives ssl.SSLSocket
# where the complete variant in this file passes self.server.port.
# TODO restore from the upstream suite.
def test_context(self):
self.client.quit()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertRaises(ValueError, timeout=2)
self.client.connect(self.server.host, ssl.SSLSocket)
def __init__(self):
    """Initialise every attribute to its empty/default value.

    No connection is opened here: ``FTP_TLS()`` is created without a host.
    """
    # Text settings that start out empty.
    self.name = self.user = self.server = self.password = ""
    self.copy_dir = self.dest = self.assets = ""
    self.config_file_content = ""

    # Command registries.
    self.commands = {}
    self.commands_function = {}

    # Numeric settings.
    self.port = 0
    self.time_out = 300
    self.time = 0

    self.ftp = FTP_TLS()

    # Working folder and the files kept inside it.
    self.folder = ".tfc"
    self.config_file = "config.ini"
    self.db_name = "ftp_files.db"
    self.config_path = os.path.join(self.folder, self.config_file)
    self.db_path = os.path.join(self.folder, self.db_name)

    # Bundled resource files.
    self.place_holder_config = "files/config.ini"
    self.help_path = "files/help.json"
# NOTE(review): this copy of test_context appears truncated by extraction --
# only the first statements survive and assertRaises is handed ssl.SSLSocket
# where the complete variant in this file passes ftplib.FTP_TLS with
# keyfile/context arguments. TODO restore from the upstream suite.
def test_context(self):
self.client.quit()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertRaises(ValueError, ssl.SSLSocket)
# NOTE(review): this copy of test_context appears truncated by extraction --
# only the first statements survive and assertRaises is handed ssl.SSLSocket
# where the complete variant in this file passes ftplib.FTP_TLS with
# keyfile/context arguments. TODO restore from the upstream suite.
def test_context(self):
self.client.quit()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertRaises(ValueError, ssl.SSLSocket)
def __init__(self, host, user, password):
    """Connect to *host* on port 21, log in and protect the data channel.

    :param host: Server to connect to.
    :param user: Login name passed to ``login``.
    :param password: Password passed to ``login``.
    """
    ftplib.FTP_TLS.__init__(self)
    control_port = 21
    self.connect(host, control_port)
    self.login(user, password)
    # Switch the data connection to encrypted mode as well.
    self.prot_p()
def get_conn(self):
    """
    Returns a FTPS connection object.

    The connection is created on first use from the parameters returned by
    ``self.get_connection(self.ftp_conn_id)`` and cached on ``self.conn``.
    It is cached only after connect/auth/prot_p/login all succeeded, so a
    failed attempt does not leave a half-initialised client behind for later
    callers; the failed client is closed and the error re-raised.
    """
    if self.conn is None:
        params = self.get_connection(self.ftp_conn_id)
        conn = FTP_TLS(timeout=60)
        try:
            conn.connect(params.host, params.port)
            conn.auth()
            conn.prot_p()
            conn.login(params.login, params.password)
        except Exception:
            # Release the socket before propagating the original error.
            conn.close()
            raise
        self.conn = conn
    return self.conn
def upload_archive_file(self, local_filename, remote_filename, target_dir):
    """Upload a local file over FTPS, yielding progress messages.

    This is a generator: each yielded value is a status string.

    :param local_filename: Path of the file to upload.
    :param remote_filename: Name to store the file under on the server.
    :param target_dir: Remote directory that is entered before uploading.

    The upload is skipped when the remote directory already lists a file
    with the same name and size. A failed transfer is retried up to three
    times; the file is rewound before every attempt so a retry sends the
    whole file, and success is only reported when a transfer completed.
    The FTPS connection is always closed, including on errors.
    """
    yield("Uploading {} to FTP in directory: {},filename: {}".format(local_filename, target_dir, remote_filename))
    local_filesize = os.stat(local_filename).st_size
    self.upload_total = local_filesize
    self.upload_current = 0
    if self.settings.ftps['no_certificate_check']:
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.verify_mode = ssl.CERT_NONE
        context.check_hostname = False
    else:
        context = ssl.create_default_context()
    ftps = FTP_TLS(
        host=self.settings.ftps['address'],
        user=self.settings.ftps['user'],
        passwd=self.settings.ftps['passwd'],
        context=context,
        source_address=self.settings.ftps[
            'source_address'],
        timeout=self.settings.timeout_timer
    )
    try:
        ftps.cwd(target_dir)
        ftps.encoding = 'utf8'
        ftps.prot_p()
        for name, facts in ftps.mlsd(facts=["size"]):
            if name == remote_filename and local_filesize == int(facts["size"]):
                yield("File exists and size is equal.")
                return
        with open(local_filename, "rb") as file:
            for _attempt in range(3):
                # A failed attempt leaves the file position mid-stream.
                file.seek(0)
                # Presumably print_progress advances this counter -- restart
                # it together with the transfer.
                self.upload_current = 0
                try:
                    ftps.storbinary(
                        'STOR %s' % remote_filename,
                        file,
                        callback=lambda data, args=self.print_method: self.print_progress(data, args)
                    )
                except (ConnectionResetError, socket.timeout, TimeoutError):
                    yield("Upload failed,retrying...")
                else:
                    yield("\nFile uploaded.")
                    break
            else:
                # All attempts failed: do not claim success.
                yield("Upload failed after 3 attempts.")
    finally:
        ftps.close()
def setUp(self):
    """Start the dummy TLS server and connect a client with TLS enabled."""
    self.server = DummyTLS_FTPServer((HOST, 0))
    self.server.start()
    self.client = ftplib.FTP_TLS(timeout=10)
    host, port = self.server.host, self.server.port
    self.client.connect(host, port)
    # Secure the control channel, then the data channel.
    self.client.auth()
    self.client.prot_p()
def setUp(self):
    """Start the dummy TLS server and connect a client (TLS not yet enabled).

    NOTE(review): the original snippet was truncated by extraction (its only
    statement had unbalanced parentheses). It is restored after the complete
    setUp directly preceding it in this file, without the auth()/prot_p()
    calls -- confirm the timeout against upstream.
    """
    self.server = DummyTLS_FTPServer((HOST, 0))
    self.server.start()
    self.client = ftplib.FTP_TLS(timeout=10)
    self.client.connect(self.server.host, self.server.port)
def get_conn(self):
    """
    Returns a FTPS connection object.

    The connection is created on first use from the parameters returned by
    ``self.get_connection(self.ftp_conn_id)`` and cached on ``self.conn``.
    A configured port is passed to ``connect()`` for this connection only;
    the ``ftplib.FTP_TLS`` class attribute is no longer overwritten, which
    used to change the default port of every other FTP_TLS instance in the
    process.
    """
    if self.conn is None:
        params = self.get_connection(self.ftp_conn_id)
        conn = ftplib.FTP_TLS()
        try:
            # Mirror the FTP_TLS(host, user, passwd) constructor: connect only
            # when a host is given and log in only when a user is given.
            if params.host:
                # Port 0 makes connect() keep ftplib's default port.
                conn.connect(params.host, params.port or 0)
                if params.login:
                    conn.login(params.login, params.password)
        except Exception:
            conn.close()
            raise
        self.conn = conn
    return self.conn
def setUp(self):
    """Start the dummy TLS server and connect a client with TLS enabled.

    NOTE(review): the original snippet was truncated by extraction (the
    server line had unbalanced parentheses and the client was never
    created). It is restored after the complete setUp directly following it
    in this file -- confirm the timeout against upstream.
    """
    self.server = DummyTLS_FTPServer((HOST, 0))
    self.server.start()
    self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
    self.client.connect(self.server.host, self.server.port)
    # enable TLS
    self.client.auth()
    self.client.prot_p()
def setUp(self):
    """Start the dummy TLS server and connect a client (TLS not yet enabled)."""
    self.server = DummyTLS_FTPServer((HOST, 0))
    self.server.start()
    self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
    host, port = self.server.host, self.server.port
    self.client.connect(host, port)
def test_check_hostname(self):
    """Check host name verification on the control and data connections.

    With ``check_hostname`` enabled, connecting through ``self.server.host``
    must fail with ``ssl.CertificateError`` on ``auth()`` and on a protected
    transfer, while connecting through ``"localhost"`` must succeed.
    """
    self.client.quit()
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(CAFILE)
    self.client = ftplib.FTP_TLS(context=context, timeout=TIMEOUT)

    # 127.0.0.1 doesn't match SAN
    self.client.connect(self.server.host, self.server.port)
    with self.assertRaises(ssl.CertificateError):
        self.client.auth()
    # exception quits connection

    self.client.connect(self.server.host, self.server.port)
    self.client.prot_p()
    with self.assertRaises(ssl.CertificateError):
        data_sock = self.client.transfercmd("list")
        data_sock.close()
    self.client.quit()

    # Control channel against the matching name.
    self.client.connect("localhost", self.server.port)
    self.client.auth()
    self.client.quit()

    # Data channel against the matching name.
    self.client.connect("localhost", self.server.port)
    self.client.prot_p()
    data_sock = self.client.transfercmd("list")
    data_sock.close()
# NOTE(review): this setUp snippet appears truncated by extraction -- the
# server line has unbalanced parentheses and self.client is used without
# being created. Presumably it should mirror the complete setUp variants in
# this file (bind to (HOST, 0), start the server, create and connect the
# client); the client timeout is not recoverable here -- TODO restore from
# the upstream suite.
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, self.server.port)
# enable TLS
self.client.auth()
self.client.prot_p()
# NOTE(review): this setUp snippet appears truncated by extraction -- its
# only statement has unbalanced parentheses. Presumably it should mirror the
# complete setUp variants in this file; the client timeout is not
# recoverable here -- TODO restore from the upstream suite.
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, self.server.port)
# NOTE(review): this copy of test_check_hostname appears damaged by
# extraction -- the FTP_TLS(...) call places a positional argument after a
# keyword argument (a syntax error) and the connect()/assertRaises steps of
# the complete variant in this file are missing. TODO restore from the
# upstream suite.
def test_check_hostname(self):
self.client.quit()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.check_hostname = True
ctx.load_verify_locations(CAFILE)
self.client = ftplib.FTP_TLS(context=ctx, self.server.port)
self.client.prot_p()
self.client.transfercmd("list").close()
def setUp(self):
    """Start the dummy TLS server and connect a client with TLS enabled."""
    self.server = DummyTLS_FTPServer((HOST, 0))
    self.server.start()
    self.client = ftplib.FTP_TLS(timeout=2)
    host, port = self.server.host, self.server.port
    self.client.connect(host, port)
    # Secure the control channel, then the data channel.
    self.client.auth()
    self.client.prot_p()
def setUp(self):
    """Start the dummy TLS server and connect a client (TLS not yet enabled).

    NOTE(review): the original snippet was truncated by extraction (its only
    statement had unbalanced parentheses). It is restored after the complete
    setUp directly preceding it in this file, without the auth()/prot_p()
    calls -- confirm the timeout against upstream.
    """
    self.server = DummyTLS_FTPServer((HOST, 0))
    self.server.start()
    self.client = ftplib.FTP_TLS(timeout=2)
    self.client.connect(self.server.host, self.server.port)
def connect(self):
    """Open the FTPS connection described by the configuration and log in.

    Returns True when the login succeeded and False when connecting or
    logging in raised an error (in which case a warning is printed and the
    "time" entry of the "Config" section is reset to 0). When
    ``readConfig()`` reports failure nothing is attempted and the method
    falls through, returning None.
    """
    if(self.readConfig()):
        try:
            self.ftp = FTP_TLS(self.server)
            password = self.__askPassWord()
            self.ftp.login(self.user, password)
            #self.ftp.prot_p()
            return True
        # `except Exception, e` is Python 2-only syntax and the bound name
        # was unused; this spelling works on both Python 2 and 3.
        except Exception:
            print(color.TFC + "ftc " + color.WARNING + "Unable to log in...")
            self.__writeConfig("Config", "time", 0)
            return False
def do_cmd(term):
    """Prompt for a command and send it to the FTP server as ``site <cmd>``.

    The prompt is shown up to ``max_attempts`` times. Empty input is skipped,
    input found in ``bye_u`` ends the function, and anything else is sent over
    a fresh FTP_TLS connection, which is now always closed even when the
    command fails.

    :param term: Terminal object; its colour attributes are looked up by name.
    """
    session = getsession()
    sep_ok = getattr(term, color_secondary)(u'::')
    sep_bad = getattr(term, color_primary)(u'::')
    colors = {'highlight': getattr(term, color_primary)}
    echo(u'\r\n\r\n i hope you know glftpd cmds :)') # feel free to change these echoes to personalize your installation
    echo(u'\r\n\r\n if you dont,type quit')
    echo(u'\r\n\r\n basically all this is good for at the moment is for msg and request')
    echo (u'\r\n\r\n e.g \'msg kniffy hi\' or \'request coolthing -for:<you>\'')
    for _ in range(max_attempts):
        echo(u'\r\n\r\n{sep} tYPE cMD -> '.format(sep=sep_ok))
        handle = LineEditor(max_length, colors=colors
                            ).read() or u''
        if handle.strip() == u'':
            continue
        # user says goodbye
        if handle.lower() in bye_u:
            return
        # do cmd
        # `person` is not used yet; see the commented-out per-user login below.
        person = session.user.handle
        ftps = FTP_TLS()
        #ftps.set_debuglevel(2) # if you broke something,uncomment this (run it directly,not from eggdrop)
        try:
            # The port must be an int: ftplib compares it with 0, which fails
            # for a str on Python 3.
            ftps.connect('127.0.0.1', 1234) # enter your server and port here
            ftps.login('cmd', '<make this a non-sysop user please>') # enter your user and pass within the quotes (remember,not a user with privs)
            # ftps.login(person,auth)
            ftps.prot_p()
            ftps.sendcmd('site ' + handle)
            echo(u'\r\n\r\n cmd sent')
            ftps.quit()
        finally:
            # Harmless after quit(); releases the socket when a step failed.
            ftps.close()
        # echo(u'\r\n\r\n{sep} Password: '.format(sep=sep_ok))
        # password = LineEditor(password_max_length,
        # colors=colors,
        # hidden=hidden_char
        # ).read() or u''
        # user = authenticate_user(handle,password)
        # if not user:
        # echo(u'\r\n\r\n{sep} Login failed.'.format(sep=sep_bad))
        # continue
        # goto(top_script,handle=user.handle)
    # echo(u'\r\n\r\n{sep} Too many authentication attempts.\r\n'
    # .format(sep=sep_bad))
def main():
    """Entry point: fetch the terminal and run the command prompt on it."""
    do_cmd(getterminal())
    # An earlier stand-alone variant (direct FTP_TLS login, raw_input prompt,
    # `site` command, quit) used to be kept here as commented-out code.
    # the tcl script i included will take any output from this python and spam
    # it into the channel.. set this how you like,or turn it off
    # if you're a tcl guru comment this out and make your tcl do the announce
def delete_user(term, tgt_user, point):
    """ Delete given user. You may delete yourself.

    Asks for confirmation at *point*. On ``y`` the user is removed from the
    FTP server via ``site deluser`` (skipped for the ``anonymous`` handle),
    deleted locally, and True is returned; any other key cancels and returns
    False. The FTP connection is always closed, even when a command fails.

    :param term: Terminal object used for colours, cursor movement and input.
    :param tgt_user: User record; ``handle`` is read and ``delete()`` called.
    :param point: Screen position with ``x``/``y`` attributes.
    """
    _color1, _color2 = [getattr(term, _color)
                        for _color in (color_lowlight, color_highlight)]
    lb, rb, colon = _color1('['), _color1(']'), _color1(':')
    echo(term.move(*point))
    echo(u'Delete {handle} {lb}yN{rb}{colon}{clear_eos} ?\b\b'
         .format(handle=_color2(tgt_user.handle),
                 rb=rb, lb=lb, colon=colon,
                 clear_eos=term.clear_eos))
    inp = term.inkey()
    echo(inp + term.move(point.y + 2, point.x))
    if inp == u'y':
        # NOTE(review): nesting reconstructed from de-indented source; only
        # the FTP part is assumed to be skipped for 'anonymous'.
        if tgt_user.handle != 'anonymous':
            ftps = FTP_TLS()
            try:
                # The port must be an int: ftplib compares it with 0, which
                # fails for a str on Python 3.
                ftps.connect('127.0.0.1', 1234)
                ftps.login('bbs', '<please make this a special sysop user in glftpd>')
                ftps.prot_p() # for ssl mode
                ftps.sendcmd('site deluser ' + tgt_user.handle )
                ftps.sendcmd('site msg sysop user ' + tgt_user.handle + ' deleted,maybe purge them too? ' )
                ftps.quit()
            finally:
                # Harmless after quit(); releases the socket on failure.
                ftps.close()
        tgt_user.delete()
        echo(_color2('Deleted !'))
        time.sleep(1)
        return True
    echo(_color2('Canceled !'))
    time.sleep(1)
    return False
# NOTE(review): this setUp snippet appears truncated by extraction -- the
# server line has unbalanced parentheses and self.client is used without
# being created. Presumably it should mirror the complete setUp variants in
# this file (bind to (HOST, 0), start the server, create and connect the
# client); the client timeout is not recoverable here -- TODO restore from
# the upstream suite.
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, self.server.port)
# enable TLS
self.client.auth()
self.client.prot_p()
# NOTE(review): this setUp snippet appears truncated by extraction -- its
# only statement has unbalanced parentheses. Presumably it should mirror the
# complete setUp variants in this file; the client timeout is not
# recoverable here -- TODO restore from the upstream suite.
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, self.server.port)
# NOTE(review): this setUp snippet appears truncated by extraction -- the
# server line has unbalanced parentheses and self.client is used without
# being created. Presumably it should mirror the complete setUp variants in
# this file (bind to (HOST, 0), start the server, create and connect the
# client); the client timeout is not recoverable here -- TODO restore from
# the upstream suite.
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, self.server.port)
# enable TLS
self.client.auth()
self.client.prot_p()
# NOTE(review): this setUp snippet appears truncated by extraction -- its
# only statement has unbalanced parentheses. Presumably it should mirror the
# complete setUp variants in this file; the client timeout is not
# recoverable here -- TODO restore from the upstream suite.
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, self.server.port)
# NOTE(review): this copy of test_check_hostname appears damaged by
# extraction -- the FTP_TLS(...) call places a positional argument after a
# keyword argument (a syntax error), and the connect()/auth() steps of the
# complete variant in this file are missing. The `with` bodies below were
# also flattened. TODO restore from the upstream suite.
def test_check_hostname(self):
self.client.quit()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.check_hostname = True
ctx.load_verify_locations(CAFILE)
self.client = ftplib.FTP_TLS(context=ctx, self.server.port)
self.client.prot_p()
with self.assertRaises(ssl.CertificateError):
with self.client.transfercmd("list") as sock:
pass
self.client.quit()
self.client.connect("localhost", self.server.port)
self.client.prot_p()
with self.client.transfercmd("list") as sock:
pass
# NOTE(review): this setUp snippet appears truncated by extraction -- the
# server line has unbalanced parentheses and self.client is used without
# being created. Presumably it should mirror the complete setUp variants in
# this file (bind to (HOST, 0), start the server, create and connect the
# client); the client timeout is not recoverable here -- TODO restore from
# the upstream suite.
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, self.server.port)
# enable TLS
self.client.auth()
self.client.prot_p()
# NOTE(review): this setUp snippet appears truncated by extraction -- its
# only statement has unbalanced parentheses. Presumably it should mirror the
# complete setUp variants in this file; the client timeout is not
# recoverable here -- TODO restore from the upstream suite.
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, self.server.port)
def login(self, *args, **kwargs):
ftplib.FTP_TLS.login(self, **kwargs)
self.prot_p()
def setUp(self):
    """Start an FTPSServer and connect an FTP_TLS client to it."""
    self.server = FTPSServer()
    self.server.start()
    self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
    host, port = self.server.host, self.server.port
    self.client.connect(host, port)
def _fetch_remote(source, destination, parsed_url, creds, tmp_file, tmp_path):
    """Fetch a remote resource according to the scheme in *parsed_url*.

    * ``ftp`` / ``ftps``: the path is downloaded into *tmp_file*.
    * ``git`` / ``git+ssh`` / ``git+http`` / ``git+https``: the repository is
      cloned into *destination*; a URL fragment, if present, names the branch
      to check out.
    * ``http`` / ``https``: the response body is copied into *tmp_file*,
      using basic authentication when ``creds.username`` is set.

    FTP connections and HTTP responses are now closed even when the transfer
    raises. Any other scheme is ignored.

    :param source: Not used by this function.
    :param destination: Directory created for a git clone.
    :param parsed_url: Mutable mapping of URL components ('scheme', 'netloc',
                       'path', 'fragment', ...); it is modified for git URLs.
    :param creds: Object with ``username`` and ``password`` attributes.
    :param tmp_file: Writable binary file object receiving downloaded data.
    :param tmp_path: Not used by this function.
    :raises ValueError: If the requested git branch has no remote reference.
    """
    scheme = parsed_url['scheme']
    if scheme in ('ftp', 'ftps'):
        if scheme == 'ftp':
            connection = ftplib.FTP()
            default_port = 21
        else:
            # NOTE(review): prot_p() is never called, so the data channel is
            # presumably not encrypted for ftps -- confirm this is intended.
            connection = ftplib.FTP_TLS()
            default_port = 990
        try:
            connection.connect(*smoke_zephyr.utilities.parse_server(parsed_url['netloc'], default_port))
            connection.login(creds.username or '', creds.password or '')
            connection.retrbinary('RETR ' + parsed_url['path'], tmp_file.write)
            connection.quit()
        finally:
            # Harmless after quit(); releases the socket when a step failed.
            connection.close()
    elif scheme in ('git', 'git+ssh', 'git+http', 'git+https'):
        # Strip the "git+" prefix so the transport scheme is what gets cloned.
        parsed_url['scheme'] = scheme.split('+', 1)[-1]
        branch = parsed_url['fragment']
        if branch:
            parsed_url['fragment'] = ''
        else:
            branch = None
        os.mkdir(destination, mode=MAKEDIR_MODE)
        repo = git.Repo.clone_from(urllib.parse.urlunparse(parsed_url.values()), destination)
        if branch is None or branch == repo.active_branch.name:
            return
        origin = repo.remotes['origin']
        origin.fetch()
        branch_ref = next((ref for ref in repo.refs if isinstance(ref, git.RemoteReference) and ref.remote_head == branch), None)
        if branch_ref is None:
            raise ValueError('failed to find reference to remote branch name: ' + branch)
        branch_ref.checkout(b=branch)
    elif scheme in ('http', 'https'):
        request = urllib.request.Request(urllib.parse.urlunparse(parsed_url.values()))
        if creds.username is not None:
            request.add_header(
                'Authorization',
                'Basic ' + base64.b64encode("{0}:{1}".format(creds.username, creds.password).encode('utf-8')).decode('utf-8')
            )
        with urllib.request.urlopen(request) as url_h:
            shutil.copyfileobj(url_h, tmp_file)
def get_conn(self):
    """
    Returns a FTPS connection object.

    The connection is created on first use from the parameters returned by
    ``self.get_connection(self.ftp_conn_id)`` and cached on ``self.conn``.
    """
    if self.conn is None:
        params = self.get_connection(self.ftp_conn_id)
        # FTP_TLS takes (host, user, passwd): the login name must be passed
        # too, otherwise the password ends up in the user position.
        self.conn = ftplib.FTP_TLS(
            params.host, params.login, params.password
        )
    return self.conn
# NOTE(review): this setUp snippet appears truncated by extraction -- the
# server line has unbalanced parentheses and self.client is used without
# being created. Presumably it should mirror the complete setUp variants in
# this file (bind to (HOST, 0), start the server, create and connect the
# client); the client timeout is not recoverable here -- TODO restore from
# the upstream suite.
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, self.server.port)
# enable TLS
self.client.auth()
self.client.prot_p()
# NOTE(review): this setUp snippet appears truncated by extraction -- its
# only statement has unbalanced parentheses. Presumably it should mirror the
# complete setUp variants in this file; the client timeout is not
# recoverable here -- TODO restore from the upstream suite.
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, self.server.port)
# NOTE(review): this copy of test_check_hostname appears damaged by
# extraction -- the FTP_TLS(...) call places a positional argument after a
# keyword argument (a syntax error), and the connect()/assertRaises steps of
# the complete variant in this file are missing. TODO restore from the
# upstream suite.
def test_check_hostname(self):
self.client.quit()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.check_hostname = True
ctx.load_verify_locations(CAFILE)
self.client = ftplib.FTP_TLS(context=ctx, self.server.port)
self.client.prot_p()
with self.client.transfercmd("list") as sock:
pass
def __init__(self, server, port=21, username="anonymous", password="", use_ssl=False):
    """Store the connection settings and create an unconnected FTP client.

    :param server: Stored on ``self._server``.
    :param port: Stored on ``self._port``.
    :param username: Stored on ``self._username``.
    :param password: Stored on ``self._password``.
    :param use_ssl: When true the client is an ``FTP_TLS`` instance,
                    otherwise a plain ``FTP`` instance.
    """
    self._server, self._port = server, port
    self._username, self._password = username, password
    self._use_ssl = use_ssl
    # Only the class that is actually needed gets imported.
    if use_ssl:
        from ftplib import FTP_TLS as client_factory
    else:
        from ftplib import FTP as client_factory
    self._ftp = client_factory()
def main(handle=u''):
    """
    Main procedure.

    Runs the new-user dialogue in a loop until the user cancels or confirms.
    On confirmation the account is registered on the FTP server through
    ``site`` commands, saved locally, and control passes to ``top_script``.

    :param handle: Suggested handle; reset to an empty string when it is one
                   of the ``new_usernames`` aliases.
    :raises AssertionError: If an account with the chosen handle already
                            exists at confirmation time.
    """
    # set syncterm font,if any
    term = getterminal()
    if term.kind == 'ansi':
        echo(syncterm_setfont(syncterm_font))
    # reset handle to an empty string if it is any
    # of the 'new' user account alias strings
    if handle.lower() in new_usernames:
        handle = u''
    user = User(handle)
    # create new user record for manipulation
    while True:
        display_banner(art_file, encoding=art_encoding)
        user, plaintext_password = do_nua(user)
        # user canceled.
        if user is None:
            return
        # confirm
        if prompt_yesno(question='Create account'):
            # prevent race condition,scenario: `billy' begins new account
            # process,waits at 'Create account [yn] ?' prompt until a
            # second `billy' finishes account creation process,then the
            # first `billy' enters 'y',overwriting the existing account.
            # Raised explicitly (same exception type as before) because an
            # `assert` statement is stripped when Python runs with -O.
            if find_user(user.handle):
                raise AssertionError(
                    'Security race condition: account already exists')
            # real_ip = getssession().addrport
            ftps = FTP_TLS()
            try:
                # The port must be an int: ftplib compares it with 0, which
                # fails for a str on Python 3.
                ftps.connect('127.0.0.1', 1234) # this can be remote
                ftps.login('asdf', '<please set up a glftpd user for this>')
                ftps.prot_p()
                ftps.sendcmd('site gadduser bbsuser ' + user.handle + ' ' + plaintext_password + ' *@127.0.0.1 ' )
                ftps.sendcmd('site deluser ' + user.handle ) # for validation reasons
                ftps.sendcmd('site msg sysop ' + user.handle + ' added,please validate them ' )
                ftps.quit()
            finally:
                # Harmless after quit(); releases the socket on failure.
                ftps.close()
            user.save()
            goto(top_script, user.handle)
版权声明：本文内容由互联网用户自发贡献，该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务，不拥有所有权，不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容，请发送邮件至 dio@foxmail.com 举报，一经查实，本站将立刻删除。