Python ftplib 模块-error_perm() 实例源码

Python ftplib 模块,error_perm() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用ftplib.error_perm()

项目:code    作者:ActiveState    | 项目源码 | 文件源码
def info(remote):
    try:
        sfmstat = remote.readlines(os.path.join(remote.root, '.sfmstat'))
    except ftplib.error_perm, err:
        if not err[0].startswith('550'):
            log(err, abort=True)
        sfmstat = None
    print
    if sfmstat:
        last_updated, mirror_path = sfmstat[0].split(None, 1)
        last_updated = datetime.datetime.fromtimestamp(float(last_updated))
        print 'Mirror of', mirror_path
        print last_updated.strftime('Last updated on %A,%d. %B %Y at %H:%M:%S')
    else:
        print 'No mirror recognized'
    print
    print 'Content of %s%s:' % (remote.host, remote.root)
    remote.ftp.dir(remote.root)
    print
项目:hoplite    作者:ni    | 项目源码 | 文件源码
def _get_files_in_dir(ftp_session, source, dest, status):
    filelist = ftp_session.nlst()
    for path in filelist:
        local_path = os.path.join(dest, path)
        remote_path = source + path
        try:
            # See if it's a directory
            new_remote_path = source + path + "/"
            ftp_session.cwd(source + path + "/")
            # It is! So we should create the directory on dest
            try:
                os.mkdir(local_path)
            except OSError:
                logger.debug("Dir: {0} already exists".format(local_path))
            logger.debug("Get Folder: {0}".format(new_remote_path))
            _get_files_in_dir(ftp_session, new_remote_path, local_path, status)
        except ftplib.error_perm:
            logger.debug("Get file: {0}".format(path))
            with open(local_path, "wb") as file_handle:
                ftp_session.retrbinary(
                    "RETR " + remote_path, file_handle.write)
项目:hoplite    作者:ni    | 项目源码 | 文件源码
def download_files(ftp_session, status):
    # dest must already exist
    try:
        ftp_session.cwd(source)
    except ftplib.error_perm:
        # invalid entry (ensure input form: "/dir/folder/something/")
        msg = "Could not open the ftp directory: '{0}'".format(source)
        logger.error(msg)
        status.update({"error": msg})
        return
    if not os.path.isdir(dest):
        msg = "Local path does not exist: '{0}'".format(dest)
        logger.error(msg)
        status.update({"error": msg})
        return
    _get_files_in_dir(ftp_session, status)
项目:Team-HK-Project    作者:GrahamMThomas    | 项目源码 | 文件源码
def FtpListCommand(self, directory = '/'):
        ls = []
        if directory[-1] == '/':
            try:
                self.ftpDirectory += directory
                self.ftpDirectory.replace('//','/')
                self.ftp.retrlines('LIST %s' % self.ftpDirectory, ls.append)
            except error_perm, msg:
                errorListingMessage.text = ""
                parsedErrorMessage = (str(msg)).split(" ")[1:]
                for item in parsedErrorMessage:
                    errorListingMessage.text = errorListingMessage.text + str(item) + " "   
                listingErrorPopup.open()
        else:
            self.ftp.retrlines('LIST', ls.append)

        return self.ListParser(ls)
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def test_manager(self):
        mem_fs = open_fs('mem://')

        with self.assertRaises(errors.ResourceError):
            with ftp_errors(mem_fs, path='foo'):
                raise error_temp

        with self.assertRaises(errors.OperationFailed):
            with ftp_errors(mem_fs):
                raise error_temp

        with self.assertRaises(errors.InsufficientStorage):
            with ftp_errors(mem_fs):
                raise error_perm('552 foo')

        with self.assertRaises(errors.ResourceNotFound):
            with ftp_errors(mem_fs):
                raise error_perm('501 foo')

        with self.assertRaises(errors.PermissionDenied):
            with ftp_errors(mem_fs):
                raise error_perm('999 foo')
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def makedir(self, path, permissions=None, recreate=False):
        _path = self.validatepath(path)

        with ftp_errors(self, path=path):
            if _path == '/':
                if recreate:
                    return self.opendir(path)
                else:
                    raise errors.DirectoryExists(path)

            if not (recreate and self.isdir(path)):
                try:
                    self.ftp.mkd(_encode(_path, self.ftp.encoding))
                except error_perm as error:
                    code, _ = _parse_ftp_error(error)
                    if code == 550:
                        if self.isdir(path):
                            raise errors.DirectoryExists(path)
                        else:
                            if self.exists(path):
                                raise errors.DirectoryExists(path)
                    raise errors.ResourceNotFound(path)
        return self.opendir(path)
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def removedir(self, path):
        _path = self.validatepath(path)
        if _path == '/':
            raise errors.RemoveRootError()

        with ftp_errors(self, path):
            try:
                self.ftp.rmd(_encode(_path, self.ftp.encoding))
            except error_perm as error:
                code, _ = _parse_ftp_error(error)
                if code == 550:
                    if self.isfile(path):
                        raise errors.DirectoryExpected(path)
                    if not self.isempty(path):
                        raise errors.DirectoryNotEmpty(path)
                raise  # pragma: no cover
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def getbytes(self, path):
        _path = self.validatepath(path)
        data = io.BytesIO()
        with ftp_errors(self, path):
            with self._manage_ftp() as ftp:
                try:
                    ftp.retrbinary(
                        str("RETR ") + _encode(_path, self.ftp.encoding),
                        data.write
                    )
                except error_perm as error:
                    code, _ = _parse_ftp_error(error)
                    if code == 550:
                        if self.isdir(path):
                            raise errors.FileExpected(path)
                    raise

        data_bytes = data.getvalue()
        return data_bytes
项目:openmv-ide    作者:openmv    | 项目源码 | 文件源码
def get_mtime(self, name):
        """ Return modification time of the file in ftp server.

        If the file does not exist,return None.
        If the name refers to a directory,return 0.

        To get the local modification time,use
        ``get_mtime()-clock_offset``.
        """
        filename = self.abspath(name)
        try:
            resp = self.ftp.sendcmd('MDTM ' + filename.replace('\\', '/'))
        except error_perm, msg:
            s = str(msg)
            if s.startswith('550 I can only retrieve regular files'):
                # filename is directory
                return 0
            if s.startswith('550 Can\'t check for file existence'):
                # file with filename does not exist
                return None
            raise
        assert resp[:3]=='213', `resp,filename`
        modtime = int(time.mktime(time.strptime(resp[3:].strip(), '%Y%m%d%H%M%S')))
        return modtime
项目:foil    作者:portfoliome    | 项目源码 | 文件源码
def ftp_walk(ftpconn: FTP, rootpath=''):
    """Recursively traverse an ftp directory to discovery directory listing."""

    current_directory = rootpath

    try:
        directories, files = directory_listing(ftpconn, current_directory)
    except ftplib.error_perm:
        return

    # Yield before recursion
    yield current_directory, directories, files

    # Recurse into sub-directories
    for name in directories:
        new_path = os.path.join(current_directory, name)

        for entry in ftp_walk(ftpconn, rootpath=new_path):
            yield entry
    else:
        return
项目:Blended    作者:BlendedSiteGenerator    | 项目源码 | 文件源码
def placeFiles(ftp, path):
    """Upload the built files to FTP"""
    for name in os.listdir(path):
        if name != "config.py" and name != "config.pyc" and name != "templates" and name != "content":
            localpath = os.path.join(path, name)
            if os.path.isfile(localpath):
                print("STOR", name, localpath)
                ftp.storbinary('STOR ' + name, open(localpath, 'rb'))
            elif os.path.isdir(localpath):
                print("MKD", name)

                try:
                    ftp.mkd(name)

                # ignore "directory already exists"
                except error_perm as e:
                    if not e.args[0].startswith('550'):
                        raise

                print("CWD", name)
                ftp.cwd(name)
                placeFiles(ftp, localpath)
                print("CWD", "..")
                ftp.cwd("..")
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def test_rnfr_rnto(self):
        # rename file
        tempname = os.path.basename(tempfile.mktemp(dir=HOME))
        self.client.rename(self.tempfile, tempname)
        self.client.rename(tempname, self.tempfile)
        # rename dir
        tempname = os.path.basename(tempfile.mktemp(dir=HOME))
        self.client.rename(self.tempdir, self.tempdir)
        # rnfr/rnto over non-existing paths
        bogus = os.path.basename(tempfile.mktemp(dir=HOME))
        self.assertRaises(ftplib.error_perm, self.client.rename, bogus, '/x')
        self.assertRaises(
            ftplib.error_perm, self.tempfile, u('/'))
        # rnto sent without first specifying the source
        self.assertRaises(ftplib.error_perm, self.client.sendcmd,
                          'rnto ' + self.tempfile)

        # make sure we can't rename root directory
        self.assertRaisesRegex(ftplib.error_perm,
                               "Can't rename home directory",
                               self.client.rename, '/', '/x')
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def test_unforeseen_mdtm_event(self):
        # Emulate a case where the file last modification time is prior
        # to year 1900.  This most likely will never happen unless
        # someone specifically force the last modification time of a
        # file in some way.
        # To do so we temporarily override os.path.getmtime so that it
        # returns a negative value referring to a year prior to 1900.
        # It causes time.localtime/gmtime to raise a ValueError exception
        # which is supposed to be handled by server.

        # On python 3 it seems that the trick of replacing the original
        # method with the lambda doesn't work.
        if not PY3:
            _getmtime = AbstractedFS.getmtime
            try:
                AbstractedFS.getmtime = lambda x, y: -9000000000
                self.assertRaisesRegex(
                    ftplib.error_perm,
                    "550 Can't determine file's last modification time",
                    self.client.sendcmd, 'mdtm ' + self.tempfile)
                # make sure client hasn't been disconnected
                self.client.sendcmd('noop')
            finally:
                AbstractedFS.getmtime = _getmtime
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def test_stou_orphaned_file(self):
        # Check that no orphaned file gets left behind when STOU fails.
        # Even if STOU fails the file is first created and then erased.
        # Since we can't know the name of the file the best way that
        # we have to test this case is comparing the content of the
        # directory before and after STOU has been issued.
        # Assuming that TESTFN is supposed to be a "reserved" file
        # name we shouldn't get false positives.
        safe_remove(TESTFN)
        # login as a limited user in order to make STOU fail
        self.client.login('anonymous', '@nopasswd')
        before = os.listdir(HOME)
        self.assertRaises(ftplib.error_perm,
                          'stou ' + TESTFN)
        after = os.listdir(HOME)
        if before != after:
            for file in after:
                self.assertFalse(file.startswith(TESTFN))
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def test_epsv(self):
        # test wrong proto
        try:
            self.client.sendcmd('epsv ' + self.other_proto)
        except ftplib.error_perm as err:
            self.assertEqual(str(err)[0:3], "522")
        else:
            self.fail("Exception not raised")

        # proto > 2
        self.assertRaises(ftplib.error_perm, 'epsv 3')

        # test connection
        for cmd in ('EPSV', 'EPSV ' + self.proto):
            host, port = ftplib.parse229(self.client.sendcmd(cmd),
                                         self.client.sock.getpeername())
            with contextlib.closing(
                    socket.socket(self.client.af, socket.SOCK_STREAM)) as s:
                s.settimeout(TIMEOUT)
                s.connect((host, port))
                self.client.sendcmd('abor')
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def test_mlst(self):
        # utility function for extracting the line of interest
        def mlstline(cmd):
            return self.client.voidcmd(cmd).split('\n')[1]

        if self.utf8fs:
            self.assertTrue('type=dir' in
                            mlstline('mlst ' + TESTFN_UNICODE))
            self.assertTrue('/' + TESTFN_UNICODE in
                            mlstline('mlst ' + TESTFN_UNICODE))
            self.assertTrue('type=file' in
                            mlstline('mlst ' + TESTFN_UNICODE_2))
            self.assertTrue('/' + TESTFN_UNICODE_2 in
                            mlstline('mlst ' + TESTFN_UNICODE_2))
        else:
            self.assertRaises(ftplib.error_perm,
                              mlstline, 'mlst ' + TESTFN_UNICODE)

    # --- file transfer
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def test_stor(self):
        if self.utf8fs:
            data = b'abcde12345' * 500
            os.remove(TESTFN_UNICODE_2)
            dummy = BytesIO()
            dummy.write(data)
            dummy.seek(0)
            self.client.storbinary('stor ' + TESTFN_UNICODE_2, dummy)
            dummy_recv = BytesIO()
            self.client.retrbinary('retr ' + TESTFN_UNICODE_2,
                                   dummy_recv.write)
            dummy_recv.seek(0)
            self.assertEqual(dummy_recv.read(), data)
        else:
            dummy = BytesIO()
            self.assertRaises(ftplib.error_perm, self.client.storbinary,
                              'stor ' + TESTFN_UNICODE_2, dummy)
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def test_prot(self):
        self.client.login(secure=False)
        msg = "503 PROT not allowed on insecure control connection."
        self.assertRaisesWithMsg(ftplib.error_perm, msg,
                                 self.client.sendcmd, 'prot p')
        self.client.login(secure=True)
        # secured
        self.client.prot_p()
        sock = self.client.transfercmd('list')
        with contextlib.closing(sock):
            while True:
                if not sock.recv(1024):
                    self.client.voidresp()
                    break
            self.assertTrue(isinstance(sock, ssl.SSLSocket))
            # unsecured
            self.client.prot_c()
        sock = self.client.transfercmd('list')
        with contextlib.closing(sock):
            while True:
                if not sock.recv(1024):
                    self.client.voidresp()
                    break
            self.assertFalse(isinstance(sock, ssl.SSLSocket))
项目:tredparse    作者:humanlongevity    | 项目源码 | 文件源码
def ls_ftp(dir):
    from urlparse import urlparse
    from ftplib import FTP, error_perm
    o = urlparse(dir)

    ftp = FTP(o.netloc)
    ftp.login()
    ftp.cwd(o.path)

    files = []
    try:
        files = ftp.nlst()
    except error_perm, resp:
        if str(resp) == "550 No files found":
            print "no files in this directory"
        else:
            raise
    return files
项目:Automation-Framework-for-devices    作者:tok-gogogo    | 项目源码 | 文件源码
def cwd(self,path):
        #-----------------------------------------------------------------------------
        # Name:        cwd fuction
        # param:       path :cd to the path,please write absolute path
        # explain:     cd to this path
        # Author:      gongke
        #
        # Created:     2013/01/13
        #-----------------------------------------------------------------------------
        try:
            self.mftp.cwd(path)
        except ftplib.error_perm:
            #print "cannot cd to this path %s" %path
            print_mes = "cannot cd to this path  " +path 
            print print_mes
            info_public(print_mes)
            self.mftp.quit()
            return False
        #print "cd %s" %path
        print_mes = "cd  " +path 
        print print_mes
        info_public(print_mes)
        return True
项目:transfert    作者:rbernand    | 项目源码 | 文件源码
def open(self, flags):
        if flags in ('r',):
            self._socket = self._conn.transfercmd('RETR %s' % self.url.path)
        elif flags in ('w', 'w+'):
            try:
                self._socket = self._conn.transfercmd('STOR %s' % self.url.path)
            except ftplib.error_perm:
                raise FileExistsError(str(self.url))
项目:transfert    作者:rbernand    | 项目源码 | 文件源码
def size(self):
        try:
            return self._conn.size(self.url.path)
        except ftplib.error_perm as err:
            if 'not a regular file' in str(err):
                return None
            raise FileNotFoundError(self.url)
        except ValueError:
            return None
项目:transfert    作者:rbernand    | 项目源码 | 文件源码
def isdir(self):
        try:
            self._conn.size(self.url.path)
        except ftplib.error_perm as err:
            if "regular" in str(err):
                return True
            return False
项目:transfert    作者:rbernand    | 项目源码 | 文件源码
def delete(self):
        if self.isfile():
            self._conn.delete(self.url.path)
        elif self.isdir():
            try:
                self._conn.rmd(self.url.path)
            except ftplib.error_perm as err:
                raise OSError(err)
        else:
            raise FileNotFoundError(str(self.url))
项目:transfert    作者:rbernand    | 项目源码 | 文件源码
def mkdir(self, name=None):
        try:
            if name is None:
                if self.isdir():
                    return self
                self._conn.mkd(self.url.path)
                return self
            else:
                if self.join(name).isdir():
                    return self.join(name)
                self._conn.mkd(name)
                return self.join(name)
        except ftplib.error_perm as err:
            raise TransfertPermissionError(err)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def retrfile(self, file, type):
        import ftplib
        self.endtransfer()
        if type in ('d', 'D'): cmd = 'TYPE A'; isdir = 1
        else: cmd = 'TYPE ' + type; isdir = 0
        try:
            self.ftp.voidcmd(cmd)
        except ftplib.all_errors:
            self.init()
            self.ftp.voidcmd(cmd)
        conn = None
        if file and not isdir:
            # Try to retrieve as a file
            try:
                cmd = 'RETR ' + file
                conn = self.ftp.ntransfercmd(cmd)
            except ftplib.error_perm, reason:
                if str(reason)[:3] != '550':
                    raise IOError, ('ftp error', reason), sys.exc_info()[2]
        if not conn:
            # Set transfer mode to ASCII!
            self.ftp.voidcmd('TYPE A')
            # Try a directory listing. Verify that directory exists.
            if file:
                pwd = self.ftp.pwd()
                try:
                    try:
                        self.ftp.cwd(file)
                    except ftplib.error_perm, reason:
                        raise IOError, sys.exc_info()[2]
                finally:
                    self.ftp.cwd(pwd)
                cmd = 'LIST ' + file
            else:
                cmd = 'LIST'
            conn = self.ftp.ntransfercmd(cmd)
        self.busy = 1
        # Pass back both a suitably decorated object and a retrieval length
        return (addclosehook(conn[0].makefile('rb'),
                             self.endtransfer), conn[1])
项目:mygene.info    作者:biothings    | 项目源码 | 文件源码
def get_ftpfile_lastmodified(self, file_path):
        '''return lastmodified for a given file on ftp server.'''
        try:
            response = self.client.sendcmd('MDTM ' + file_path)
        except error_perm as e:
            self.logger.debug("Skip %s: %s" % (file_path,e))
            return None
        code, lastmodified = response.split()
        # an example: 'last-modified': '20121128150000'
        lastmodified = int(time.mktime(datetime.strptime(lastmodified, '%Y%m%d%H%M%S').timetuple()))
        return lastmodified
项目:core-python    作者:yidao620c    | 项目源码 | 文件源码
def main():
    try:
        f = ftplib.FTP(HOST)
    except (socket.error, socket.gaierror) as e:
        print('ERROR: connot reach "%s"' % HOST)
        exit(1)
    print('*** Connected to host "%s"' % HOST)

    try:
        f.login()
    except ftplib.error_perm:
        print('ERROR: connot login anonymously')
        f.quit()
        exit(1)
    print('*** Logged in as "anonymous"')

    try:
        f.cwd(DIRN)
    except ftplib.error_perm:
        print('ERROR: cannot cd to "%s"' % DIRN)
        f.quit()
        exit(1)
    print('*** Changed to "%s" folder' % DIRN)

    try:
        locFile = open(FILE, 'wb')
        f.retrbinary('RETR %s' % FILE, locFile.write)
    except ftplib.error_perm:
        print('ERROR: cannot read file "%s"' % FILE)
        os.unlink(FILE)
    else:
        print('*** Downloaded "%s" to CWD' % FILE)
    finally:
        locFile.close()
    f.quit()
    return
项目:ftpscout    作者:RubenRocha    | 项目源码 | 文件源码
def get_directory_listing(ftp_con):
    try:
        files = ftp_con.nlst()
        if "." in files: files.remove(".")
        if ".." in files: files.remove("..")

        rnd_files = random.sample(set(files), 3)

        return "{} files listed - sample of files: {}".format(len(files), rnd_files)
    except ftplib.error_perm as resp:
        if "550" in resp:
            return "No files"
    finally:
        con.quit()
        con.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_exceptions(self):
        self.assertRaises(ftplib.error_temp, 'echo 400')
        self.assertRaises(ftplib.error_temp, 'echo 499')
        self.assertRaises(ftplib.error_perm, 'echo 500')
        self.assertRaises(ftplib.error_perm, 'echo 599')
        self.assertRaises(ftplib.error_proto, 'echo 999')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_all_errors(self):
        exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
                      ftplib.error_proto, ftplib.Error, IOError, EOFError)
        for x in exceptions:
            try:
                raise x('exception not included in all_errors set')
            except ftplib.all_errors:
                pass
项目:inotiftpsync    作者:Major1201    | 项目源码 | 文件源码
def mkdir_parent(self, dirname):
        from ftplib import error_perm
        from os.path import sep
        dirname = strings.rtrim(dirname, sep)
        dir_arr = dirname.split(sep)
        for i in range(2 if dirname.startswith("/") else 1, len(dir_arr) + 1):
            try:
                self.mkdir(sep.join(dir_arr[:i]))
            except error_perm:
                pass
            except:
                logger.error_traceback()
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def poke(self, context):
        with self._create_hook() as hook:
            self.log.info('Poking for %s', self.path)
            try:
                hook.get_mod_time(self.path)
            except ftplib.error_perm as e:
                error = str(e).split(None, 1)
                if error[1] != "Can't check for file existence":
                    raise e

                return False

            return True
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def test_poke(self):
        op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp",
                       task_id="test_task")

        self.hook_mock.get_mod_time.side_effect = \
            [error_perm("550: Can't check for file existence"), None]

        self.assertFalse(op.poke(None))
        self.assertTrue(op.poke(None))
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def test_poke_fails_due_error(self):
        op = FTPSensor(path="foobar.json",
                       task_id="test_task")

        self.hook_mock.get_mod_time.side_effect = \
            error_perm("530: Login authentication failed")

        with self.assertRaises(error_perm) as context:
            op.execute(None)

        self.assertTrue("530" in str(context.exception))
项目:poc    作者:Chinalover    | 项目源码 | 文件源码
def I1ii11iIi11i(host, port, user, pwd):
    I1IiI = False
    for o0OOO in range(10):
        iIiiiI = False
        try:
            if 0:
                iii1II11ii * i11iII1iiI + iI1Ii11111iIi + ii1II11I1ii1I + oO0o0ooO0 - iiIIIII1i1iI
            debug(decode('\xedP\xb6\x15\x8f\x82\xd5\xed\x92\x95 \xfaA\xe9\xd6\xa4k\x92\xe8\xb9\xea\xd0X3\xa4)'), o0OOO, pwd, host, port)
            o0oO0 = ftplib.FTP(timeout=10)
            o0oO0.connect(host, port)
            o0oO0.login(user, pwd)
            o0oO0.quit()
            o0oO0.close()
            I1IiI = True
        except Exception as oo00:
            if type(oo00) is ftplib.error_perm:
                if 0:
                    O0Oo0oO0o.II1iI.i1iIii1Ii1II
            else:
                if 0:
                    O0Oooo00
                if 0:
                    i1IIi11111i / iIiiiI1IiI1I1 % II1iI * II1iI * O0Oo0oO0o / IIii1I
                iIiiiI = True
        else:
            if 0:
                II1iI / iIiiiI1IiI1I1 + O00ooooo00 % II1.i1iIii1Ii1II / i1IIi11111i
        finally:
            if not iIiiiI:
                return I1IiI

    return
    if 0:
        i1IIi11111i + O0Oooo00 - i1IIi11111i.II1
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_exceptions(self):
        self.assertRaises(ftplib.error_temp, 'echo 999')
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_all_errors(self):
        exceptions = (ftplib.error_reply, EOFError)
        for x in exceptions:
            try:
                raise x('exception not included in all_errors set')
            except ftplib.all_errors:
                pass
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_exceptions(self):
        self.assertRaises(ftplib.error_temp, 'echo 999')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_all_errors(self):
        exceptions = (ftplib.error_reply, EOFError)
        for x in exceptions:
            try:
                raise x('exception not included in all_errors set')
            except ftplib.all_errors:
                pass
项目:A-Z-of-Python    作者:crvaden    | 项目源码 | 文件源码
def ftp_command(ftp):
    while True:  # Run until 'exit' command is received from user
        command = input('Enter a command: ')  # get next command from user
        commands = command.split()  # split command and file/directory into list
        if commands[0] == 'cd':  # Change directory
            try:
                ftp.cwd(commands[1])
                print('Directory listing for', ftp.pwd())
                ftp.dir()
                print('Current Directory', ftp.pwd())
            except ftplib.error_perm as e:  # Handle 550 (not found / no permission error)
                error_code = str(e).split(None, 1)
                if error_code[0] == '550':
                    print(error_code[1], 'Directory may not exist or you may not have permission to view it.')
        elif commands[0] == 'get':  # Download file
            try:
                ftp.retrbinary('RETR ' + commands[1], open(commands[1], 'wb').write)
                print('File successfully downloaded.')
            except ftplib.error_perm as e: # Handle 550 (not found / no permission error)
                error_code = str(e).split(None, 'File may not exist or you may not have permission to view it.')
        elif commands[0] == 'ls':  # Print directory listing
            print('Directory listing for', ftp.pwd())
            ftp.dir()
        elif commands[0] == 'exit':  # Exit application
            ftp.quit()
            print('Goodbye.')
            break
        else:
            print('Invalid command,try again (valid options: cd/get/exit).')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_exceptions(self):
        self.assertRaises(ftplib.error_temp, 'echo 999')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_all_errors(self):
        exceptions = (ftplib.error_reply, EOFError)
        for x in exceptions:
            try:
                raise x('exception not included in all_errors set')
            except ftplib.all_errors:
                pass
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def ftp_errors(fs, path=None):
    try:
        with fs._lock:
            yield
    except socket.error:
        raise errors.RemoteConnectionError(
            msg='unable to connect to {}'.format(fs.host)
        )
    except error_temp as error:
        if path is not None:
            raise errors.ResourceError(
                path,
                msg="ftp error on resource '{}' ({})".format(path, error)
            )
        else:
            raise errors.OperationFailed(
                msg='ftp error ({})'.format(error)
            )
    except error_perm as error:
        code, message = _parse_ftp_error(error)
        if code == 552:
            raise errors.InsufficientStorage(
                path=path,
                msg=message
            )
        elif code in (501, 550):
            raise errors.ResourceNotFound(path=path)
        raise errors.PermissionDenied(
            msg=message
        )
项目:pyfilesystem2    作者:PyFilesystem    | 项目源码 | 文件源码
def _open_ftp(self):
        """Open a new ftp object."""
        _ftp = FTP()
        _ftp.set_debuglevel(0)
        with ftp_errors(self):
            _ftp.connect(self.host, self.port, self.timeout)
            _ftp.login(self.user, self.passwd, self.acct)
            self._features = {}
            try:
                feat_response = _decode(_ftp.sendcmd("FEAT"), 'latin-1')
            except error_perm:
                self.encoding = 'latin-1'
            else:
                self._features = self._parse_features(feat_response)
                self.encoding = (
                    'utf-8'
                    if 'UTF8' in self._features
                    else 'latin-1'
                )
                if not PY2:
                    _ftp.file = _ftp.sock.makefile(
                        'r',
                        encoding=self.encoding
                    )
        _ftp.encoding = self.encoding
        self._welcome = _ftp.welcome
        return _ftp
项目:openmv-ide    作者:openmv    | 项目源码 | 文件源码
def upload_listing_map(self, d):
        filename = self.abspath(path, '.listing')
        fp = StringIO('\n'.join(['%s:%s' % item for item in d.iteritems()]))
        try:
            self.ftp.storbinary('STOR ' + filename.replace('\\', '/'), fp, 8*1024)
        except error_perm:
            return 0
        return 1
项目:openmv-ide    作者:openmv    | 项目源码 | 文件源码
def download(self, filename, target, verbose=True):
        if verbose:
            sys.stdout.write('downloading %r..' % (filename))
            sys.stdout.flush()
        fullname = self.abspath(filename)
        targetdir = os.path.dirname(target)
        if not os.path.exists(targetdir):
            os.makedirs(targetdir)
        f = open(target, 'wb')
        try:
            self.ftp.retrbinary('RETR '+fullname.replace('\\', f.write, 8*1024)
        except error_perm, msg:
            if verbose:
                sys.stdout.write('FAILED: %s\n' % (msg))
                sys.stdout.flush()
            else:
                sys.stderr.write('FAILED to download %r: %s\n' % (filename, msg))
                sys.stderr.flush()
            f.close()
            os.remove(target)
            return 0
        f.close()
        self.fix_local_mtime(filename, target)
        if verbose:
            sys.stdout.write(' ok [%s bytes]\n' % (os.path.getsize(target)))
            sys.stdout.flush()
        return 1
项目:openmv-ide    作者:openmv    | 项目源码 | 文件源码
def upload(self, mk_backup=True, verbose=True):
        fullname = self.abspath(filename)
        if verbose:
            sys.stdout.write('uploading %r [%s]..' % (filename, os.path.getsize(source)))
            sys.stdout.flush()
        self.makedirs(os.path.dirname(fullname), rm_local_listing=True, verbose=verbose)
        if mk_backup:
            if verbose:
                sys.stdout.write('<creating %r>' % (filename+'.backup'))
                sys.stdout.flush()
            self.ftp.rename(fullname.replace('\\', fullname.replace('\\', '/') + '.backup')
        f = open(source, 'rb')
        if verbose:
            sys.stdout.write('<storing>')
        try:
            self.ftp.storbinary('STOR '+fullname.replace('\\', f, msg:
            if verbose:
                sys.stdout.write('FAILED: %s\n' % (msg))
                sys.stdout.flush()
            else:
                sys.stderr.write('FAILED to upload %r: %s\n' % (filename, msg))
                sys.stderr.flush()
            f.close()
            if mk_backup:
                if verbose:
                    sys.stdout.write('<restoring from %r>' % (filename+'.backup'))
                    sys.stdout.flush()
                self.ftp.rename(fullname.replace('\\', '/') + '.backup', '/'))
            return 0
        f.close()
        if mk_backup:
            if verbose:
                sys.stdout.write('<cleaning up %r>' % (filename+'.backup'))
                sys.stdout.flush()
            self.ftp.delete(fullname.replace('\\', '/') + '.backup')
        self.fix_local_mtime(filename, source)
        if verbose:
            sys.stdout.write(' ok\n')
            sys.stdout.flush()
        return 1
项目:openmv-ide    作者:openmv    | 项目源码 | 文件源码
def remove(self, verbose=True):
        fullname = self.abspath(filename)
        if verbose:
            sys.stdout.write('<removing %r..' % (filename))
            sys.stdout.flush()
        try:
            self.ftp.delete(fullname.replace('\\', msg:
            if verbose:
                sys.stdout.write('FAILED: %s>' % (msg))
                sys.stdout.flush()
            else:
                sys.stderr.write('FAILED to remove %r: %s\n' % (filename, msg))
                sys.stderr.flush()
            return
        dname = os.path.dirname(fullname)
        lst = self.ftp.nlst(dname.replace('\\', '/'))
        if '.listing' in lst:
            listing = os.path.join(dname, '.listing')
            if verbose:
                sys.stdout.write('<removing %r>' % (listing))
                sys.stdout.flush()
            self.ftp.delete(listing.replace('\\', '/'))
        if verbose:
            sys.stdout.write('ok>')
            sys.stdout.flush()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Python ftplib 模块,error_proto() 实例源码 我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用ftplib.error_proto()。
Python ftplib 模块,Error() 实例源码 我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用ftplib.Error()。
Python ftplib 模块,error_reply() 实例源码 我们从Python开源项目中,提取了以下36个代码示例,用于说明如何使用ftplib.error_reply()。
Python ftplib 模块,error_temp() 实例源码 我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用ftplib.error_temp()。
Python ftplib 模块,FTP_TLS 实例源码 我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用ftplib.FTP_TLS。
Python ftplib 模块,FTP_PORT 实例源码 我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用ftplib.FTP_PORT。
Python ftplib 模块,all_errors() 实例源码 我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ftplib.all_errors()。
Python ftplib 模块,error_perm() 实例源码 我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用ftplib.error_perm()。
Python ftplib 模块,FTP 实例源码 我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ftplib.FTP。
Python sklearn.base 模块,is_classifier() 实例源码 我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用sklearn.base.is_classifier()。
Python sklearn.base 模块,ClusterMixin() 实例源码 我们从Python开源项目中,提取了以下3个代码示例,用于说明如何使用sklearn.base.ClusterMixin()。
Python sklearn.base 模块,RegressorMixin() 实例源码 我们从Python开源项目中,提取了以下3个代码示例,用于说明如何使用sklearn.base.RegressorMixin()。
Python sklearn.base 模块,clone() 实例源码 我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sklearn.base.clone()。
Python sklearn.base 模块,ClassifierMixin() 实例源码 我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用sklearn.base.ClassifierMixin()。
Python sklearn.base 模块,TransformerMixin() 实例源码 我们从Python开源项目中,提取了以下3个代码示例,用于说明如何使用sklearn.base.TransformerMixin()。
Python sklearn.base 模块,BaseEstimator() 实例源码 我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用sklearn.base.BaseEstimator()。
Python colorama.Fore 模块,LIGHTCYAN_EX 实例源码 我们从Python开源项目中,提取了以下5个代码示例,用于说明如何使用colorama.Fore.LIGHTCYAN_EX。
Python colorama.Fore 模块,LIGHTMAGENTA_EX 实例源码 我们从Python开源项目中,提取了以下4个代码示例,用于说明如何使用colorama.Fore.LIGHTMAGENTA_EX。
Python colorama.Fore 模块,LIGHTBLUE_EX 实例源码 我们从Python开源项目中,提取了以下4个代码示例,用于说明如何使用colorama.Fore.LIGHTBLUE_EX。
Python colorama.Fore 模块,LIGHTWHITE_EX 实例源码 我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用colorama.Fore.LIGHTWHITE_EX。