Python ftplib 模块,error_perm() 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用ftplib.error_perm()。
def info(remote):
    """Print the mirror status and a directory listing of the remote root.

    The first line of ``<remote.root>/.sfmstat`` is parsed as
    ``<unix timestamp> <mirror path>``.  When the file cannot be read
    because of a 550 reply, "No mirror recognized" is printed instead.

    Args:
        remote: object providing ``root``, ``host``, ``readlines(path)`` and
            an ``ftp`` attribute with a ``dir(path)`` method.
    """
    try:
        sfmstat = remote.readlines(os.path.join(remote.root, '.sfmstat'))
    except ftplib.error_perm as err:
        # Only a 550 reply is tolerated; any other permanent error is
        # handed to log() with abort=True.
        if not str(err).startswith('550'):
            log(err, abort=True)
        sfmstat = None
    # Every print below uses a single argument so that the output is the
    # same whether print is a statement (Python 2) or a function.
    print('')
    if sfmstat:
        last_updated, mirror_path = sfmstat[0].split(None, 1)
        last_updated = datetime.datetime.fromtimestamp(float(last_updated))
        print('Mirror of %s' % mirror_path)
        print(last_updated.strftime('Last updated on %A,%d. %B %Y at %H:%M:%S'))
    else:
        print('No mirror recognized')
    print('')
    print('Content of %s%s:' % (remote.host, remote.root))
    remote.ftp.dir(remote.root)
    print('')
def _get_files_in_dir(ftp_session, source, dest, status):
    """Recursively copy the session's current directory into ``dest``.

    Each name returned by ``nlst()`` is probed with ``cwd``: if the server
    accepts it, the entry is treated as a directory and mirrored
    recursively; if it answers with a permanent error, the entry is
    downloaded as a file.

    Args:
        ftp_session: connected FTP client already positioned in ``source``.
        source: remote directory path; entry names are appended directly,
            so it is expected to end with "/".
        dest: existing local directory that receives the content.
        status: passed through unchanged to the recursive calls.
    """
    for path in ftp_session.nlst():
        local_path = os.path.join(dest, path)
        remote_path = source + path
        new_remote_path = remote_path + "/"
        try:
            # See if it's a directory.  Only the probe itself is guarded, so
            # an error raised while processing the sub-directory is not
            # mistaken for "this entry is a file".
            ftp_session.cwd(new_remote_path)
        except ftplib.error_perm:
            logger.debug("Get file: {0}".format(path))
            with open(local_path, "wb") as file_handle:
                ftp_session.retrbinary(
                    "RETR " + remote_path, file_handle.write)
        else:
            # It is! So we should create the directory on dest
            try:
                os.mkdir(local_path)
            except OSError:
                # Tolerate an existing directory; re-raise real failures
                # (permissions, a file in the way, ...).
                if not os.path.isdir(local_path):
                    raise
                logger.debug("Dir: {0} already exists".format(local_path))
            logger.debug("Get Folder: {0}".format(new_remote_path))
            _get_files_in_dir(ftp_session, new_remote_path, local_path, status)
def download_files(ftp_session, status):
    """Validate the remote and local directories, then mirror the remote one.

    On failure an error message is logged and stored in ``status["error"]``
    and the function returns without downloading anything.

    NOTE(review): ``source`` and ``dest`` are not parameters of this
    function; they are presumably supplied by an enclosing or module scope.
    Confirm against the original project.

    Args:
        ftp_session: connected FTP client.
        status: dict-like object updated with an ``"error"`` entry on failure.
    """
    # dest must already exist
    try:
        ftp_session.cwd(source)
    except ftplib.error_perm:
        # invalid entry (ensure input form: "/dir/folder/something/")
        msg = "Could not open the ftp directory: '{0}'".format(source)
        logger.error(msg)
        status.update({"error": msg})
        return
    if not os.path.isdir(dest):
        msg = "Local path does not exist: '{0}'".format(dest)
        logger.error(msg)
        status.update({"error": msg})
        return
    # _get_files_in_dir takes (ftp_session, source, dest, status).
    _get_files_in_dir(ftp_session, source, dest, status)
def FtpListCommand(self, directory = '/'):
    """Run LIST and return the parsed result.

    When ``directory`` ends with "/", it is appended to
    ``self.ftpDirectory`` (doubled slashes collapsed) and that directory is
    listed; a permanent FTP error is shown through ``errorListingMessage``
    and ``listingErrorPopup``.  Otherwise a plain ``LIST`` of the current
    directory is issued.

    Returns:
        Whatever ``self.ListParser`` returns for the collected lines.
    """
    ls = []
    # endswith() also copes with an empty string, unlike directory[-1].
    if directory.endswith('/'):
        try:
            # str.replace returns a new string; the result must be stored.
            self.ftpDirectory = (self.ftpDirectory + directory).replace('//', '/')
            self.ftp.retrlines('LIST %s' % self.ftpDirectory, ls.append)
        except error_perm as msg:
            # Drop the leading reply code and show the remaining words,
            # each followed by a space.
            parsedErrorMessage = str(msg).split(" ")[1:]
            errorListingMessage.text = "".join(
                str(item) + " " for item in parsedErrorMessage)
            listingErrorPopup.open()
    else:
        self.ftp.retrlines('LIST', ls.append)
    return self.ListParser(ls)
def test_manager(self):
    """Check which ``errors.*`` exception ``ftp_errors`` raises for each FTP error."""
    mem_fs = open_fs('mem://')
    # (expected exception, extra ftp_errors kwargs, exception raised inside)
    scenarios = (
        (errors.ResourceError, {'path': 'foo'}, error_temp),
        (errors.OperationFailed, {}, error_temp),
        (errors.InsufficientStorage, {}, error_perm('552 foo')),
        (errors.ResourceNotFound, {}, error_perm('501 foo')),
        (errors.PermissionDenied, {}, error_perm('999 foo')),
    )
    for expected, extra, raised in scenarios:
        with self.assertRaises(expected):
            with ftp_errors(mem_fs, **extra):
                raise raised
def makedir(self, path, permissions=None, recreate=False):
    """Create a directory on the server and return ``self.opendir(path)``.

    Args:
        path: directory to create.
        permissions: accepted but not used by this implementation.
        recreate: when true, an already existing directory is not an error.

    Raises:
        errors.DirectoryExists: the root was requested without ``recreate``,
            or MKD failed with 550 and the path already exists.
        errors.ResourceNotFound: MKD failed for any other reason.
    """
    _path = self.validatepath(path)
    with ftp_errors(self, path=path):
        if _path == '/':
            if not recreate:
                raise errors.DirectoryExists(path)
            return self.opendir(path)
        skip_creation = recreate and self.isdir(path)
        if not skip_creation:
            try:
                self.ftp.mkd(_encode(_path, self.ftp.encoding))
            except error_perm as error:
                code, _ = _parse_ftp_error(error)
                if code == 550 and (self.isdir(path) or self.exists(path)):
                    raise errors.DirectoryExists(path)
                raise errors.ResourceNotFound(path)
    return self.opendir(path)
def removedir(self, path):
    """Remove a directory on the server with RMD.

    Raises:
        errors.RemoveRootError: ``path`` validates to "/".
        errors.DirectoryExpected: RMD failed with 550 and ``path`` is a file.
        errors.DirectoryNotEmpty: RMD failed with 550 and ``path`` is not empty.
    """
    _path = self.validatepath(path)
    if _path == '/':
        raise errors.RemoveRootError()
    with ftp_errors(self, path):
        encoded = _encode(_path, self.ftp.encoding)
        try:
            self.ftp.rmd(encoded)
        except error_perm as error:
            code, _ = _parse_ftp_error(error)
            if code == 550:
                if self.isfile(path):
                    raise errors.DirectoryExpected(path)
                if not self.isempty(path):
                    raise errors.DirectoryNotEmpty(path)
            # Anything else is re-raised for ftp_errors to translate.
            raise # pragma: no cover
def getbytes(self, path):
    """Return the content of the remote file ``path`` as bytes.

    Raises:
        errors.FileExpected: RETR failed with 550 and ``path`` is a directory.
    """
    _path = self.validatepath(path)
    buffer = io.BytesIO()
    with ftp_errors(self, path), self._manage_ftp() as ftp:
        command = str("RETR ") + _encode(_path, self.ftp.encoding)
        try:
            ftp.retrbinary(command, buffer.write)
        except error_perm as error:
            code, _ = _parse_ftp_error(error)
            if code == 550 and self.isdir(path):
                raise errors.FileExpected(path)
            raise
    return buffer.getvalue()
def get_mtime(self, name):
    """Return the modification time of a file on the FTP server.

    The MDTM reply is parsed as ``%Y%m%d%H%M%S`` and converted with
    ``time.mktime``.

    Returns:
        ``None`` if the file does not exist, ``0`` if ``name`` refers to a
        directory, otherwise the modification time as an integer.  To get
        the local modification time, use ``get_mtime() - clock_offset``.

    Raises:
        error_perm: for any permanent error other than the two 550 replies
            recognised below.
    """
    filename = self.abspath(name)
    try:
        resp = self.ftp.sendcmd('MDTM ' + filename.replace('\\', '/'))
    except error_perm as msg:
        s = str(msg)
        if s.startswith('550 I can only retrieve regular files'):
            # filename is directory
            return 0
        if s.startswith('550 Can\'t check for file existence'):
            # file with filename does not exist
            return None
        raise
    # repr() replaces the Python 2-only backtick syntax.
    assert resp[:3] == '213', repr((resp, filename))
    modtime = int(time.mktime(time.strptime(resp[3:].strip(), '%Y%m%d%H%M%S')))
    return modtime
def ftp_walk(ftpconn: FTP, rootpath=''):
    """Recursively traverse an FTP directory tree, top-down.

    Yields ``(path, directories, files)`` for ``rootpath`` and then for each
    of its sub-directories.  A directory whose listing raises
    ``ftplib.error_perm`` is skipped silently.
    """
    try:
        directories, files = directory_listing(ftpconn, rootpath)
    except ftplib.error_perm:
        return

    # Report this level before descending.
    yield rootpath, directories, files

    for subdir in directories:
        yield from ftp_walk(ftpconn, rootpath=os.path.join(rootpath, subdir))
def placeFiles(ftp, path):
    """Upload the built files to FTP.

    Walks the local directory ``path``: regular files are stored with STOR
    under their own name; sub-directories are created with MKD (a 550 reply
    is ignored), entered, uploaded recursively and left again.  Entries
    named ``config.py``, ``config.pyc``, ``templates`` and ``content`` are
    skipped.

    Args:
        ftp: FTP client positioned in the remote target directory.
        path: local directory to upload.
    """
    skipped = ("config.py", "config.pyc", "templates", "content")
    for name in os.listdir(path):
        if name in skipped:
            continue
        localpath = os.path.join(path, name)
        if os.path.isfile(localpath):
            print("STOR", name, localpath)
            # Close the local file as soon as the transfer is over instead
            # of leaking one handle per uploaded file.
            with open(localpath, 'rb') as handle:
                ftp.storbinary('STOR ' + name, handle)
        elif os.path.isdir(localpath):
            print("MKD", name)
            try:
                ftp.mkd(name)
            # ignore "directory already exists"
            except error_perm as e:
                if not e.args[0].startswith('550'):
                    raise
            print("CWD", name)
            ftp.cwd(name)
            placeFiles(ftp, localpath)
            print("CWD", "..")
            ftp.cwd("..")
def test_rnfr_rnto(self):
    """Exercise RNFR/RNTO: valid renames, bogus paths and the root directory."""
    # rename file
    tempname = os.path.basename(tempfile.mktemp(dir=HOME))
    self.client.rename(self.tempfile, tempname)
    self.client.rename(tempname, self.tempfile)
    # rename dir: move it to a fresh name and back, mirroring the file case
    # (renaming the directory onto itself left ``tempname`` unused).
    tempname = os.path.basename(tempfile.mktemp(dir=HOME))
    self.client.rename(self.tempdir, tempname)
    self.client.rename(tempname, self.tempdir)
    # rnfr/rnto over non-existing paths
    bogus = os.path.basename(tempfile.mktemp(dir=HOME))
    self.assertRaises(ftplib.error_perm, self.client.rename, bogus, '/x')
    # assertRaises needs the callable first; self.tempfile is its argument.
    self.assertRaises(
        ftplib.error_perm, self.client.rename, self.tempfile, u('/'))
    # rnto sent without first specifying the source
    self.assertRaises(ftplib.error_perm, self.client.sendcmd,
                      'rnto ' + self.tempfile)
    # make sure we can't rename root directory
    self.assertRaisesRegex(ftplib.error_perm,
                           "Can't rename home directory",
                           self.client.rename, '/', '/x')
def test_unforeseen_mdtm_event(self):
    """MDTM on a file with an unrepresentable mtime must answer 550.

    Emulates a file whose last modification time is prior to year 1900 by
    temporarily overriding ``AbstractedFS.getmtime`` with a function that
    returns a large negative value.  That makes time.localtime/gmtime raise
    ValueError, which the server is supposed to handle.
    """
    # On python 3 it seems that the trick of replacing the original
    # method with the lambda doesn't work.
    if PY3:
        return
    original_getmtime = AbstractedFS.getmtime
    try:
        AbstractedFS.getmtime = lambda x, y: -9000000000
        self.assertRaisesRegex(
            ftplib.error_perm,
            "550 Can't determine file's last modification time",
            self.client.sendcmd, 'mdtm ' + self.tempfile)
        # make sure client hasn't been disconnected
        self.client.sendcmd('noop')
    finally:
        AbstractedFS.getmtime = original_getmtime
def test_stou_orphaned_file(self):
    """Check that no orphaned file gets left behind when STOU fails.

    Even if STOU fails the file is first created and then erased.  Since
    the name of that file cannot be known, the directory content is
    compared before and after STOU has been issued.  TESTFN is assumed to
    be a "reserved" file name, so false positives are not expected.
    """
    safe_remove(TESTFN)
    # login as a limited user in order to make STOU fail
    self.client.login('anonymous', '@nopasswd')
    before = os.listdir(HOME)
    # The command has to be sent through the client; passing the string
    # alone would make assertRaises try to call a str.
    self.assertRaises(ftplib.error_perm, self.client.sendcmd,
                      'stou ' + TESTFN)
    after = os.listdir(HOME)
    if before != after:
        for file in after:
            self.assertFalse(file.startswith(TESTFN))
def test_epsv(self):
    """Exercise EPSV: wrong protocol, protocol > 2, and a real data connection."""
    # test wrong proto
    try:
        self.client.sendcmd('epsv ' + self.other_proto)
    except ftplib.error_perm as err:
        self.assertEqual(str(err)[0:3], "522")
    else:
        self.fail("Exception not raised")
    # proto > 2 (the command must go through sendcmd; a bare string is not
    # callable)
    self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'epsv 3')
    # test connection
    for cmd in ('EPSV', 'EPSV ' + self.proto):
        host, port = ftplib.parse229(self.client.sendcmd(cmd),
                                     self.client.sock.getpeername())
        with contextlib.closing(
                socket.socket(self.client.af, socket.SOCK_STREAM)) as s:
            s.settimeout(TIMEOUT)
            s.connect((host, port))
            self.client.sendcmd('abor')
def test_mlst(self):
    """MLST on unicode names: facts are returned on a UTF-8 fs, else error_perm."""
    def mlstline(cmd):
        # utility function for extracting the line of interest
        return self.client.voidcmd(cmd).split('\n')[1]

    if not self.utf8fs:
        self.assertRaises(ftplib.error_perm,
                          mlstline, 'mlst ' + TESTFN_UNICODE)
        return

    # One MLST per assertion, directory first, then the file.
    for type_fact, name in (('type=dir', TESTFN_UNICODE),
                            ('type=file', TESTFN_UNICODE_2)):
        self.assertTrue(type_fact in mlstline('mlst ' + name))
        self.assertTrue('/' + name in mlstline('mlst ' + name))
# --- file transfer
def test_stor(self):
    """STOR with a unicode name: round-trips on a UTF-8 fs, else error_perm."""
    if not self.utf8fs:
        dummy = BytesIO()
        self.assertRaises(ftplib.error_perm, self.client.storbinary,
                          'stor ' + TESTFN_UNICODE_2, dummy)
        return

    data = b'abcde12345' * 500
    os.remove(TESTFN_UNICODE_2)
    # Upload the payload ...
    outgoing = BytesIO()
    outgoing.write(data)
    outgoing.seek(0)
    self.client.storbinary('stor ' + TESTFN_UNICODE_2, outgoing)
    # ... and read it back for comparison.
    incoming = BytesIO()
    self.client.retrbinary('retr ' + TESTFN_UNICODE_2, incoming.write)
    incoming.seek(0)
    self.assertEqual(incoming.read(), data)
def test_prot(self):
    """PROT is refused on a plain control connection and toggles data-channel TLS."""
    def list_transfer():
        # Run a LIST transfer to completion and hand back the data socket.
        sock = self.client.transfercmd('list')
        with contextlib.closing(sock):
            while sock.recv(1024):
                pass
            self.client.voidresp()
        return sock

    self.client.login(secure=False)
    msg = "503 PROT not allowed on insecure control connection."
    self.assertRaisesWithMsg(ftplib.error_perm, msg,
                             self.client.sendcmd, 'prot p')
    self.client.login(secure=True)
    # secured
    self.client.prot_p()
    self.assertTrue(isinstance(list_transfer(), ssl.SSLSocket))
    # unsecured
    self.client.prot_c()
    self.assertFalse(isinstance(list_transfer(), ssl.SSLSocket))
def ls_ftp(dir):
    """Return the file names found at an FTP URL.

    Logs in anonymously to the URL's host, changes to the URL's path and
    returns the NLST result.  The reply "550 No files found" yields an
    empty list; any other permanent error is re-raised.

    Args:
        dir: FTP URL whose network location and path are used.
    """
    try:
        from urlparse import urlparse  # Python 2
    except ImportError:
        from urllib.parse import urlparse  # Python 3
    from ftplib import FTP, error_perm
    o = urlparse(dir)
    ftp = FTP(o.netloc)
    try:
        ftp.login()
        ftp.cwd(o.path)
        files = []
        try:
            files = ftp.nlst()
        except error_perm as resp:
            if str(resp) == "550 No files found":
                print("no files in this directory")
            else:
                raise
        return files
    finally:
        # The connection is local to this function; do not leak it.
        ftp.close()
def cwd(self,path):
    """Change the remote working directory.

    On failure the message is printed, passed to ``info_public`` and the
    FTP session is closed with ``quit()``.

    Args:
        path: directory to change to; an absolute path is expected.

    Returns:
        True if the directory was changed, False otherwise.
    """
    # Author: gongke
    # Created: 2013/01/13
    try:
        self.mftp.cwd(path)
    except ftplib.error_perm:
        print_mes = "cannot cd to this path " +path
        # Single-argument print() prints the same text on Python 2 and 3.
        print(print_mes)
        info_public(print_mes)
        self.mftp.quit()
        return False
    print_mes = "cd " +path
    print(print_mes)
    info_public(print_mes)
    return True
def open(self, flags):
    """Start a transfer for ``self.url.path`` and keep its socket in ``self._socket``.

    ``'r'`` issues RETR; ``'w'`` and ``'w+'`` issue STOR.  Any other value
    does nothing.

    Raises:
        FileExistsError: the server rejected STOR with a permanent error.
    """
    remote_path = self.url.path
    if flags == 'r':
        self._socket = self._conn.transfercmd('RETR %s' % remote_path)
        return
    if flags not in ('w', 'w+'):
        return
    try:
        self._socket = self._conn.transfercmd('STOR %s' % remote_path)
    except ftplib.error_perm:
        raise FileExistsError(str(self.url))
def size(self):
    """Return the size reported by the server for ``self.url.path``.

    Returns:
        The value of ``self._conn.size(...)``, or ``None`` when the server
        says the path is not a regular file or the call raises ValueError.

    Raises:
        FileNotFoundError: for any other permanent FTP error.
    """
    try:
        result = self._conn.size(self.url.path)
    except ValueError:
        return None
    except ftplib.error_perm as err:
        if 'not a regular file' not in str(err):
            raise FileNotFoundError(self.url)
        return None
    return result
def isdir(self):
    """Return True when SIZE fails with a permanent error mentioning "regular".

    A successful SIZE, or a permanent error without that word, gives False.
    """
    try:
        self._conn.size(self.url.path)
    except ftplib.error_perm as err:
        return "regular" in str(err)
    return False
def delete(self):
    """Delete the remote file or directory at ``self.url.path``.

    Raises:
        OSError: RMD on a directory failed with a permanent error.
        FileNotFoundError: the path is neither a file nor a directory.
    """
    if self.isfile():
        self._conn.delete(self.url.path)
        return
    if not self.isdir():
        raise FileNotFoundError(str(self.url))
    try:
        self._conn.rmd(self.url.path)
    except ftplib.error_perm as err:
        raise OSError(err)
def mkdir(self, name=None):
    """Create a directory and return the object that refers to it.

    Without ``name`` the directory at ``self.url.path`` is created (unless
    ``self.isdir()`` is already true) and ``self`` is returned.  With
    ``name``, MKD is issued for ``name`` unless ``self.join(name)`` is
    already a directory, and ``self.join(name)`` is returned.

    Raises:
        TransfertPermissionError: the server answered with a permanent error.
    """
    try:
        if name is not None:
            if not self.join(name).isdir():
                self._conn.mkd(name)
            return self.join(name)
        if not self.isdir():
            self._conn.mkd(self.url.path)
        return self
    except ftplib.error_perm as err:
        raise TransfertPermissionError(err)
def retrfile(self, file, type):
    """Open a data connection that retrieves ``file`` or a directory listing.

    A RETR is attempted first unless ``type`` is ``'d'``/``'D'`` or ``file``
    is empty.  If RETR is refused with a 550 reply (or was not attempted),
    the transfer type is switched to ASCII and a LIST is issued instead,
    after checking that ``file`` can be entered with CWD.

    Args:
        file: remote file or directory name; may be empty.
        type: FTP transfer type letter; ``'d'``/``'D'`` requests a listing.

    Returns:
        A ``(file_object, length)`` tuple: the data connection wrapped by
        ``addclosehook`` so that ``self.endtransfer`` runs on close, and the
        second item returned by ``ntransfercmd``.

    Raises:
        IOError: ``('ftp error', reason)`` when RETR fails with a reply
            other than 550 or when CWD into ``file`` fails.
    """
    import ftplib
    self.endtransfer()
    if type in ('d', 'D'):
        cmd = 'TYPE A'
        isdir = 1
    else:
        cmd = 'TYPE ' + type
        isdir = 0
    try:
        self.ftp.voidcmd(cmd)
    except ftplib.all_errors:
        # Re-initialise through self.init() and retry once.
        self.init()
        self.ftp.voidcmd(cmd)
    conn = None
    if file and not isdir:
        # Try to retrieve as a file
        try:
            cmd = 'RETR ' + file
            conn = self.ftp.ntransfercmd(cmd)
        except ftplib.error_perm as reason:
            if str(reason)[:3] != '550':
                # Plain raise keeps this valid on Python 2 and 3; Python 3
                # chains the original exception automatically.
                raise IOError('ftp error', reason)
    if not conn:
        # Set transfer mode to ASCII!
        self.ftp.voidcmd('TYPE A')
        # Try a directory listing. Verify that directory exists.
        if file:
            pwd = self.ftp.pwd()
            try:
                try:
                    self.ftp.cwd(file)
                except ftplib.error_perm as reason:
                    # Same exception shape as the RETR failure above (the
                    # traceback object used to be passed as the value).
                    raise IOError('ftp error', reason)
            finally:
                # Always restore the previous working directory.
                self.ftp.cwd(pwd)
            cmd = 'LIST ' + file
        else:
            cmd = 'LIST'
        conn = self.ftp.ntransfercmd(cmd)
    self.busy = 1
    # Pass back both a suitably decorated object and a retrieval length
    return (addclosehook(conn[0].makefile('rb'),
                         self.endtransfer), conn[1])
def get_ftpfile_lastmodified(self, file_path):
    '''Return the last-modified time of a file on the FTP server.

    The MDTM reply is split into a code and a ``%Y%m%d%H%M%S`` timestamp,
    e.g. '20121128150000', which is converted with ``time.mktime``.
    Returns None (after a debug log entry) when the server answers with a
    permanent error.
    '''
    try:
        reply = self.client.sendcmd('MDTM ' + file_path)
    except error_perm as e:
        self.logger.debug("Skip %s: %s" % (file_path,e))
        return None
    _code, stamp = reply.split()
    parsed = datetime.strptime(stamp, '%Y%m%d%H%M%S')
    return int(time.mktime(parsed.timetuple()))
def main():
    """Download ``FILE`` from directory ``DIRN`` on ``HOST`` via anonymous FTP.

    Progress and errors are printed.  Failing to connect, log in or change
    directory exits with status 1.  If the file cannot be read, the
    partially written local file is removed.
    """
    try:
        f = ftplib.FTP(HOST)
    except (socket.error, socket.gaierror):
        print('ERROR: connot reach "%s"' % HOST)
        exit(1)
    print('*** Connected to host "%s"' % HOST)
    try:
        f.login()
    except ftplib.error_perm:
        print('ERROR: connot login anonymously')
        f.quit()
        exit(1)
    print('*** Logged in as "anonymous"')
    try:
        f.cwd(DIRN)
    except ftplib.error_perm:
        print('ERROR: cannot cd to "%s"' % DIRN)
        f.quit()
        exit(1)
    print('*** Changed to "%s" folder' % DIRN)
    try:
        # The with block closes the local file before it is unlinked, and
        # a failing open() can no longer leave an unbound name for cleanup.
        with open(FILE, 'wb') as loc_file:
            f.retrbinary('RETR %s' % FILE, loc_file.write)
    except ftplib.error_perm:
        print('ERROR: cannot read file "%s"' % FILE)
        os.unlink(FILE)
    else:
        print('*** Downloaded "%s" to CWD' % FILE)
    f.quit()
    return
def get_directory_listing(ftp_con):
    """Summarise the current remote directory and close the connection.

    Args:
        ftp_con: connected FTP client; it is always quit and closed before
            returning.

    Returns:
        ``"<n> files listed - sample of files: [...]"`` with up to three
        randomly chosen names ("." and ".." excluded), ``"No files"`` when
        the server answers with a permanent error containing "550", or
        ``None`` for any other permanent error.
    """
    try:
        files = ftp_con.nlst()
        if "." in files:
            files.remove(".")
        if ".." in files:
            files.remove("..")
        # De-duplicate while keeping a sequence: random.sample rejects sets
        # on current Python versions and raises ValueError when asked for
        # more items than are available.
        unique_files = sorted(set(files))
        rnd_files = random.sample(unique_files, min(3, len(unique_files)))
        return "{} files listed - sample of files: {}".format(len(files), rnd_files)
    except ftplib.error_perm as resp:
        # Test the reply text; the exception object itself is not a container.
        if "550" in str(resp):
            return "No files"
        return None
    finally:
        # Close the connection that was passed in, even if quit() fails.
        try:
            ftp_con.quit()
        finally:
            ftp_con.close()
def test_exceptions(self):
    """Each reply-code class must raise the matching ftplib exception.

    NOTE(review): assumes the test server's ``echo`` command replies with
    its argument as the response code.  Confirm against the test fixture.
    """
    # assertRaises needs the callable (sendcmd) followed by its argument;
    # passing only the command string would try to call a str.
    send = self.client.sendcmd
    self.assertRaises(ftplib.error_temp, send, 'echo 400')
    self.assertRaises(ftplib.error_temp, send, 'echo 499')
    self.assertRaises(ftplib.error_perm, send, 'echo 500')
    self.assertRaises(ftplib.error_perm, send, 'echo 599')
    self.assertRaises(ftplib.error_proto, send, 'echo 999')
def test_all_errors(self):
exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
ftplib.error_proto, ftplib.Error, IOError, EOFError)
for x in exceptions:
try:
raise x('exception not included in all_errors set')
except ftplib.all_errors:
pass
def mkdir_parent(self, dirname):
    """Create ``dirname`` and every missing parent, one level at a time.

    Trailing separators are trimmed with ``strings.rtrim``, the path is
    split on ``os.path.sep`` and ``self.mkdir`` is called for each prefix.
    Permanent FTP errors are ignored; other errors are logged with
    ``logger.error_traceback()`` and the loop continues.
    """
    from ftplib import error_perm
    from os.path import sep
    dirname = strings.rtrim(dirname, sep)
    dir_arr = dirname.split(sep)
    # For an absolute path the first component is empty, so start one
    # level deeper.
    for i in range(2 if dirname.startswith("/") else 1, len(dir_arr) + 1):
        try:
            self.mkdir(sep.join(dir_arr[:i]))
        except error_perm:
            pass
        except Exception:
            # ``except Exception`` lets KeyboardInterrupt and SystemExit
            # propagate instead of being logged and swallowed.
            logger.error_traceback()
def poke(self, context):
    """Return True once ``self.path`` exists on the FTP server.

    The check asks the hook for the path's modification time.  The reply
    text "Can't check for file existence" means "not there yet" and yields
    False; every other permanent error is re-raised.

    Args:
        context: not used by this method.
    """
    with self._create_hook() as hook:
        self.log.info('Poking for %s', self.path)
        try:
            hook.get_mod_time(self.path)
        except ftplib.error_perm as e:
            error = str(e).split(None, 1)
            # A reply consisting of a code only has no text part; re-raise
            # it rather than failing with IndexError on error[1].
            if len(error) < 2 or error[1] != "Can't check for file existence":
                raise
            return False
        return True
def test_poke(self):
    """poke() is False while the file is missing and True once it exists."""
    sensor = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp",
                       task_id="test_task")
    # First call: "file missing" reply; second call: success.
    missing = error_perm("550: Can't check for file existence")
    self.hook_mock.get_mod_time.side_effect = [missing, None]
    self.assertFalse(sensor.poke(None))
    self.assertTrue(sensor.poke(None))
def test_poke_fails_due_error(self):
    """An unrelated permanent FTP error must propagate out of execute()."""
    sensor = FTPSensor(path="foobar.json",
                       task_id="test_task")
    login_failure = error_perm("530: Login authentication failed")
    self.hook_mock.get_mod_time.side_effect = login_failure
    with self.assertRaises(error_perm) as context:
        sensor.execute(None)
    # Checked after the block so that it actually runs.
    self.assertTrue("530" in str(context.exception))
def I1ii11iIi11i(host, port, user, pwd):
    """Attempt an FTP login, retrying up to 10 times on non-permission errors.

    NOTE(review): the source was obfuscated and its indentation lost; the
    control flow below is a reconstruction with the dead ``if 0:`` branches
    removed.  Confirm against the original before relying on it.

    Returns:
        True if a login succeeded; False if an attempt ended with an
        exception whose type is exactly ``ftplib.error_perm`` (or with an
        exception not derived from ``Exception``, which the ``finally``
        return suppresses); None if all 10 attempts failed with other
        exceptions.
    """
    logged_in = False
    for attempt in range(10):
        retry = False
        try:
            debug(decode('\xedP\xb6\x15\x8f\x82\xd5\xed\x92\x95 \xfaA\xe9\xd6\xa4k\x92\xe8\xb9\xea\xd0X3\xa4)'), attempt, pwd, host, port)
            connection = ftplib.FTP(timeout=10)
            connection.connect(host, port)
            connection.login(user, pwd)
            connection.quit()
            connection.close()
            logged_in = True
        except Exception as exc:
            # Only an exact error_perm is final; anything else is retried.
            if type(exc) is not ftplib.error_perm:
                retry = True
        finally:
            if not retry:
                return logged_in
    return
def test_exceptions(self):
    """A reply code outside 1xx-5xx must raise ``ftplib.error_proto``.

    NOTE(review): assumes the test server's ``echo`` command replies with
    its argument as the response code.  Confirm against the test fixture.
    """
    # assertRaises needs the callable followed by its argument.  ftplib
    # raises error_temp only for 4xx replies; a 999 reply is a protocol
    # error.
    self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
def test_all_errors(self):
exceptions = (ftplib.error_reply, EOFError)
for x in exceptions:
try:
raise x('exception not included in all_errors set')
except ftplib.all_errors:
pass
def test_exceptions(self):
    """A reply code outside 1xx-5xx must raise ``ftplib.error_proto``.

    NOTE(review): assumes the test server's ``echo`` command replies with
    its argument as the response code.  Confirm against the test fixture.
    """
    # The command must be sent through sendcmd; a bare string is not
    # callable.  A 999 reply is reported by ftplib as a protocol error,
    # not as a temporary (4xx) error.
    self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
def test_all_errors(self):
exceptions = (ftplib.error_reply, EOFError)
for x in exceptions:
try:
raise x('exception not included in all_errors set')
except ftplib.all_errors:
pass
def ftp_command(ftp):
    """Run an interactive command loop on an FTP connection.

    Supported commands: ``cd <dir>``, ``get <file>``, ``ls`` and ``exit``.
    The loop ends, after ``ftp.quit()``, when ``exit`` is entered.

    Args:
        ftp: connected and logged-in FTP client.
    """
    invalid = 'Invalid command,try again (valid options: cd/get/exit).'

    def report_550(error, hint):
        # Print the server text plus a hint, but only for 550 replies
        # (not found / no permission).
        parts = str(error).split(None, 1)
        if parts and parts[0] == '550':
            print(parts[1] if len(parts) > 1 else '', hint)

    while True:  # Run until 'exit' command is received from user
        command = input('Enter a command: ')  # get next command from user
        commands = command.split()  # split command and file/directory into list
        if not commands:
            # Empty input used to crash on commands[0].
            print(invalid)
            continue
        action = commands[0]
        if action in ('cd', 'get') and len(commands) < 2:
            # Missing argument used to crash on commands[1].
            print(invalid)
        elif action == 'cd':  # Change directory
            try:
                ftp.cwd(commands[1])
                print('Directory listing for', ftp.pwd())
                ftp.dir()
                print('Current Directory', ftp.pwd())
            except ftplib.error_perm as e:
                report_550(e, 'Directory may not exist or you may not have permission to view it.')
        elif action == 'get':  # Download file
            try:
                # Close the local file when the transfer ends or fails.
                with open(commands[1], 'wb') as local_file:
                    ftp.retrbinary('RETR ' + commands[1], local_file.write)
                print('File successfully downloaded.')
            except ftplib.error_perm as e:
                report_550(e, 'File may not exist or you may not have permission to view it.')
        elif action == 'ls':  # Print directory listing
            print('Directory listing for', ftp.pwd())
            ftp.dir()
        elif action == 'exit':  # Exit application
            ftp.quit()
            print('Goodbye.')
            break
        else:
            print(invalid)
def test_exceptions(self):
    """A reply code outside 1xx-5xx must raise ``ftplib.error_proto``.

    NOTE(review): assumes the test server's ``echo`` command replies with
    its argument as the response code.  Confirm against the test fixture.
    """
    # Pass the callable and its argument separately to assertRaises.  A
    # reply starting with 9 is a protocol error for ftplib; error_temp is
    # reserved for 4xx replies.
    self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
def test_all_errors(self):
exceptions = (ftplib.error_reply, EOFError)
for x in exceptions:
try:
raise x('exception not included in all_errors set')
except ftplib.all_errors:
pass
from contextlib import contextmanager


@contextmanager
def ftp_errors(fs, path=None):
    """Context manager translating FTP failures into ``errors.*`` exceptions.

    The body of the ``with`` statement runs while ``fs._lock`` is held.
    The generator has to be wrapped by ``contextmanager`` to be usable in
    a ``with`` statement, which is how the other snippets call it.

    Args:
        fs: filesystem object providing ``_lock`` and ``host``.
        path: optional resource path included in the raised exception.

    Raises:
        errors.RemoteConnectionError: on ``socket.error``.
        errors.ResourceError: on ``error_temp`` when ``path`` is given.
        errors.OperationFailed: on ``error_temp`` without ``path``.
        errors.InsufficientStorage: on ``error_perm`` with code 552.
        errors.ResourceNotFound: on ``error_perm`` with code 501 or 550.
        errors.PermissionDenied: on any other ``error_perm``.
    """
    try:
        with fs._lock:
            yield
    except socket.error:
        raise errors.RemoteConnectionError(
            msg='unable to connect to {}'.format(fs.host)
        )
    except error_temp as error:
        if path is not None:
            raise errors.ResourceError(
                path,
                msg="ftp error on resource '{}' ({})".format(path, error)
            )
        else:
            raise errors.OperationFailed(
                msg='ftp error ({})'.format(error)
            )
    except error_perm as error:
        code, message = _parse_ftp_error(error)
        if code == 552:
            raise errors.InsufficientStorage(
                path=path,
                msg=message
            )
        elif code in (501, 550):
            raise errors.ResourceNotFound(path=path)
        raise errors.PermissionDenied(
            msg=message
        )
def _open_ftp(self):
    """Open a new ftp object.

    Connects and logs in, then sends FEAT to choose the encoding: 'utf-8'
    when the parsed features contain 'UTF8', otherwise 'latin-1' (also
    used when FEAT is refused with a permanent error).  Stores the result
    in ``self.encoding``, ``self._features`` and ``self._welcome``.

    Returns:
        The connected ``FTP`` instance.
    """
    client = FTP()
    client.set_debuglevel(0)
    with ftp_errors(self):
        client.connect(self.host, self.port, self.timeout)
        client.login(self.user, self.passwd, self.acct)
        self._features = {}
        try:
            feat_response = _decode(client.sendcmd("FEAT"), 'latin-1')
        except error_perm:
            self.encoding = 'latin-1'
        else:
            self._features = self._parse_features(feat_response)
            if 'UTF8' in self._features:
                self.encoding = 'utf-8'
            else:
                self.encoding = 'latin-1'
            if not PY2:
                # Re-open the control-channel reader with the negotiated
                # encoding.
                client.file = client.sock.makefile(
                    'r',
                    encoding=self.encoding
                )
    client.encoding = self.encoding
    self._welcome = client.welcome
    return client
def upload_listing_map(self, d):
    """Upload ``d`` as a ``.listing`` file of ``key:value`` lines.

    NOTE(review): ``path`` is not a parameter of this function; it is
    presumably supplied by an outer scope or was lost from the signature.
    Confirm against the original project.

    Args:
        d: mapping whose items are written one per line as ``key:value``.

    Returns:
        1 on success, 0 if the server refused the upload with a permanent
        error.
    """
    filename = self.abspath(path, '.listing')
    # items() exists on both Python 2 and 3 dicts; iteritems() does not.
    fp = StringIO('\n'.join('%s:%s' % item for item in d.items()))
    try:
        self.ftp.storbinary('STOR ' + filename.replace('\\', '/'), fp, 8*1024)
    except error_perm:
        return 0
    return 1
def download(self, filename, target, verbose=True):
    """Download a remote file to the local path ``target``.

    The target directory is created when missing.  On a permanent FTP
    error the partially written local file is removed.

    Args:
        filename: remote name, resolved with ``self.abspath``.
        target: local destination path.
        verbose: report progress on stdout; when false, failures are
            reported on stderr instead.

    Returns:
        1 on success, 0 on failure.
    """
    if verbose:
        sys.stdout.write('downloading %r..' % (filename))
        sys.stdout.flush()
    fullname = self.abspath(filename)
    targetdir = os.path.dirname(target)
    # A bare file name has an empty dirname; makedirs('') would fail.
    if targetdir and not os.path.exists(targetdir):
        os.makedirs(targetdir)
    f = open(target, 'wb')
    try:
        try:
            # Remote paths always use forward slashes.
            self.ftp.retrbinary('RETR ' + fullname.replace('\\', '/'), f.write, 8*1024)
        finally:
            # Closed on every path, including unexpected exceptions, and
            # before the file is removed below.
            f.close()
    except error_perm as msg:
        if verbose:
            sys.stdout.write('FAILED: %s\n' % (msg))
            sys.stdout.flush()
        else:
            sys.stderr.write('FAILED to download %r: %s\n' % (filename, msg))
            sys.stderr.flush()
        os.remove(target)
        return 0
    self.fix_local_mtime(filename, target)
    if verbose:
        sys.stdout.write(' ok [%s bytes]\n' % (os.path.getsize(target)))
        sys.stdout.flush()
    return 1
def upload(self, mk_backup=True, verbose=True):
    """Upload a local file, optionally keeping a remote ``.backup`` copy.

    With ``mk_backup`` the existing remote file is first renamed to
    ``<name>.backup``; the backup is restored if the upload fails and
    deleted if it succeeds.

    NOTE(review): ``filename`` and ``source`` are not parameters of this
    function; they are presumably supplied by an outer scope or were lost
    from the signature.  The STOR block size (8*1024) was reconstructed
    from the sibling transfer calls.  Confirm both against the original.

    Args:
        mk_backup: keep and restore a remote backup around the upload.
        verbose: report progress on stdout; when false, failures are
            reported on stderr instead.

    Returns:
        1 on success, 0 if the server refused the upload.
    """
    fullname = self.abspath(filename)
    # Remote paths always use forward slashes.
    remote_name = fullname.replace('\\', '/')
    backup_name = remote_name + '.backup'
    if verbose:
        sys.stdout.write('uploading %r [%s]..' % (filename, os.path.getsize(source)))
        sys.stdout.flush()
    self.makedirs(os.path.dirname(fullname), rm_local_listing=True, verbose=verbose)
    if mk_backup:
        if verbose:
            sys.stdout.write('<creating %r>' % (filename+'.backup'))
            sys.stdout.flush()
        self.ftp.rename(remote_name, backup_name)
    f = open(source, 'rb')
    if verbose:
        sys.stdout.write('<storing>')
    try:
        try:
            self.ftp.storbinary('STOR ' + remote_name, f, 8*1024)
        finally:
            # Closed on every path, including unexpected exceptions.
            f.close()
    except error_perm as msg:
        if verbose:
            sys.stdout.write('FAILED: %s\n' % (msg))
            sys.stdout.flush()
        else:
            sys.stderr.write('FAILED to upload %r: %s\n' % (filename, msg))
            sys.stderr.flush()
        if mk_backup:
            if verbose:
                sys.stdout.write('<restoring from %r>' % (filename+'.backup'))
                sys.stdout.flush()
            self.ftp.rename(backup_name, remote_name)
        return 0
    if mk_backup:
        if verbose:
            sys.stdout.write('<cleaning up %r>' % (filename+'.backup'))
            sys.stdout.flush()
        self.ftp.delete(backup_name)
    self.fix_local_mtime(filename, source)
    if verbose:
        sys.stdout.write(' ok\n')
        sys.stdout.flush()
    return 1
def remove(self, verbose=True):
    """Delete a remote file and the ``.listing`` file next to it, if any.

    NOTE(review): ``filename`` is not a parameter of this function; it is
    presumably supplied by an outer scope or was lost from the signature.
    Confirm against the original project.

    Args:
        verbose: report progress on stdout; when false, a failed delete is
            reported on stderr instead.

    Returns:
        None.  The function returns early if the server refuses the delete.
    """
    fullname = self.abspath(filename)
    if verbose:
        sys.stdout.write('<removing %r..' % (filename))
        sys.stdout.flush()
    try:
        # Remote paths always use forward slashes.
        self.ftp.delete(fullname.replace('\\', '/'))
    except error_perm as msg:
        if verbose:
            sys.stdout.write('FAILED: %s>' % (msg))
            sys.stdout.flush()
        else:
            sys.stderr.write('FAILED to remove %r: %s\n' % (filename, msg))
            sys.stderr.flush()
        return
    dname = os.path.dirname(fullname)
    lst = self.ftp.nlst(dname.replace('\\', '/'))
    if '.listing' in lst:
        listing = os.path.join(dname, '.listing')
        if verbose:
            sys.stdout.write('<removing %r>' % (listing))
            sys.stdout.flush()
        self.ftp.delete(listing.replace('\\', '/'))
    if verbose:
        sys.stdout.write('ok>')
        sys.stdout.flush()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。