Distributed Web Scraping with Python (Part 3): The Code


This installment is mainly the code.

The program ran continuously for 24 hours and was then deployed across 10 machines in a distributed setup; long-running stability has been essentially problem-free.

From here on it will crawl about 100,000 web pages per day.
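
For scale, that works out to roughly 10,000 pages per machine per day, or on average about one page fetch every 8 to 9 seconds per machine.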

The source code follows.

Content crawling and utilities (InfoGripper.py)

# -*- coding: utf-8 -*-
'''
Created on 2010-9-15
@author: chenggong
'''
import urllib2
import re
import socket

DEBUG = 0

'''
Utility class
'''
class Tools():

    # logging helper
    @staticmethod
    def writelog(level, info, notify=False):
        if DEBUG == 0:
            try:
                print "[" + level + "]" + info.decode('UTF-8').encode('GBK')
            except:
                print "[" + level + "]" + info.encode('GBK')
        else:
            print "[" + level + "]" + info
        #if notify:
        #    print "[notify] report to the administrator!!"

    # convert a byte string to unicode
    @staticmethod
    def toUnicode(s, charset):
        if charset == "":
            return s
        else:
            try:
                u = unicode(s, charset)
            except:
                u = ""
            return u

    # regex extraction
    # @param single  capture only the first match
    @staticmethod
    def getFromPatten(patten, src, single=False):
        rst = ""
        p = re.compile(patten, re.S)
        all = p.findall(src)
        for matcher in all:
            rst += matcher + " "
            if single:
                break
        return rst.strip()


'''
Page download class
'''
class PageGripper():
    URL_OPEN_TIMEOUT = 10  # page timeout (seconds)
    MAX_RETRY = 3          # maximum number of retries

    def __init__(self):
        socket.setdefaulttimeout(self.URL_OPEN_TIMEOUT)

    # get the charset declared in the page
    def getCharset(self, s):
        rst = Tools.getFromPatten(u'charset=(.*?)"', s, True)
        if rst != "":
            if rst == "utf8":
                rst = "utf-8"
        return rst

    # try to download the page
    def downloadUrl(self, url):
        charset = ""
        page = ""
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(url)
                break
            except urllib2.HTTPError, e:  # HTTP status error
                Tools.writelog('error', 'HTTP status error, code=' + "%d" % e.code)
                raise
            except urllib2.URLError, e:   # network error / timeout
                Tools.writelog('warn', 'page access timed out, retrying..')
                retry += 1
                if retry > self.MAX_RETRY:
                    Tools.writelog('warn', 'exceeded max retries, giving up')
                    raise
        while True:
            line = fp.readline()
            if charset == "":
                charset = self.getCharset(line)
            if not line:
                break
            page += Tools.toUnicode(line, charset)
        fp.close()
        return page

    # fetch a page
    def getPageInfo(self, url):
        Tools.writelog("info", "start crawling page, url= " + url)
        info = ""
        try:
            info = self.downloadUrl(url)
        except:
            raise
        Tools.writelog("debug", "page crawled successfully")
        return info


'''
Content extraction class
'''
class InfoGripper():
    pageGripper = PageGripper()
    MAX_RETRY = 3  # maximum retries for the view-count ajax request

    def __init__(self):
        Tools.writelog('debug', "gripper started")

    # extract the title
    def griptitle(self, data):
        title = Tools.getFromPatten(u'box2t sp">(.*?)', data, True)
        if title == "":
            title = Tools.getFromPatten(u'(.*?)[-<]', data, True)
        return title.strip()

    # extract the channel
    def gripchannel(self, data):
        zone = Tools.getFromPatten(u'频道:(.*?)', data, True)
        channel = Tools.getFromPatten(u'(.*?)', zone, True)
        return channel

    # extract the tags
    def griptag(self, data):
        zone = Tools.getFromPatten(u'标签:(.*?)', data, True)
        rst = Tools.getFromPatten(u'>(.*?)', zone, False)
        return rst

    # extract the view count (from page text)
    def gripviews(self, data):
        rst = Tools.getFromPatten(u'已经有(.*?)次观看', data)
        return rst

    # extract the publish time
    def griptime(self, data):
        rst = Tools.getFromPatten(u'在(.*?)发布', data, True)
        return rst

    # extract the uploader
    def gripuser(self, data):
        rst = Tools.getFromPatten(u'title="点击进入(.*?)的用户空间"', data, True)
        return rst

    # get the page charset
    def getPageCharset(self, data):
        charset = Tools.getFromPatten(u'charset=(.*?)"', data, True)
        if charset == "utf8":
            charset = "utf-8"
        return charset

    # get the bokecc (CC) player data
    def getCCData(self, data):
        zone = Tools.getFromPatten(u'SWFObject(.*?)', data, True)

        # is the video played through bokecc?
        isFromBokeCC = re.match('.*bokecc.com.*', zone)
        if not isFromBokeCC:
            return "", ""

        ccSiteId = Tools.getFromPatten(u'siteid=(.*?)[&,"]', zone, True)
        ccVid = Tools.getFromPatten(u'vid=(.*?)[&,"]', zone, True)
        return ccSiteId, ccVid

    # get the site-local video id
    def gripVideoId(self, data):
        vid = Tools.getFromPatten(u'var vid = "(.*?)"', data, True)
        return vid

    # get the view count via the site's ajax interface
    def gripViewsAjax(self, vid, url, basedir):
        host = Tools.getFromPatten(u'http://(.*?)/', url, True)
        ajaxAddr = "http://" + host + basedir + "/index.php/ajax/video_statistic/" + vid
        '''
        try:
            content = self.pageGripper.getPageInfo(ajaxAddr)
        except Exception, e:
            print e
            Tools.writelog("error", ajaxAddr + u" fetch failed")
            return "error"
        '''
        Tools.writelog('debug', u"fetching view count, url=" + ajaxAddr)
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(ajaxAddr)
                break
            except urllib2.HTTPError, e:  # HTTP status error
                Tools.writelog('error', 'HTTP status error, code=' + "%d" % e.code)
                return ""
            except urllib2.URLError, e:   # network error / timeout
                Tools.writelog('warn', 'page access timed out, retrying..')
                retry += 1
                if retry > self.MAX_RETRY:
                    Tools.writelog('warn', 'exceeded max retries, giving up')
                    return ""
        content = fp.read()
        fp.close()
        views = Tools.getFromPatten(u'"viewcount":(.*?),', content, True)
        views = views.replace('"', '')
        return views

    # extract the view count directly from the page content
    def gripViewsFromData(self, data):
        views = Tools.getFromPatten(u'已经有<.*?>(.*?)<.*?>次观看', data, True)
        return views

    def gripBaseDir(self, data):
        dir = Tools.getFromPatten(u"base_dir = '(.*?)'", data, True)
        return dir

    # extract all fields for one page
    def gripinfo(self, url):
        try:
            data = self.pageGripper.getPageInfo(url)
        except:
            Tools.writelog("error", url + " fetch failed")
            raise

        Tools.writelog('info', 'start content matching')
        rst = {}
        rst['title'] = self.griptitle(data)
        rst['channel'] = self.gripchannel(data)
        rst['tag'] = self.griptag(data)
        rst['release'] = self.griptime(data)
        rst['user'] = self.gripuser(data)
        ccdata = self.getCCData(data)
        rst['ccsiteId'] = ccdata[0]
        rst['ccVid'] = ccdata[1]
        views = self.gripViewsFromData(data)
        if views == "" or not views:
            vid = self.gripVideoId(data)
            basedir = self.gripBaseDir(data)
            views = self.gripViewsAjax(vid, url, basedir)
            if views == "":
                views = "error"
        if views == "error":
            Tools.writelog("error", "failed to get the view count")
        Tools.writelog("debug", "views: " + views)
        rst['views'] = views
        Tools.writelog('debug', 'title=%s,channel=%s,tag=%s' % (rst['title'], rst['channel'], rst['tag']))
        return rst


'''
Unit test
'''
if __name__ == '__main__':
    list = [
        'http://008yx.com/xbsp/index.php/video/index/3138',
        'http://vblog.xwhb.com/index.php/video/index/4067',
        'http://demo.ccvms.bokecc.com/index.php/video/index/3968',
        'http://vlog.cnhubei.com/wuhan/20100912_56145.html',
        'http://vlog.cnhubei.com/html/js/30271.html',
        'http://www.ddvtv.com/index.php/video/index/15',
        'http://boke.2500sz.com/index.php/video/index/60605',
        'http://video.zgkqw.com/index.php/video/index/334',
        'http://yule.hitmv.com/html/joke/27041.html',
        'http://www.ddvtv.com/index.php/video/index/11',
        'http://www.zgnyyy.com/index.php/video/index/700',
        'http://www.kdianshi.com/index.php/video/index/5330',
        'http://www.aoyatv.com/index.php/video/index/127',
        'http://v.ourracing.com/html/channel2/64.html',
        'http://v.zheye.net/index.php/video/index/93',
        'http://vblog.thmz.com/index.php/video/index/7616',
        'http://kdianshi.com/index.php/video/index/5330',
        'http://tv.seeyoueveryday.com/index.php/video/index/95146',
        'http://sp.zgyangzhi.com/html/ji/2.html',
        'http://www.xjapan.cc/index.php/video/index/146',
        'http://www.jojy.cn/vod/index.php/video/index/399',
        'http://v.cyzone.cn/index.php/video/index/99',
        ]

    list1 = ['http://192.168.25.7:8079/vinfoant/versionasdfdf']

    infoGripper = InfoGripper()
    for url in list:
        infoGripper.gripinfo(url)
    del infoGripper
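
As a quick check, the extractor can also be driven by hand. Below is a minimal sketch, assuming the listing above is saved as InfoGripper.py (the module name the scheduler below imports from); the URL is just one of the demo URLs from the unit test:

# minimal manual test of the extractor (not part of the original listing)
from InfoGripper import InfoGripper

gripper = InfoGripper()
info = gripper.gripinfo('http://demo.ccvms.bokecc.com/index.php/video/index/3968')
for key in ('title', 'channel', 'tag', 'release', 'user', 'views', 'ccsiteId', 'ccVid'):
    print key, ':', info[key]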

Web service and task scheduling

# -*- coding: utf-8 -*-
'''
Created on 2010-9-15
@author: chenggong
'''
import string, cgi, time
from os import curdir, sep
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from InfoGripper import *
import re
import MySQLdb
import time
import threading
import urllib
import urllib2

PORT = 8079
VERSION = 0.1
DBCHARSET = "utf8"
PARAMS = [
    'callback',
    'sessionId',
    'retry',
    'retryInterval',
    'dbhost',
    'dbport',
    'db',
    'dbuser',
    'dbpass',
    'videoId'
    ]
DBMAP = ['video_id',
    'ccsiteid',
    'ccvid',
    'desc_url',
    'site_id',
    'title',
    'post_time',
    'author',
    'elapse',
    'channel',
    'tags',
    'create_time',
    'check_time',
    'status']

'''
ERROR CODE definitions
'''
ERR_OK = 0
ERR_PARAM = 1
ERR_HTTP_TIMEOUT = 5
ERR_HTTP_STATUS = 6
ERR_DB_CONNECT_FAIL = 8
ERR_DB_SQL_FAIL = 9
ERR_GRIPVIEW = 11
ERR_UNKNOW = 12

'''
Database adapter
'''
class DBAdapter(object):

    def __init__(self):
        self.param = {'ip': '',
                      'port': 0,
                      'user': '',
                      'pw': '',
                      'db': ''}
        self.connect_once = False  # has a connection been made before?

    '''
    Create / refresh the database connection
    '''
    def connect(self, ip, port, user, pw, db):
        if (ip != self.param['ip'] or
                port != self.param['port'] or
                user != self.param['user'] or
                pw != self.param['pw'] or
                db != self.param['db']):
            Tools.writelog('info', 'switching DB connection, ip=' + ip + ',port=' + port +
                           ',user=' + user + ',pw=' + pw + ',db=' + db)
            try:
                if self.connect_once == True:  # release the previous connection
                    self.cur.close()
                    self.conn.close()
                self.conn = MySQLdb.connect(user=user, passwd=pw, db=db, host=ip, port=int(port))
                self.conn.set_character_set(DBCHARSET)
                self.connect_once = True
                self.cur = self.conn.cursor(MySQLdb.cursors.Cursor)
                self.param['ip'] = ip
                self.param['port'] = port
                self.param['user'] = user
                self.param['pw'] = pw
                self.param['db'] = db
            except:
                Tools.writelog('error', u'database connection failed', True)
                raise
            else:
                Tools.writelog('info', u'database connected')

    '''
    Execute an SQL statement
    '''
    def execute(self, sql):
        Tools.writelog('debug', u'executing SQL: ' + sql)
        try:
            self.cur.execute(sql)
        except:
            Tools.writelog('error', u'SQL error: ' + sql)
            raise

    '''
    Query the database
    '''
    def query(self, sql):
        row = {}
        self.execute(sql)
        row = self.cur.fetchall()
        return row

    '''
    Mark a video as failed
    '''
    def updateErr(self, videoId):
        nowtime = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time()))
        sql = "UPDATE videos SET "
        sql += "check_time='" + nowtime + "',"
        sql += "status=-1 "
        sql += "WHERE video_id=" + videoId
        self.execute(sql)
        self.conn.commit()

    '''
    Write the crawl results back
    '''
    def update(self, obj, videoId, isUpdateTitle=True):
        Tools.writelog('debug', 'updating database')
        try:
            # update the videos table
            sql = "UPDATE videos SET "
            if obj['ccsiteId'] != "":
                sql += "ccsiteid='" + obj['ccsiteId'] + "',"
            if obj['ccVid'] != "":
                sql += "ccvid='" + obj['ccVid'] + "',"
            if isUpdateTitle:
                sql += "title='" + obj['title'] + "',"
            sql += "post_time='" + obj['release'] + "',"
            sql += "author='" + obj['user'] + "',"
            sql += "channel='" + obj['channel'] + "',"
            sql += "tags='" + obj['tag'] + "',"
            nowtime = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time()))
            sql += "check_time='" + nowtime + "',"
            sql += "status=0 "
            sql += "WHERE video_id=" + videoId
            self.execute(sql)

            # update the counts table
            if obj['views'] != 'error':
                nowdate = time.strftime('%Y-%m-%d', time.localtime(time.time()))
                sql = "SELECT * FROM counts WHERE "
                sql += "date = '" + nowdate + "' and video_id=" + videoId
                rst = self.query(sql)
                if len(rst) > 0:  # a record already exists for today: update it
                    sql = "UPDATE counts SET count=" + obj['views']
                    sql += " WHERE video_id=" + videoId + " AND date='" + nowdate + "'"
                else:             # otherwise insert a new row
                    sql = "INSERT INTO counts VALUES"
                    sql += "(null," + videoId + ",'" + nowdate + "'," + obj['views'] + ")"
                self.execute(sql)
            self.conn.commit()
            Tools.writelog('debug', "db commit ok")
            return ERR_OK
        except Exception, e:
            print e
            return ERR_DB_SQL_FAIL

'''
Task thread
'''
class TaskThread(threading.Thread):

    def setTaskTool(self, dbAdapter, gripper):
        self.dbAdapter = dbAdapter
        self.gripper = gripper

    def setParam(self, param):
        self.param = param
        self.videoId = param['videoId']
        assert self.videoId != ""

    def init(self):
        self.views = "0"
        self.errcode = ERR_OK

    def run(self):
        Tools.writelog('debug', 'crawl task started, sessionId=' + self.param['sessionId'])
        self.init()
        try:
            # refresh the database connection
            self.dbAdapter.connect(self.param['dbhost'], self.param['dbport'],
                                   self.param['dbuser'], self.param['dbpass'], self.param['db'])
        except:
            self.errcode = ERR_DB_CONNECT_FAIL  # database connection failed
            self.callback(self.errcode)
            return

        # look up the video with this id
        sql = "SELECT "
        for column in DBMAP:
            sql += column
            if column != DBMAP[len(DBMAP) - 1]:
                sql += ","
        sql += " FROM videos"
        sql += " WHERE video_id=" + self.videoId
        video = self.dbAdapter.query(sql)
        assert not (len(video) > 1 or len(video) == 0)  # exactly one record expected

        url = video[0][3]
        assert url != ""
        try:
            rst = self.gripper.gripinfo(url)
        except urllib2.HTTPError, e:
            self.errcode = ERR_HTTP_STATUS   # HTTP status error
            self.dbAdapter.updateErr(self.videoId)
        except urllib2.URLError, e:
            self.errcode = ERR_HTTP_TIMEOUT  # HTTP timeout
            self.dbAdapter.updateErr(self.videoId)
        except:
            self.errcode = ERR_UNKNOW        # unknown error
            self.dbAdapter.updateErr(self.videoId)
        else:
            self.views = rst['views']
            if self.views == "error":
                self.views = "-1"
                self.errcode = ERR_GRIPVIEW  # page parsed, but view count failed

            # update the database (special case: if the existing title already
            # contains "-", do not overwrite the title field)
            title = video[0][5]
            assert title != ""
            if re.match('.*-.*', title):
                self.errcode = self.dbAdapter.update(rst, self.videoId, False)
            else:
                self.errcode = self.dbAdapter.update(rst, self.videoId)

        self.callback(self.errcode)
        Tools.writelog('info', 'task finished, sessionId=' + self.param['sessionId'])
        return

    def callback(self, errcode):
        results = {'errorcode': errcode, 'count': int(self.views)}
        results = urllib.urlencode(results)
        results = results.replace('&', '%26')
        url = self.param['callback']
        url += "?"
        url += "sessionId=" + self.param['sessionId']
        url += "&results=" + results
        retry = 0
        while True:
            try:
                Tools.writelog('debug', "calling back the master, url=" + url)
                urllib2.urlopen(url)
                Tools.writelog('debug', 'callback ok')
                break
            except urllib2.URLError, e:  # timeout / error
                Tools.writelog('debug', 'callback to the master timed out, retrying in %s seconds' % self.param['retryInterval'])
                retry += 1
                time.sleep(int(self.param['retryInterval']))
                if retry > int(self.param['retry']):
                    Tools.writelog('error', 'callback to the master failed')
                    return

'''
Web service handler
'''
class MyHandler(BaseHTTPRequestHandler):

    dbAdapter = DBAdapter()
    gripper = InfoGripper()

    def pageSuccess(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def pageFail(self):
        self.send_error(404, "not found")

    def getValue(self, param):
        src = self.path + '&'
        reg = param + '=' + '(.*?)&'
        value = Tools.getFromPatten(reg, src, True)
        return value

    def do_GET(self):
        isGetVersion = re.match('.*vinfoant/version.*', self.path)
        isTask = re.match('.*vinfoant/run.*', self.path)
        if isGetVersion:
            self.pageSuccess()
            self.wfile.write(str(VERSION))
        elif isTask:
            self.pageSuccess()
            param = {}
            for p in PARAMS:
                param[p] = self.getValue(p)  # collect the request parameters
            taskThread = TaskThread()
            taskThread.setTaskTool(self.dbAdapter, self.gripper)
            taskThread.setParam(param)
            taskThread.start()  # run the task asynchronously
            self.wfile.write("ok")
        else:
            self.pageFail()
        return

'''
Start the web service (global entry point)
'''
def startHttpd():
    try:
        Tools.writelog('debug', 'httpd start..listen on ' + str(PORT))
        httpd = HTTPServer(('', PORT), MyHandler)
        Tools.writelog('debug', 'success')
        httpd.serve_forever()
    except KeyboardInterrupt:
        Tools.writelog('debug', 'httpd close..')
        httpd.socket.close()


if __name__ == '__main__':
    startHttpd()
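
To illustrate the protocol between the master and this worker, here is a minimal sketch of dispatching one task over HTTP, assuming a worker listening on 192.168.25.7:8079 (the address that appears in list1 in the unit test above). The callback URL, database credentials and videoId are placeholders, not real values; the parameter names come from the PARAMS list:

# minimal sketch of driving a worker from the master side (placeholders throughout)
import urllib
import urllib2

worker = 'http://192.168.25.7:8079'

# 1. check that the worker is alive and report its version
print urllib2.urlopen(worker + '/vinfoant/version').read()

# 2. dispatch a crawl task; the worker answers "ok" immediately and later calls
#    back callback?sessionId=...&results=<urlencoded errorcode and count>
params = urllib.urlencode({
    'callback': 'http://master.example.com/task/callback',  # hypothetical master endpoint
    'sessionId': '12345',
    'retry': 3,
    'retryInterval': 10,
    'dbhost': '127.0.0.1',
    'dbport': 3306,
    'db': 'vinfo',        # placeholder database name
    'dbuser': 'root',
    'dbpass': 'secret',
    'videoId': 3968,
})
print urllib2.urlopen(worker + '/vinfoant/run?' + params).read()  # prints "ok"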

Reposted from: http://blog.csdn.net/rcfalcon/article/details/5891781
