Python利用Selenium模拟浏览器自动操作

发布时间:2020-11-19 发布网站:编程之家
编程之家收集整理的这篇文章主要介绍了Python利用Selenium模拟浏览器自动操作编程之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

概述

在进行网站爬取数据的时候,会发现很多网站都进行了反爬虫的处理,如JS加密,Ajax加密,反Debug等方法,通过请求获取数据和页面展示的内容完全不同,这时候就用到Selenium技术,来模拟浏览器的操作,然后获取数据。本文以一个简单的小例子,简述Python搭配Tkinter和Selenium进行浏览器的模拟操作,仅供学习分享使用,如有不足之处,还请指正。

什么是Selenium?

Selenium 是一个用于Web应用程序测试的工具,Selenium测试直接运行在浏览器中,就像真正的用户在操作一样。支持的浏览器包括IE(7,8,9,10,11),Mozilla Firefox,Safari,Google Chrome,Opera等。Selenium支持多种操作系统,如Windows、Linux、IOS等,如果需要支持Android,则需要特殊的selenium,本文主要以IE11浏览器为例。

安装Selenium

 

通过pip install selenium 进行安装即可,如果速度慢,则可以使用国内的镜像进行安装。

涉及知识点

程序虽小,除了需要掌握的Html ,JavaScript,CSS等基础知识外,本例涉及的Python相关知识点还是蛮多的,具体如下:

  • Selenium相关:
    • Selenium进行元素定位,主要有ID,Name,ClassName,Css Selector,Partial LinkText,LinkText,XPath,TagName等8种方式。
    • Selenium获取单一元素(如:find_element_by_xpath)和获取元素数组(如:find_elements_by_xpath)两种方式。
    • Selenium元素定位后,可以给元素进行赋值和取值,或者进行相应的事件操作(如:click)。
  • 线程(Thread)相关:
    • 为了防止前台页面卡主,本文用到了线程进行后台操作,如果要定义一个新的线程,只需要定义一个类并继承threading.Thread,然后重写run方法即可。
    • 在使用线程的过程中,为了保证线程的同步,本例用到了线程锁,如:threading.Lock()。
  • 队列(queue)相关:
    • 本例将Selenium执行的过程信息,保存到对列中,并通过线程输出到页面显示。queue默认先进先出方式。
    • 对列通过put进行压栈,通过get进行出栈。通过qsize()用于获取当前对列元素个数。
  • 日志(logging.Logger)相关:
    • 为了保存Selenium执行过程中的日志,本例用到了日志模块,为Pyhton自带的模块,不需要额外安装。
    • Python的日志共六种级别,分别是:NOTSET,DEBUG,INFO,WARN,ERROR,FATAL,CRITICAL。

示例效果图

本例主要针对某一配置好的商品ID进行轮询,监控是否有货,有货则加入购物车,无货则继续轮询,如下图所示:

核心代码

本例最核心的代码,就是利用Selenium进行网站的模拟操作,如下所示:

  1 class Smoking:
  2     """定义Smoking类"""
  3 
  4     # 浏览器驱动
  5     __driver: webdriver = None
  6      配置帮助类
  7     __cfg_info: dict = {}
  8      日志帮助类
  9     __log_helper: LogHelper = 10      主程序目录
 11     __work_path: str = ''
 12      是否正在运行
 13     __running: bool = False
 14      无货
 15     __no_stock = 'Currently Out of Stock'
 16      线程等待秒数
 17     __wait_sec = 2
 18 
 19     def __init__(self,work_path,cfg_info,log_helper: LogHelper):
 20         初始化 21         self.__cfg_info = cfg_info
 22         self.__log_helper = log_helper
 23         self.__work_path = work_path
 24         self.__wait_sec = int(cfg_info[wait_sec'])
 25          如果小于2,则等于2
 26         self.__wait_sec = (2 if self.__wait_sec < 2 else self.__wait_sec)
 27 
 28     def checkIsExistsById(self,id):
 29         通过ID判断是否存在 30         try:
 31             i = 0
 32             while self.__running and i < 3 33                 if len(self.__driver.find_elements_by_id(id)) > 0:
 34                     break
 35                 else 36                     time.sleep(self. 37                     i = i + 1
 38             return len(self. 39         except BaseException as e:
 40             return 41 
 42      checkIsExistsByName(self,name):
 43         通过名称判断是否存在 44          45             i = 46              47                 __driver.find_elements_by_name(name)) > 48                      49                  50                     time.sleep(self. 51                     i = i + 1
 52              53          54              55 
 56      checkIsExistsByPath(self,path):
 57         通过xpath判断是否存在 58          59             i = 60              61                 __driver.find_elements_by_xpath(path)) > 62                      63                  64                     time.sleep(self. 65                     i = i + 1
 66              67          68              69 
 70      checkIsExistsByClass(self,cls):
 71         通过class名称判断是否存在 72          73             i = 74              75                 __driver.find_elements_by_class_name(cls)) > 76                      77                  78                     time.sleep(self. 79                     i = i + 1
 80              81          82              83 
 84      checkIsExistsByLinkText(self,link_text):
 85         判断LinkText是否存在 86          87             i = 88              89                 __driver.find_elements_by_link_text(link_text)) > 90                      91                  92                     time.sleep(self. 93                     i = i + 1
 94              95          96              97 
 98      checkIsExistsByPartialLinkText(self,1)"> 99         判断包含LinkText是否存在100         101             i =102             103                 __driver.find_elements_by_partial_link_text(link_text)) >104                     105                 106                     time.sleep(self.107                     i = i + 1
108             109         110             111 
112      def waiting(self,*locator):
113          """等待完成"""
114          # self.__driver.switch_to.window(self.__driver.window_handles[1])
115          Wait(self.__driver,60).until(EC.visibility_of_element_located(locator))
116 
117      login(self,username,password):
118         登录119          5. 点击链接跳转到登录页面
120         self.__driver.find_element_by_link_text(账户登录).click()
121          6. 输入账号密码
122          判断是否加载完成
123          self.waiting((By.ID,"email"))
124         if self.checkIsExistsById(email):
125             self.__driver.find_element_by_id().send_keys(username)
126             self.password).send_keys(password)
127              7. 点击登录按钮
128             self.sign-in129 
130      working(self,item_id):
131         工作状态132         __running133             134                  正常获取信息
135                 string136                     self.).clear()
137                     self.).send_keys(item_id)
138                     self.).send_keys(Keys.ENTER)
139                  判断是否查询到商品
140                 xpath = "//div[@class='specialty-header search']/div[@class='specialty-description']/div[" \
141                         @class='gt-450']/span[2] "
142                 if self.checkIsExistsByPath(xpath):
143                     count = int(self.__driver.find_element_by_xpath(xpath).text)
144                     if count < 1145                         time.sleep(self.146                         self.__log_helper.put(没有查询到item id =' + item_id + 对应的信息147                         continue
148                 149                     time.sleep(self.150                     self.没有查询到item id2 =151                     152                  判断当前库存是否有货
153 
154                 xpath1 = //div[@class='product-list']/div[@class='product']/div[@class='price-and-detail']/div[155                          @class='price']/span[@class='noStock'] 156                  self.checkIsExistsByPath(xpath1):
157                     txt = self..find_element_by_xpath(xpath1).text
158                     if txt == self.__no_stock159                          当前无货
160                         time.sleep(self.161                         self.查询一次,无货162                         163 
164                  链接path1
165                 xpath2 = //div[@class='product-list']/div[@class='product']/div[@class='imgDiv']/a166                  判断是否加载完毕
167                  self.waiting((By.CLASS_NAME,"imgDiv"))
168                  self.checkIsExistsByPath(xpath2):
169                     self..find_element_by_xpath(xpath2).click()
170                     time.sleep(self.171                      加入购物车
172                     if self.checkIsExistsByClass(add-to-cart173                         self.__driver.find_element_by_class_name(174                         self.加入购物车成功,商品item-id:' + item_id)
175                         176                     177                         self.未找到加入购物车按钮178                 179                     self.没有查询到,可能是商品编码不对,或者已下架180             181                 self.__log_helper.put(e)
182 
183      startRun(self):
184         运行起来185         186             self.__running = True
187             url: str = self.__cfg_info[url]
188             username = self.username189             password = self.190             item_id = self.item_id191             if url is None or len(url) == 0 or username or len(username) == 0 or password or len(
192                     password) == 0 or item_id or len(item_id) ==193                 self.配置信息不全,请检查config.cfg文件是否为空,然后再重启194                 return
195             __driver is None:
196                 options = webdriver.IeOptions()
197                 options.add_argument(encoding=UTF-8198                 options.add_argument(Accept= text / css,* / *199                 options.add_argument(Accept - Language= zh - Hans - CN,zh - Hans;q = 0.5200                 options.add_argument(Accept - Encoding= gzip,deflate201                 options.add_argument(user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko202                  2. 定义浏览器驱动对象
203                 self.__driver = webdriver.Ie(executable_path=self.__work_path + r\IEDriverServer.exe',options=options)
204             self.run(url,password,item_id)
205         206             self.运行过程中出错,请重新打开再试207 
208      run(self,url,1)">209         210          3. 访问网站
211         self..get(url)
212          4. 最大化窗口
213         self..maximize_window()
214         if self.checkIsExistsByLinkText(215              判断是否登录:未登录
216             self.login(username,password)
217         if self.checkIsExistsByPartialLinkText(欢迎回来218              判断是否登录:已登录
219             self.登录成功,下一步开始工作了220             self.working(item_id)
221         222             self.登录失败,请设置账号密码223 
224      stop(self):
225         停止226         227             self.228              如果驱动不为空,则关闭
229             self.close_browser_nicely(self.230             is not231                 self..quit()
232                  关闭后切要为None,否则启动报错
233                 self.__driver =234         235             print(Stop Failure236         finally237             self.238 
239      close_browser_nicely(self,browser):
240         241             browser.execute_script(window.onunload=null; window.onbeforeunload=null242          Exception as err:
243             Fail to execute_script:'window.onunload=null; window.onbeforeunload=null'244 
245         socket.setdefaulttimeout(10246         247             browser.quit()
248             Close browser and firefox by calling quit()249         250             Fail to quit from browser,error-type:%s,reason:%s" % (type(err),str(err)))
251         socket.setdefaulttimeout(30)
View Code

其他辅助类

日志类(LogHelper),代码如下:

 1  LogHelper:
 2     日志帮助类 3     __queue: queue.Queue = None   队列
 4     __logging: logging.Logger = None   日志
 5     __running: bool = False   是否记录日志
 6 
 7      8         初始化类 9         self.__queue = queue.Queue(100010         self.init_log(log_path)
11 
12      put(self,value):
13         添加数据14          记录日志
15         self.__logging.info(value)
16          添加到队列
17         __queue.qsize() < self.__queue.maxsize:
18             self..put(value)
19 
20      get(self):
21         获取数据22         __queue.qsize() >23             24                 return self.__queue.get(block=False)
25             26                 27         28             29 
30      init_log(self,1)">31         初始化日志32         self.__logging = logging.getLogger()
33         self..setLevel(logging.INFO)
34         35         rq = time.strftime(%Y%m%d%H%M,time.localtime(time.time()))
36         log_name = log_path + rq + .log37         logfile = log_name
38          if not os.path.exists(logfile):
39              # 创建空文件
40              open(logfile,mode='r')
41         fh = logging.FileHandler(logfile,mode=aUTF-842         fh.setLevel(logging.DEBUG)   输出到file的log等级的开关
43          第三步,定义handler的输出格式
44         formatter = logging.Formatter(%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s45         fh.setFormatter(formatter)
46          第四步,将logger添加到handler里面
47         self..addHandler(fh)
48 
49      get_running(self):
50          获取当前记录日志的状态
51         __running
52 
53      set_running(self,v: bool):
54          设置当前记录日志的状态
55 
56         self.__running = v
View Code

配置类(ConfigHelper)

 ConfigHelper:
初始化数据类 3 
__config_dir =__dic_cfg = config_dir
10 
11      ReadConfigInfo(self):
12         得到配置项13         parser = ConfigParser()
14         parser.read(self.__config_dir + r\config.cfg15         section = parser.sections()[0]
16         items = parser.items(section)
17         self.__dic_cfg.clear()
18         for item in items:
19             self.__dic_cfg.__setitem__(item[0],item[120 
21      getConfigInfo(self):
22         获取配置信息23         __dic_cfg) ==24             self.ReadConfigInfo()
25         __dic_cfg
View Code

线程类(MyThread)

 MyThread(threading.Thread):
后台监控线程 4      5         线程初始化 6         threading.Thread.(self)
 7         self.threadID = tid
 8         self.name = name
 9         self.smoking = smoking
10         self.log_helper = run(self):
13         开启线程: " + self.name)
14         self.log_helper.put(15          获取锁,用于线程同步
 lock = threading.Lock()
 lock.acquire()
18         self.smoking.startRun()
19          释放锁,开启下一个线程
20          lock.release()
21         结束线程: 22         self.log_helper.put(" + self.name)
View Code

备注

 

侠客行  [唐:李白]
赵客缦胡缨,吴钩霜雪明。银鞍照白马,飒沓如流星。
十步杀一人,千里不留行。事了拂衣去,深藏身与名。
闲过信陵饮,脱剑膝前横。将炙啖朱亥,持觞劝侯嬴。
三杯吐然诺,五岳倒为轻。眼花耳热后,意气素霓生。
救赵挥金槌,邯郸先震惊。千秋二壮士,烜赫大梁城。
纵死侠骨香,不惭世上英。谁能书阁下,白首太玄经。

总结

以上是编程之家为你收集整理的Python利用Selenium模拟浏览器自动操作全部内容,希望文章能够帮你解决Python利用Selenium模拟浏览器自动操作所遇到的程序开发问题。

如果觉得编程之家网站内容还不错,欢迎将编程之家网站推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您喜欢交流学习经验,点击链接加入编程之家官方QQ群:1065694478