编程语言动态数据的克星

    作者:采集小钢炮更新于: 2020-02-07 15:04:05

    大神带你学编程,欢迎选课

    爬虫系列之Pyppeteer:动态数据的克星。编程语言往往使程序员能够比使用机器语言更准确地表达他们所想表达的目的。对那些从事计算机科学的人来说,懂得程序设计语言是十分重要的,因为在当今所有的计算都需要程序设计语言才能完成。

    在处理数据采集过程中,相信大家都会遇到动态网站的采集,如果是几个或者几十个,都可以通过抓包,分析数据流直接获取数据,但是当有几千或者几万个的时候,抓包就显得太过于浪费金钱和时间了。这也是Pyppeteer、selenium、PhantomJS等存在的原因。今天主要介绍一下Pyppeteer。

    Pyppeteer其实是Puppeteer的Python版本,下面简单介绍下Pyppeteer的两大特点,chromium浏览器和asyncio框架:

    1).chromium

    Chromium是一款独立的浏览器,是Google为发展自家的浏览器Google Chrome而开启的计划,相当于Chrome的实验版,Chromium的稳定性不如Chrome但是功能更加丰富,而且更新速度很快,通常每隔数小时就有新的开发版本发布。

    Pyppeteer的web自动化是基于

    编程语言动态数据的克星_编程语言_Python视频_Python视频教程_课课家

    来实现的,由于chromium中某些特性的关系,Pyppeteer的安装配置非常简单,关于这一点稍后我们会详细介绍。

    2).asyncio

    asyncio是Python的一个异步协程库,自3.4版本引入的标准库,直接内置了对异步IO的支持,号称是Python最有野心的库,官网上有非常详细的介绍。

    由于Pyppeteer是基于asyncio实现的,所以它本身就支持异步操作,执行效率得到大幅提升。

     

    3).安装

    使用pip install pyppeteer命令就能完成pyppeteer库的安装,至于chromium浏览器,只需要一条pyppeteer-install命令就会自动下载对应的最新版本chromium浏览器到pyppeteer的默认位置。

    如果不运行pyppeteer-install命令,在第一次使用pyppeteer的时候也会自动下载并安装chromium浏览器,效果是一样的。总的来说,pyppeteer比起selenium省去了driver配置的环节。

    当然,出于某种原因,也可能会出现chromium自动安装无法顺利完成的情况,这时可以考虑手动安装:

    首先,从下列网址中找到自己系统的对应版本,下载chromium压缩包;

    1·'Linux':'https://storage.googleapis.com/chromium-browser-snapshots/Linux_x64/575458/chrome-linux.zip'

    2·'mac':'https://storage.googleapis.com/chromium-browser-snapshots/Mac/575458/chrome-mac.zip'

    3·'win32':'https://storage.googleapis.com/chromium-browser-snapshots/Win/575458/chrome-win32.zip'

    4·'win64':'https://storage.googleapis.com/chromium-browser-snapshots/Win_x64/575458/chrome-win32.zip'

    4).使用

    下面是我基于工作中常用到的方法,整理的一个公共类,大家可以借鉴一下:

    '''

    Created on Jul 27, 2019

     @author: admin

    '''

    import asyncio, tkinter, traceback

    import base64, time, random

    from pyppeteer import launch

    from com.fy.utils.http.UserAgentUtils import UserAgentUtils

    from com.fy.utils.hash.HashUtils import Hash_Utils

    from com.fy.utils.file.FileUtils import File_Utils

    class PyppeteerBrowser:

    def __init__(self):

    self.hash = Hash_Utils()

    self.url = None

    self.ua = UserAgentUtils()

    #"""使用tkinter获取屏幕大小""")

    def screen_size(self):

    tk = tkinter.Tk()

    width = tk.winfo_screenwidth()

    height = tk.winfo_screenheight()

    tk.quit()

    return width, height

     

    #构造一个浏览器对象; ; 如果需要每次初始化新的浏览器对象,则userDataDir路径必须不同,否则,始终是在第一次初始化的浏览器对象上进行操作,且容易出异常;

    async def getbrowser(self, headless=False, userDataDir=None):

    '''

    参数:

    •ignoreHTTPSErrors(bool):是否忽略 HTTPS 错误。默认为 False

    •headless(bool):是否在无头模式下运行浏览器。默认为 True除非appMode或devtools选项True

    •executablePath (str):运行 Chromium 或 Chrome 可执行文件的路径,而不是默认捆绑的 Chromium。如果指定之后就不需要使用默认的 Chromium 了,可以指定为已有的 Chrome 或 Chromium。

    •slowMo (int | float):通过传入指定的时间,可以减缓 Pyppeteer 的一些模拟操作。 (按指定的毫秒数减慢 pyppeteer 操作。)

    •args (List [str]):传递给浏览器进程的附加参数(标志)。

    •dumpio(bool):是否管道浏览器进程 stdout 和 stderr 进入process.stdout和process.stderr。默认为False。为 True时,可以解决chromium浏览器多开页面卡死问题。

    •userDataDir (str):用户数据目录的路径。即用户数据文件夹,即可以保留一些个性化配置和操作记录。(比如登录信息等;可以在以后打开时自动登录;)

    •env(dict):指定浏览器可见的环境变量。默认与 python 进程相同。

    •devtools(bool):是否为每个选项卡自动打开 DevTools 面板。如果是此选项True,headless则将设置该选项 False。

    •logLevel(int | str):用于打印日志的日志级别。默认值与根记录器相同。

    •autoClose(bool):脚本完成时自动关闭浏览器进程。默认为True。

    •loop(asyncio.AbstractEventLoop):事件循环(实验)。

    •args:常用的有['--no-sandbox','--disable-gpu', '--disable-setuid-sandbox','--window-size=1440x900']

    •dumpio: 不知道为什么,如果不加 dumpio=True 有时会出现浏览器卡顿

    •autoClose:默认就好,不过如果你需要保持浏览器状态,可以不关闭,下次直接连接这个已存在的浏览器

     ignoreDefaultArgs (bool): 不使用 Pyppeteer 的默认参数,如果使用了这个参数,那么最好通过 args 参数来设定一些参数,否则可能会出现一些意想不到的问题。这个参数相对比较危险,慎用。

    handleSIGINT (bool): 是否响应 SIGINT 信号,也就是可以使用 Ctrl + C 来终止浏览器程序,默认是 True。

    handleSIGTERM (bool): 是否响应 SIGTERM 信号,一般是 kill 命令,默认是 True。

    handleSIGHUP (bool): 是否响应 SIGHUP 信号,即挂起信号,比如终端退出操作,默认是 True。

     launch_kwargs = {

    # 控制是否为无头模式

    "headless": False,

    # chrome启动命令行参数

    "args": [

    # 浏览器代理 配合某些中间人代理使用

    "--proxy-server=http://127.0.0.1:8008",

    # 最大化窗口

    "--start-maximized",

    # 取消沙盒模式 沙盒模式下权限太小

    "--no-sandbox",

    # 不显示信息栏 比如 chrome正在受到自动测试软件的控制 ...

    "--disable-infobars",

    # log等级设置 在某些不是那么完整的系统里 如果使用默认的日志等级 可能会出现一大堆的warning信息

    "--log-level=3",

    # 设置UA

    "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",

    ],

    # 用户数据保存目录 这个最好也自己指定一个目录

    # 如果不指定的话,chrome会自动新建一个临时目录使用,在浏览器退出的时候会自动删除临时目录

    # 在删除的时候可能会删除失败(不知道为什么会出现权限问题,我用的windows) 导致浏览器退出失败

    # 然后chrome进程就会一直没有退出 CPU就会狂飙到99%

    "userDataDir": "",

    }

    '''

    print("构造浏览器对象开始...")

    args = [ "--start-maximized", '--no-sandbox', "--disable-infobars" , "--log-level=3"]

    parameters = {}

    if userDataDir == None:

    parameters = {'headless': headless, #是否打开浏览器;False:打开浏览器;True:进程中运行;

    'args': args,

    'dumpio': True #'dumpio': True:解决chromium浏览器多开页面卡死问题。

    }

    else:

    parameters = {'headless': headless, #是否打开浏览器;False:打开浏览器;True:进程中运行;

    'args': args,

    "userDataDir": userDataDir,

    'dumpio': True #'dumpio': True:解决chromium浏览器多开页面卡死问题。

    }

    #注意:同一个用户目录(userDataDir)不能被两个chrome进程使用,如果你要多开,记得分别指定用户目录。否则会报编码错误。

    self.browser = await launch(parameters)

     self.page = await self.browser.newPage()#在此浏览器上创建新页面并返回其对象。

    width, height = self.screen_size()

    # 设置网页可视区域大小

    await self.page.setViewport({

    "width": width,

    "height": height

    })

     # 是否启用JS,enabled设为False,则无渲染效果

    await self.page.setJavaScriptEnabled(enabled=True)

     #设置请求头userAgent

    await self.page.setUserAgent(self.ua.getheaders())

     await self.preventCheckWebdriver(self.page)

    print("构造浏览器对象完毕....", self.page)

     #获取当前操作的界面

    async def getPage(self):

    return self.page

     #获取当前page对象的链接;

    async def getCurUrl(self, page):

    if page == None:

    page = self.page

    return page.url

     #打开一个新的界面;)

    async def getnewpage(self):

    return await self.browser.newPage()

     #获取当前操作的界面重新加载

    async def reload(self):

    await self.page.reload()

     #当前操作界面返回

    async def goBack(self):

    await self.page.goBack()

     #获取当前操作的界面的URL

    async def getPageUrl(self):

    await self.page.url()

     #打开连接;

    async def open(self, url, timeout=60):

    try:

    if url == None:

    print("当前传入的【url】不能为空,参数错误!!")

    self.url = url

    print("打开网页:" + (url))

    self.res = await self.page.goto(url, options={'timeout':int(timeout * 1000)})#打开连接;

    await asyncio.sleep(1)#强行等待3秒

    status = self.res.status

    curUrl = self.page.url

    await self.preventCheckWebdriver(self.page)

    return status, curUrl

    except:return 404, None

    #防止 webdriver 检测,如淘宝登录。其实淘宝主要通过 window.navigator.webdriver 来对 webdriver 进行检测,所以我们只需要使用 Javascript 将它设置为 false 即可

    async def preventCheckWebdriver(self, page):

    if page == None:

    page = self.page

    # 替换淘宝在检测浏览时采集的一些参数。

    # 就是在浏览器运行的时候,始终让window.navigator.webdriver=false

    # navigator是windiw对象的一个属性,同时修改plugins,languages,navigator

    await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''') # 以下为插入中间js,将淘宝会为了检测浏览器而调用的js修改其结果。

    await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {}, }; }''')

    await page.evaluate('''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')

    await page.evaluate('''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')

     #关闭当前打开的浏览器;

    async def closeBrowser(self, browser):

    if browser == None:

    browser = self.browser

    try:

    await browser.close()

    except:pass

     #关闭当前打开的浏览器中的一个界面;

    async def closePage(self, page):

    if page == None:

    page = self.page

    await page.close()

     #关闭当前打开的浏览器中的某一个界面;

    async def closeNumPage(self, number:"号码从0开始"):

    pages = await self.browser.pages()

    await pages[number].close()

    return True

     #关闭除了最后一个所有的界面;

    async def retainLastPage(self):

    pages = await self.browser.pages()

    num = 0

    for page in pages:

    if num != (len(pages) - 1):

    await page.close()

    else:

    self.page = page

    num += 1

     #获取当前打开页面的响应状态

    async def gerReponseStatus(self):

    try:return self.res.status # 响应状态

    except:return 200

     #截个图

    async def screenshot(self, page):

    hashCode = self.hash.getMd5Hash(self.url)

    if page == None:

    page = self.page

    await page.screenshot({'path': './screenshots/' + str(hashCode) + '.png'})

     # 得到响应头;

    async def getHeader(self):

    return self.res.headers # 响应头;

     # 滚动到页面底部

    async def scrollToButtom(self, page):

    if page == None:

    page = self.page

    await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')

    print("滑动到当前界面底部【完毕】")

     # 获取当前页面的cookie

    async def getCookies(self, page):

    if page == None:

    page = self.page

    return await page.cookies()

     # 获取登录后cookie尚未测试是否正常;

    async def getCookieStr(page):

    if page == None:

    page = self.page

    cookies_list = await page.cookies()

    cookies = ''

    for cookie in cookies_list:

    str_cookie = '{0}={1};'

    str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))

    cookies += str_cookie

    print(cookies)

    # 将cookie 放入 cookie 池 以便多次请求 封账号 利用cookie 对搜索内容进行爬取

    return cookies

     # 获取当前页面的cookie

    async def setCookies(self, page, cookies):

    if page == None:

    page = self.page

    return await page.setCookie(*cookies)

     # 获取所有 html源码

    async def getHtml(self, page):

    if page == None:

    page = self.page

    return (await page.content())

     #当前页标题

    async def getCurPageTitle(self, page):

    if page == None:

    page = self.page

    return (await page.title())

     #获取对象属性值;

    async def getElementFieldValue(self, page, element, field):

    if element == None:

    print("当前传入的【element】不能为空,参数错误!!")

    return None

    if field == None:

    print("当前传入的【field】不能为空,参数错误!!")

    return None

    if page == None:

    page = self.page

    if str(type(element)) == "":

    print("当前传入的【element】不是单个对象,为list集合,参数错误!!")

    return None

    fieldValue = await (await element.getProperty(field)).jsonValue()

    return fieldValue

     #获取当前界面的宽、高、像素大小比率三个值

    async def getPageWidthHight(self, page):

    if page == None:

    page = self.page

    return await page.evaluate('''() => {

    return {

    width: document.documentElement.clientWidth,

    height: document.documentElement.clientHeight,

    deviceScaleFactor: window.devicePixelRatio,

    }

    }''')

     #获取当前浏览器的所有界面集合;

    async def getCurBrowserAllPages(self):

    return await self.browser.pages()

     #获取当前界面中某个元素的内容;

    async def getElementsByXpaths(self, page, xpath:'如://div[@class="title-box"]/a'):

    if xpath == None:

    print("当前传入的【xpath】不能为空,参数错误!!")

    return None

    if page == None:

    page = self.page

    elemList = None

    try:elemList = await page.xpath(xpath)

    except:

    print("获取xpath路径为【" + str(xpath) + "】的标签对象异常...")

    return elemList#返回类型为:list集合;

     #获取当前界面的所有内容(不带html标签的内容);(效果较差;)--

    async def getPageText(self, page):

    if page == None:

    page = self.page

    '''Pyppeteer的evaluate()方法只使用JavaScript字符串,该字符串可以是函数也可以是表达式,

    Pyppeteer会进行自动判断。但有时会判断错误,如果字符串被判断成了函数,并且报错,

    可以添加选项force_expr=True,强制Pyppeteer作为表达式处理。'''

    return await page.evaluate('document.body.textContent', force_expr=True)

     #获取元素内容; --

    async def getElementText(self, page, element):

    if element == None:

    print("当前传入的【element】不能为空,参数错误!!")

    return None

    if page == None:

    page = self.page

    if str(type(element)) == "":

    print("当前传入的【element】不是单个对象,为list集合,参数错误!!")

    return None

    return await page.evaluate('(element) => element.textContent', element)

     #通过selector获取元素内容; --

    async def getElementBySelector(self, page , selector):

    if selector == None:

    print("当前传入的【selector】不能为空,参数错误!!")

    return None

    if page == None:

    page = self.page

    return await page.querySelector(selector)

     #向输入框输入数据 --

    async def inputKw(self, page, selector:"如:'input#kw.s_ipt':获取input标签中id='kw',class='s_ipt'的对象。不可用xpath路径", kw:'待输入的关键词'):

    if kw == None:

    print("当前传入的【kw】不能为空,参数错误!!")

    return None

    if selector == None:

    print("当前传入的【selector】不能为空,参数错误!!")

    return None

    if page == None:

    page = self.page

    mylist = [1.55, 0.798, 1.187]

    if len(kw) <= 5:

    mylist = [0.695, 0.798, 1.087, 0.343, 0.4067]

    else:

    mylist = [1.095, 0.798, 1.127, 1.0543, 1.1267, 0.8067]

    for i in str(kw):#逐个字符输入,减少被识别为机器的改了;

    await page.type(selector, i)

    time.sleep(random.choice(mylist))

    return None

     #鼠标单击某一个元素; --

    async def clickElement(self, page, selector:"如:'input#kw.s_ipt':获取input标签中id='kw',class='s_ipt'的对象。。不可用xpath路径"):

    if selector == None:

    print("当前传入的【selector】不能为空,参数错误!!")

    if page == None:

    page = self.page

    await page.click(selector)#如果selector获取的对象是list集合,则执行第一个元素的点击;

     #清空某个input的值

    async def removeInputValue(self, page, idValue):

    if idValue == None:

    print("当前传入的【idValue】不能为空,参数错误!!")

    if page == None:

    page = self.page

    await page.evaluate("document.querySelector('#" + str(idValue) + "').value=''")

    print("清空【" + str(idValue) + "】的内容")

     #async def clickByEle(self, ele):

    if ele == None:

    return

    return ele.click()

     #获取当前浏览器打开的【最后一个】界面对象

    async def getLastPage(self):

    pages = await self.browser.pages()

    return pages[-1]

     #获取当前浏览器打开的【最后一个】界面对象

    async def getPageTotal(self):

    pages = await self.browser.pages()

    return len(pages)

     #获取当前浏览器打开的【最一个】界面对象

    async def getFirstPage(self):

    pages = await self.browser.pages()

    return pages[0]

     #获取当前界面中所有的frame对象

    async def getAllFrames(self, page):

    if page == None:

    page = self.page

    return page.frames

     async def getScreenshotByEle(self, page, ele, screenshotFilePath:"目前测试只有.png图片可正常生成,jpg异常;"):

    picture = ''

    try:

    fu = File_Utils(None)

    fu = File_Utils(fu.getParentDir(screenshotFilePath))

    if not fu.exists(fu.getParentDir(screenshotFilePath)):fu.makeDirs()#如果图片的保存目录不存在,则创建;

    # 进行截图

    time.sleep(3)

    print("验证码路径:", screenshotFilePath)

    try:

    for _ in range(6):

    clip = await ele.boundingBox()

    picture = base64.b64encode(await page.screenshot({

    'path': screenshotFilePath, # 图片路径, 不指定就不保存

    'clip': clip, # 指定图片位置,大小

    # 'encoding': 'base64',# 返回的图片格式, 默认二进制

    }))

    if picture != '':

    break

    except Exception as e:

    print('截图获取失败')

    print(traceback.print_exc())

    except Exception as e:

    print('截图获取失败')

    print(traceback.print_exc())

    return picture

     ppy = PyppeteerBrowser()

    from com.fy.utils.date.DateUtils import Date_Utils

    du = Date_Utils()

    userDataDir = "d://pyppeteer" + str(du.getCurrentTimeLong())

    asyncio.get_event_loop() .run_until_complete(ppy.getbrowser(False, userDataDir))

    asyncio.get_event_loop() .run_until_complete(ppy.open("http://caifuhao.eastmoney.com/discover/finance", 60))

    time.sleep(2)

    print(asyncio.get_event_loop() .run_until_complete(ppy.getCookies(None)))

    asyncio.get_event_loop() .run_until_complete(ppy.closeBrowser(None))

    在过去的几十年间,大量的编程语言被发明、被取代、被修改或组合在一起。尽管人们多次试图创造一种通用的程序设计语言,却没有一次尝试是成功的。之所以有那么多种不同的编程语言存在的原因是,编写程序的初衷其实也各不相同;新手与老手之间技术的差距非常大,而且有许多语言对新手来说太难学;还有,不同程序之间的运行成本(runtime cost)各不相同。

课课家教育

未登录