Python ftplib 模块,FTP_PORT 实例源码
我们从 Python 开源项目中提取了以下 42 个代码示例,用于说明如何使用 ftplib.FTP_PORT。
def ftp_open(self, req):
import ftplib
import mimetypes
host = req.get_host()
if not host:
raise URLError('ftp error: no host given')
host, port = splitport(host)
if port is None:
port = ftplib.FTP_PORT
else:
port = int(port)
# username/password handling
user, host = splituser(host)
if user:
user, passwd = splitpasswd(user)
else:
passwd = None
host = unquote(host)
user = user or ''
passwd = passwd or ''
try:
host = socket.gethostbyname(host)
except socket.error, msg:
raise URLError(msg)
path, attrs = splitattr(req.get_selector())
dirs = path.split('/')
dirs = map(unquote, dirs)
dirs, file = dirs[:-1], dirs[-1]
if dirs and not dirs[0]:
dirs = dirs[1:]
try:
fw = self.connect_ftp(user, passwd, host, port, dirs, req.timeout)
type = file and 'I' or 'D'
for attr in attrs:
attr, value = splitvalue(attr)
if attr.lower() == 'type' and \
value in ('a', 'A', 'i', 'I', 'd', 'D'):
type = value.upper()
fp, retrlen = fw.retrfile(file, type)
headers = ""
mtype = mimetypes.guess_type(req.get_full_url())[0]
if mtype:
headers += "Content-type: %s\n" % mtype
if retrlen is not None and retrlen >= 0:
headers += "Content-length: %d\n" % retrlen
sf = StringIO(headers)
headers = mimetools.Message(sf)
return addinfourl(fp, headers, req.get_full_url())
except ftplib.all_errors, msg:
raise URLError, ('ftp error: %s' % msg), sys.exc_info()[2]
def open_ftp(self, url):
"""Use FTP protocol."""
if not isinstance(url, str):
raise IOError, ('ftp error', 'proxy support for ftp protocol currently not implemented')
import mimetypes, mimetools
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
host, path = splithost(url)
if not host: raise IOError, 'no host given')
host, port = splitport(host)
user, host = splituser(host)
if user: user, passwd = splitpasswd(user)
else: passwd = None
host = unquote(host)
user = user or ''
passwd = passwd or ''
host = socket.gethostbyname(host)
if not port:
import ftplib
port = ftplib.FTP_PORT
else:
port = int(port)
path, attrs = splitattr(path)
path = unquote(path)
dirs = path.split('/')
dirs, dirs[-1]
if dirs and not dirs[0]: dirs = dirs[1:]
if dirs and not dirs[0]: dirs[0] = '/'
key = user, '/'.join(dirs)
# XXX thread unsafe!
if len(self.ftpcache) > MAXFTPCACHE:
# Prune the cache,rather arbitrarily
for k in self.ftpcache.keys():
if k != key:
v = self.ftpcache[k]
del self.ftpcache[k]
v.close()
try:
if not key in self.ftpcache:
self.ftpcache[key] = \
ftpwrapper(user, dirs)
if not file: type = 'D'
else: type = 'I'
for attr in attrs:
attr, 'D'):
type = value.upper()
(fp, retrlen) = self.ftpcache[key].retrfile(file, type)
mtype = mimetypes.guess_type("ftp:" + url)[0]
headers = ""
if mtype:
headers += "Content-Type: %s\n" % mtype
if retrlen is not None and retrlen >= 0:
headers += "Content-Length: %d\n" % retrlen
headers = mimetools.Message(StringIO(headers))
return addinfourl(fp, "ftp:" + url)
except ftperrors(), msg:
raise IOError, msg), sys.exc_info()[2]
def ftp_open(self, req):
    """Fetch the FTP resource described by *req*.

    The host part of ``req.host`` is split into host, port, user and
    password, the host name is resolved with ``socket.gethostbyname``, and
    the file named by the last component of ``req.selector`` is retrieved
    through the wrapper returned by ``self.connect_ftp``.

    The transfer type defaults to ``'I'`` when the last path component is
    non-empty and to ``'D'`` otherwise; a ``type=`` URL attribute whose
    value is one of a/i/d (either case) overrides it.

    Returns:
        ``addinfourl(fp, headers, req.full_url)`` where *headers* is the
        message parsed by ``email.message_from_string`` from the
        ``Content-type`` / ``Content-length`` lines that could be
        determined.

    Raises:
        URLError: if the request has no host, if the host name cannot be
            resolved, or if any ``ftplib.all_errors`` exception occurs
            (re-raised through ``raise_with_traceback``).
    """
    import ftplib
    import mimetypes

    host = req.host
    if not host:
        raise URLError('ftp error: no host given')

    host, port = splitport(host)
    port = ftplib.FTP_PORT if port is None else int(port)

    # username/password handling
    user, host = splituser(host)
    if user:
        user, passwd = splitpasswd(user)
    else:
        passwd = None
    host = unquote(host)
    user = user or ''
    passwd = passwd or ''

    try:
        host = socket.gethostbyname(host)
    except socket.error as msg:
        raise URLError(msg)

    path, attrs = splitattr(req.selector)
    dirs = path.split('/')
    dirs = list(map(unquote, dirs))
    dirs, filename = dirs[:-1], dirs[-1]
    # Drop the empty component produced by a leading '/'.
    if dirs and not dirs[0]:
        dirs = dirs[1:]

    try:
        fw = self.connect_ftp(user, passwd, host, port, dirs, req.timeout)
        transfer_type = 'I' if filename else 'D'
        for attr in attrs:
            attr, value = splitvalue(attr)
            if attr.lower() == 'type' and \
               value in ('a', 'A', 'i', 'I', 'd', 'D'):
                transfer_type = value.upper()
        fp, retrlen = fw.retrfile(filename, transfer_type)

        headers = ""
        mtype = mimetypes.guess_type(req.full_url)[0]
        if mtype:
            headers += "Content-type: %s\n" % mtype
        if retrlen is not None and retrlen >= 0:
            headers += "Content-length: %d\n" % retrlen
        headers = email.message_from_string(headers)
        return addinfourl(fp, headers, req.full_url)
    except ftplib.all_errors as exp:
        exc = URLError('ftp error: %r' % exp)
        raise_with_traceback(exc)
def open_ftp(self, url):
    """Use FTP protocol.

    *url* is the part of an ``ftp:`` URL after the scheme. It is split into
    host, port, user, password, directory list and file name; the host name
    is resolved with ``socket.gethostbyname``. Connections are kept in
    ``self.ftpcache`` keyed by ``(user, host, port, joined-dirs)`` and the
    file is fetched through the cached wrapper's ``retrfile``.

    The transfer type is ``'D'`` when the file name is empty and ``'I'``
    otherwise; a ``type=`` URL attribute whose value is one of a/i/d
    (either case) overrides it.

    Returns:
        ``addinfourl(fp, headers, "ftp:" + url)`` where *headers* is the
        message parsed by ``email.message_from_string`` from the
        ``Content-Type`` / ``Content-Length`` lines that could be
        determined.

    Raises:
        URLError: if *url* is not a ``str`` (proxy case), if no host is
            given, or if an exception listed by ``ftperrors()`` occurs
            (re-raised through ``raise_with_traceback``).
    """
    if not isinstance(url, str):
        raise URLError('ftp error: proxy support for ftp protocol currently not implemented')
    import mimetypes

    host, path = splithost(url)
    if not host:
        raise URLError('ftp error: no host given')
    host, port = splitport(host)
    user, host = splituser(host)
    if user:
        user, passwd = splitpasswd(user)
    else:
        passwd = None
    host = unquote(host)
    user = unquote(user or '')
    passwd = unquote(passwd or '')
    host = socket.gethostbyname(host)
    if not port:
        import ftplib
        port = ftplib.FTP_PORT
    else:
        port = int(port)

    path, attrs = splitattr(path)
    path = unquote(path)
    dirs = path.split('/')
    dirs, filename = dirs[:-1], dirs[-1]
    # First strip the empty component from a leading '/'; if the next one
    # is empty as well (path started with '//'), make it the root directory.
    if dirs and not dirs[0]:
        dirs = dirs[1:]
    if dirs and not dirs[0]:
        dirs[0] = '/'
    key = user, host, port, '/'.join(dirs)

    # XXX thread unsafe!
    if len(self.ftpcache) > MAXFTPCACHE:
        # Prune the cache, rather arbitrarily: close every entry except the
        # one about to be used. Iterate over a snapshot of the keys because
        # deleting from a dict while iterating its live view raises
        # RuntimeError on Python 3.
        for k in list(self.ftpcache):
            if k != key:
                v = self.ftpcache[k]
                del self.ftpcache[k]
                v.close()

    try:
        if key not in self.ftpcache:
            self.ftpcache[key] = \
                ftpwrapper(user, passwd, host, port, dirs)
        transfer_type = 'I' if filename else 'D'
        for attr in attrs:
            attr, value = splitvalue(attr)
            if attr.lower() == 'type' and \
               value in ('a', 'A', 'i', 'I', 'd', 'D'):
                transfer_type = value.upper()
        (fp, retrlen) = self.ftpcache[key].retrfile(filename, transfer_type)

        mtype = mimetypes.guess_type("ftp:" + url)[0]
        headers = ""
        if mtype:
            headers += "Content-Type: %s\n" % mtype
        if retrlen is not None and retrlen >= 0:
            headers += "Content-Length: %d\n" % retrlen
        headers = email.message_from_string(headers)
        return addinfourl(fp, headers, "ftp:" + url)
    except ftperrors() as exp:
        raise_with_traceback(URLError('ftp error %r' % exp))
def ftp_open(self, req.full_url)
except ftplib.all_errors as msg:
exc = URLError('ftp error: %s' % msg)
raise exc.with_traceback(sys.exc_info()[2])
def open_ftp(self, str):
raise URLError('ftp error', 'proxy support for ftp protocol currently not implemented')
import mimetypes
from io import StringIO
host, path = splithost(url)
if not host: raise URLError('ftp error', "ftp:" + url)
except ftperrors() as msg:
raise URLError('ftp error', msg).with_traceback(sys.exc_info()[2])
def ftp_open(self, req.full_url)
except ftplib.all_errors as exp:
exc = URLError('ftp error: %r' % exp)
raise_with_traceback(exc)
def open_ftp(self, "ftp:" + url)
except ftperrors() as exp:
raise_with_traceback(URLError('ftp error %r' % exp))
def ftp_open(self, req.full_url)
except ftplib.all_errors as exp:
exc = URLError('ftp error: %r' % exp)
raise_with_traceback(exc)
def open_ftp(self, "ftp:" + url)
except ftperrors() as exp:
raise_with_traceback(URLError('ftp error %r' % exp))
def ftp_open(self, passwd = splitpasswd(user)
else:
passwd = None
host = unquote(host)
user = unquote(user or '')
passwd = unquote(passwd or '')
try:
host = socket.gethostbyname(host)
except socket.error, sys.exc_info()[2]
def ftp_open(self, req.full_url)
except ftplib.all_errors as exp:
exc = URLError('ftp error: %r' % exp)
raise_with_traceback(exc)
def open_ftp(self, "ftp:" + url)
except ftperrors() as exp:
raise_with_traceback(URLError('ftp error %r' % exp))
def ftp_open(self, sys.exc_info()[2]
def ftp_open(self, req.full_url)
except ftplib.all_errors as exp:
exc = URLError('ftp error: %r' % exp)
raise_with_traceback(exc)
def open_ftp(self, "ftp:" + url)
except ftperrors() as exp:
raise_with_traceback(URLError('ftp error %r' % exp))
def ftp_open(self, req.full_url)
except ftplib.all_errors as exp:
exc = URLError('ftp error: %r' % exp)
raise_with_traceback(exc)
def open_ftp(self, "ftp:" + url)
except ftperrors() as exp:
raise_with_traceback(URLError('ftp error %r' % exp))
def ftp_open(self, passwd = splitpasswd(user)
else:
passwd = None
host = unquote(host)
user = user or ''
passwd = passwd or ''
try:
host = socket.gethostbyname(host)
except OSError as msg:
raise URLError(msg)
path, req.full_url)
except ftplib.all_errors as exp:
exc = URLError('ftp error: %r' % exp)
raise exc.with_traceback(sys.exc_info()[2])
def open_ftp(self,rather arbitrarily
for k in list(self.ftpcache):
if k != key:
v = self.ftpcache[k]
del self.ftpcache[k]
v.close()
try:
if key not in self.ftpcache:
self.ftpcache[key] = \
ftpwrapper(user, "ftp:" + url)
except ftperrors() as exp:
raise URLError('ftp error %r' % exp).with_traceback(sys.exc_info()[2])
def open_ftp(self,rather arbitrarily
for k in self.ftpcache.keys():
if k != key:
v = self.ftpcache[k]
del self.ftpcache[k]
v.close()
try:
if not key in self.ftpcache:
self.ftpcache[key] = ftpwrapper(user, value = splitvalue(attr)
if attr.lower() == 'type' and value in ('a', sys.exc_info()[2]
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。