Python ftplib 模块,error_temp() 实例源码
我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用ftplib.error_temp()。
def close(self):
if not self.closed:
with self._lock:
try:
if self._write_conn is not None:
self._write_conn.close()
self._write_conn = None
if self._read_conn is not None:
self._read_conn.close()
self._read_conn = None
try:
self.ftp.quit()
except error_temp: # pragma: nocover
pass
finally:
super(FTPFile, self).close()
def test_max_connections_per_ip(self):
# Test FTPServer.max_cons_per_ip attribute
with self.server.lock:
self.server.server.max_cons_per_ip = 3
self.client.quit()
c1 = self.client_class()
c2 = self.client_class()
c3 = self.client_class()
c4 = self.client_class()
try:
c1.connect(self.server.host, self.server.port)
c2.connect(self.server.host, self.server.port)
c3.connect(self.server.host, self.server.port)
self.assertRaises(ftplib.error_temp, c4.connect, self.server.host,
self.server.port)
# Make sure client has been disconnected.
# socket.error (Windows) or EOFError (Linux) exception is
# supposed to be raised in such a case.
self.assertRaises((socket.error, EOFError), c4.sendcmd, 'noop')
finally:
for c in (c1, c2, c3, c4):
try:
c.quit()
except (socket.error, EOFError): # already disconnected
c.close()
def test_active_conn_error(self):
# we open a socket() but avoid to invoke accept() to
# reproduce this error condition:
# http://code.google.com/p/pyftpdlib/source/detail?r=905
with contextlib.closing(socket.socket()) as sock:
sock.bind((HOST, 0))
port = sock.getsockname()[1]
self.client.sock.settimeout(.1)
try:
resp = self.client.sendport(HOST, port)
except ftplib.error_temp as err:
self.assertEqual(str(err)[:3], '425')
except (socket.timeout, getattr(ssl, "SSLError", object())):
pass
else:
self.assertNotEqual(str(resp)[:3], '200')
def list(self, dir, skip_mtime=False):
month_to_int = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4,
'May': 5, 'Jun': 6, 'Jul': 7, 'Aug': 8, 'Sep': 9,
'Oct': 10, 'Nov': 11, 'Dec': 12}
try:
buffer = []
self.ftp.dir('-a ', buffer.append)
except ftplib.error_temp:
buffer = []
self.ftp.dir(dir, buffer.append)
dirs = []
files = {}
for line in buffer:
cols = line.split(None, 8)
name = os.path.split(cols[8])[1]
if cols[0] == 'total' or name in ('.', '..'):
continue
if cols[0].startswith('d'):
dirs.append(name)
else:
if skip_mtime:
mtime = 0
else:
month = month_to_int[cols[5]]
day = int(cols[6])
if cols[7].find(':') == -1:
year = int(cols[7])
hour = minute = 0
else:
year = datetime.date.today().year
hour, minute = [int(s) for s in cols[7].split(':')]
mtime = datetime.datetime(year, month, day, hour, minute)
mtime = int(time.mktime(mtime.timetuple()))
size = int(cols[4])
files[name] = {
'size': size,
'mtime': mtime,
}
return (dirs, files)
def test_exceptions(self):
self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
self.assertRaises(ftplib.error_temp, 'echo 499')
self.assertRaises(ftplib.error_perm, 'echo 500')
self.assertRaises(ftplib.error_perm, 'echo 599')
self.assertRaises(ftplib.error_proto, 'echo 999')
def test_all_errors(self):
exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
ftplib.error_proto, ftplib.Error, IOError, EOFError)
for x in exceptions:
try:
raise x('exception not included in all_errors set')
except ftplib.all_errors:
pass
def test_exceptions(self):
self.assertRaises(ftplib.error_temp, 'echo 999')
def test_all_errors(self):
exceptions = (ftplib.error_reply, EOFError)
for x in exceptions:
try:
raise x('exception not included in all_errors set')
except ftplib.all_errors:
pass
def test_exceptions(self):
self.assertRaises(ftplib.error_temp, 'echo 999')
def test_all_errors(self):
exceptions = (ftplib.error_reply, EOFError)
for x in exceptions:
try:
raise x('exception not included in all_errors set')
except ftplib.all_errors:
pass
def test_exceptions(self):
self.assertRaises(ftplib.error_temp, 'echo 999')
def test_all_errors(self):
exceptions = (ftplib.error_reply, EOFError)
for x in exceptions:
try:
raise x('exception not included in all_errors set')
except ftplib.all_errors:
pass
def ftp_errors(fs, path=None):
try:
with fs._lock:
yield
except socket.error:
raise errors.RemoteConnectionError(
msg='unable to connect to {}'.format(fs.host)
)
except error_temp as error:
if path is not None:
raise errors.ResourceError(
path,
msg="ftp error on resource '{}' ({})".format(path, error)
)
else:
raise errors.OperationFailed(
msg='ftp error ({})'.format(error)
)
except error_perm as error:
code, message = _parse_ftp_error(error)
if code == 552:
raise errors.InsufficientStorage(
path=path,
msg=message
)
elif code in (501, 550):
raise errors.ResourceNotFound(path=path)
raise errors.PermissionDenied(
msg=message
)
def test_exceptions(self):
self.assertRaises(ftplib.error_temp, 'echo 999')
def test_all_errors(self):
exceptions = (ftplib.error_reply, EOFError)
for x in exceptions:
try:
raise x('exception not included in all_errors set')
except ftplib.all_errors:
pass
def test_exceptions(self):
self.assertRaises(ftplib.error_temp, 'echo 999')
def test_all_errors(self):
exceptions = (ftplib.error_reply, OSError, EOFError)
for x in exceptions:
try:
raise x('exception not included in all_errors set')
except ftplib.all_errors:
pass
def test_exceptions(self):
self.assertRaises(ftplib.error_temp, 'echo 999')
def test_all_errors(self):
exceptions = (ftplib.error_reply, EOFError)
for x in exceptions:
try:
raise x('exception not included in all_errors set')
except ftplib.all_errors:
pass
def test_stou_rest(self):
# Watch for STOU preceded by REST,which makes no sense.
self.client.sendcmd('type i')
self.client.sendcmd('rest 10')
self.assertRaisesRegex(ftplib.error_temp, "Can't STOU while REST",
self.client.sendcmd, 'stou')
def test_appe_rest(self):
# Watch for APPE preceded by REST, "Can't APPE while REST", 'appe x')
def test_max_connections(self):
# Test FTPServer.max_cons attribute
with self.server.lock:
self.server.server.max_cons = 3
self.client.quit()
c1 = self.client_class()
c2 = self.client_class()
c3 = self.client_class()
try:
c1.connect(self.server.host, c3.connect,
self.server.port)
# with passive data channel established
c2.quit()
c1.login(USER, PASSWD)
c1.makepasv()
self.assertRaises(ftplib.error_temp, c2.connect,
self.server.port)
# with passive data socket waiting for connection
c1.login(USER, PASSWD)
c1.sendcmd('pasv')
self.assertRaises(ftplib.error_temp,
self.server.port)
# with active data channel established
c1.login(USER, PASSWD)
with contextlib.closing(c1.makeport()):
self.assertRaises(
ftplib.error_temp,
self.server.port)
finally:
for c in (c1, c3):
try:
c.quit()
except (socket.error, EOFError): # already disconnected
c.close()
def test_on_incomplete_file_received(self):
_file = []
class TestHandler(FTPHandler):
def on_incomplete_file_received(self, file):
_file.append(file)
self._setUp(TestHandler)
data = b'abcde12345' * 100000
self.dummyfile.write(data)
self.dummyfile.seek(0)
with contextlib.closing(
self.client.transfercmd('stor ' + TESTFN)) as conn:
bytes_sent = 0
while True:
chunk = self.dummyfile.read(BUFSIZE)
conn.sendall(chunk)
bytes_sent += len(chunk)
# stop transfer while it isn't finished yet
if bytes_sent >= INTERRUPTED_TRANSF_SIZE or not chunk:
self.client.putcmd('abor')
break
self.assertRaises(ftplib.error_temp, self.client.getresp) # 426
self.client.quit() # prevent race conditions
call_until(lambda: _file, "ret == [os.path.abspath(TESTFN)]")
def test_exceptions(self):
self.assertRaises(ftplib.error_temp, 'echo 999')
def test_all_errors(self):
exceptions = (ftplib.error_reply, EOFError)
for x in exceptions:
try:
raise x('exception not included in all_errors set')
except ftplib.all_errors:
pass
def main_run(self, args):
"""
.. py:attribute:: main_run()
Run threads by passing a leading directory to `traverse_branch` function.
:param args: a tuple contain root and another tuple contains base and
leadings. The root is the path of parent directory (assigned to a process)
base is a tuple contain the path of sub-directory and file names that are
associated with.
:type args: iterable
:rtype: None
"""
if self.resume:
root, leadings = args
base = ['/']
else:
root, (base, leadings) = args
print ('---' * 5, datetime.now(), '{}'.format(root), '---' * 5)
try:
# base,leadings = self.find_leading(root)
# print("base and leadings for {} --> {},{}".format(root,base,leadings))
if not self.resume:
leadings = [(ospath.join('/', root, i.strip('/')), root) for i in leadings]
if leadings:
pool = ThreadPool()
pool.map(self.traverse_branch, leadings)
pool.close()
pool.join()
else:
self.all_path.put(base[0])
except (ftplib.error_temp, socket.gaierror) as exp:
print(exp)
def _retreive_file_lines(self, filename_format, station, year):
string = BytesIO()
if self.ftp is None:
self.ftp = self._get_ftp_connection()
for station_id in self._get_potential_station_ids(station):
filename = filename_format.format(station=station_id, year=year)
try:
self.ftp.retrbinary('RETR {}'.format(filename), string.write)
except (IOError, ftplib.error_perm) as e1:
logger.warn(
"Failed FTP RETR for station {}: {}."
" Not attempting reconnect."
.format(station_id, e1)
)
except (ftplib.error_temp, EOFError) as e2:
# Bad connection. attempt to reconnect.
logger.warn(
"Failed FTP RETR for station {}: {}."
" Attempting reconnect."
.format(station_id, e2)
)
self.ftp.close()
self.ftp = self._get_ftp_connection()
try:
self.ftp.retrbinary('RETR {}'.format(filename),
string.write)
except (IOError, ftplib.error_perm) as e3:
logger.warn(
"Failed FTP RETR for station {}: {}."
" Trying another station id."
.format(station_id, e3)
)
else:
break
else:
break
logger.info(
'Successfully retrieved ftp://ftp.ncdc.noaa.gov{}'
.format(filename)
)
string.seek(0)
f = gzip.GzipFile(fileobj=string)
lines = f.readlines()
string.close()
return lines
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。