创建SSIS包:ETL中典型的数据清洗的方法

    作者:课课家更新于: 2015-11-02 11:28:51

    在众多学习中,文章也许不起眼,但是重要的下面我们就来讲解一下!!

    这个例子的情景是一个信用卡公司,目前正着手于拓展Florida州新成立的一些公司的业务。市场部门每周都会向这些公司发送一些邮件,我们要为所有的邮件准备抽取数据。假设Florida州提供的一个上面这个dat文件,它是从老的计算机系统里面得到的,它是定长分隔的,这意味着文件中没有分隔符,必须手工设置分隔列的长度。从下面的连接下载这个.dat文件:010305c.dat。如果使用工具查看,它们的模样类似下面的:

    1. 01  ANNUAL_MICRO_DATA_REC. 
    2.     03  ANNUAL_COR_NUMBER                       PIC X(12). 
    3.     03  ANNUAL_COR_NAME                         PIC X(48). 
    4.     03  ANNUAL_COR_STATUS                       PIC X(01). 
    5.     03  ANNUAL_COR_FILING_TYPE                  PIC X(15). 
    6.     03  ANNUAL_COR_2ND_MAIL_ADD_1               PIC X(42). 
    7.     03  ANNUAL_COR_2ND_MAIL_ADD_2               PIC X(42). 
    8.     03  ANNUAL_COR_2ND_MAIL_CITY                PIC X(28). 
    9.     03  ANNUAL_COR_2ND_MAIL_STATE               PIC X(02). 
    10.     03  ANNUAL_COR_2ND_MAIL_ZIP                 PIC X(10). 
    11.     03  ANNUAL_COR_2ND_MAIL_COUNTRY             PIC X(02). 
    12.     03  ANNUAL_COR_FILE_DATE                    PIC X(08). 
    13.     03  ANNUAL_COR_FEI_NUMBER                   PIC X(14). 
    14.     03  ANNUAL_MORE_THAN_SIX_OFF_FLAG           PIC X(01). 
    15.     03  ANNUAL_LAST_TRX_DATE                    PIC X(08). 
    16.     03  ANNUAL_STATE_COUNTRY                    PIC X(02). 
    17.     03  ANNUAL_REPORT_YEAR_1                    PIC X(04). 
    18.     03  ANNUAL_HOUSE_FLAG_1                     PIC X(01). 
    19.     03  ANNUAL_REPORT_DATE_1                    PIC X(08). 
    20.     03  ANNUAL_REPORT_YEAR_2                    PIC X(04). 
    21.     03  ANNUAL_HOUSE_FLAG_2                     PIC X(01). 
    22.     03  ANNUAL_REPORT_DATE_2                    PIC X(08). oracle教程
    23.     03  ANNUAL_REPORT_YEAR_3                    PIC X(04). 
    24.     03  ANNUAL_HOUSE_FLAG_3                     PIC X(01). 
    25.     03  ANNUAL_REPORT_DATE_3                    PIC X(08). 
    26.     03  ANNUAL_RA_NAME                          PIC X(42). 
    27.     03  ANNUAL_RA_NAME_TYPE                     PIC X(01). 
    28.     03  ANNUAL_RA_ADD_1                         PIC X(42). 
    29.     03  ANNUAL_RA_CITY                          PIC X(28). 
    30.     03  ANNUAL_RA_STATE                         PIC X(02). 
    31.     03  ANNUAL_RA_ZIP5                          PIC X(05). 
    32.     03  ANNUAL_RA_ZIP4                          PIC X(04). 
    33.     03  ANNUAL_PRINCIPALS                       OCCURS 6 TIMES. 
    34.     05  ANNUAL_PRINC_TITLE                      PIC X(04). 
    35.     05  ANNUAL_PRINC_NAME_TYPE                  PIC X(01). 
    36.     05  ANNUAL_PRINC_NAME                       PIC X(42). 
    37.     05  ANNUAL_PRINC_ADD_1                      PIC X(42). 
    38.     05  ANNUAL_PRINC_CITY                       PIC X(28). 
    39.     05  ANNUAL_PRINC_STATE                      PIC X(02). 
    40.     05  ANNUAL_PRINC_ZIP5                       PIC X(05). 
    41.     05  ANNUAL_PRINC_ZIP4                       PIC X(04). 
    42.     03  FILLER                                  PIC X(04). 

    创建数据源
    这个文件的内容看上去不知所云,不可能像普通的文本文件一样处理它们。下面要建一个package来清洗和这个类似的数据,得到有用的信息。package完成下面的任务:
    从010305c.dat的存放路径下将文件内容抽取出来存放到本地数据库
    将文件归档避免多次下载
    当一列数据丢失,这一列需要自动重新添加
    当一行数据丢失,需要输出错误的行
    新添加一个Package,重命名为CorporationLoad.dtsx,右击Connection Managers选择新添加一个连接,选择AdventureWorks。创建一个新的Flat File Connection连接,重命名为Corporation Extract连接到010305c.dat。这里不需要设置分隔符,而是选择定长格式,也不需要设置列分隔符,也没有必要选择第一行设置为列名选择项。
    定长格式文件意味着每一列不是由分隔符来分隔,必须手动设置每一列的开始和结束。大多数的大型机文件都是这种格式,你会发现这种设置会有些繁琐。打开文件界面并推断每一列的开始位置。点击Column标签界面如图5-6。在Row Widh栏内输入1172字符(这个数字表示文件的开始)。

    图5-6
    下一步,在列上设置竖直线标示每一列。在标尺刻度上左击设置竖直线,在这个例子中,可以使用下面的表中的提示来设置列。在竖直线上双击可以删除,左击拖动可以移动竖直线。

    标尺刻度值 列名
    12 CorporateNumber
    60 CorporationName
    61 CorporationStatus
    65 FilingType
    118 MailingAddressLine1
    160 MailingAddressLine2
    188 City
    190 State
    200 ZipCode
    202 Country
    210 FilingDate
    224 FEINumber
    1172 Records you will throw out

    在这个表中可以看到丢弃了大部分数据。添加完竖直线之后,点击Advanced标签界面,点击Suggest Types,在Suggest Column Types对话框中接受默认设置,点击OK。默认数据类型设置会满足大部分的数据类型需要,但是也会有一些错误。在Column 8(ZipCode column)需要修改DataType选择项为String[DT_STR],OutputColumnWidth设置为10。最后修改Column 10为String[DT_STR],OutputColumnWidth保持默认,点击OK保存设置。
    创建数据流
    在Control Flow 界面内拖放一个Data Flow task,双击进入数据流标签界面。在界面中拖放一个Flat File数据源重命名为Uncleansed Corporate Data,双击并选择上文中新建的数据连接,点击进入Columns标签界面反选Column 11和Column 12,这意味着市场部门不需要这两列数据。在后面的工作中将添加Destination和数据转换任务。
    处理脏数据
    在进行下一步操作之前,先暂停来了解一下数据。我们创建了数据连接,你可能会注意到有一些数据行是空白的 ,例如city和state的一些记录是没有的。为了解决这个问题,需要使用一些任务将规范的的数据送到一个路径,有缺损的数据送到另一个路径。然后尝试清洗坏的数据并送回到主要路径中。也可能有一些不能清洗的数据,需要写入错误日志。
    首先,设置邮政编码为5位字符,一些包含破折号的有10位字符,还有9位的。使用Derived Column转换来标准化,从工具栏中拖放一个Derived Column数据转换任务重命名为Standardize Zip Code。
    使用箭头连线将Load Corporate Data和Standardize Zip Code连接起来,双击Standardize Zip Code打开编辑界面,展开左边栏中的Column树形结构,点击Column 8拖放到下方的表格内,这里会在表格内预先填入一些信息。为了输出5位邮政编码需要编写一个表达式只截取5位。使用SUBSTRING函数可以实现这种功能,代码如下:
    SUBSTRING([Column 8],1,5)
    在表格Expression列中输入上面代码,在Derived列中选择replace the existing Column 8,最后可以看到界面如图5-7,完成编辑后点击OK退出界面。
    图5-7
    用Conditional Split进行数据转换
    现在数据规范化了,从工具栏中拖放一个Conditional Split数据转换任务,使用箭头连线把它和Standardize Zip Code连接起来,将Conditional Split重命名为Find Bad Record。Conditional Split将把一些不符合要求的数据进行清洗。
    为了去掉没有city或state的数据行,需要编写条件将缺失city或state的数据转移到一个数据流。双击Find Bad Record打开编辑界面,新建一个Missing State or City条件,在Output Name列内输入该名字。编写一个表达式来查找空的记录。一种方法是使用LTRIM函数。两个竖线|用来实现逻辑或。下面的代码用来查找Column 6和Column7。
    LTRIM([Column 6]) == "" || LTRIM([Column 7]) == ""
    最后要给不满足条件的数据命名。这里不满足上述条件的数据命名为Good Data,如图5-8oracle视频教程

    图5-8

    经李克强总理签批,2015年9月,国务院印发《促进大数据发展行动纲要》(以下简称《纲要》),系统部署大数据发展工作。
    《纲要》明确,推动大数据发展和应用,在未来5至10年打造精准治理、多方协作的社会治理新模式,建立运行平稳、安全高效的经济运行新机制,构建以人为本、惠及全民的民生服务新体系,开启大众创业、万众创新的创新驱动新格局,培育高端智能、新兴繁荣的产业发展新生态。
    《纲要》部署三方面主要任务。一要加快政府数据开放共享,推动资源整合,提升治理能力。大力推动政府部门数据共享,稳步推动公共数据资源开放,统筹规划大数据基础设施建设,支持宏观调控科学化,推动政府治理精准化,推进商事服务便捷化,促进安全保障高效化,加快民生服务普惠化。二要推动产业创新发展,培育新兴业态,助力经济转型。发展大数据在工业、新兴产业、农业农村等行业领域应用,推动大数据发展与科研创新有机结合,推进基础研究和核心技术攻关,形成大数据产品体系,完善大数据产业链。三要强化安全保障,提高管理水平,促进健康发展。健全大数据安全保障体系,强化安全支撑。[11] 
    2015年9月18日贵州省启动我国首个大数据综合试验区的建设工作,力争通过3至5年的努力,将贵州大数据综合试验区建设成为全国数据汇聚应用新高地、综合治理示范区、产业发展聚集区、创业创新首选地、政策创新先行区。
    围绕这一目标,贵州省将重点构建“三大体系”,重点打造“七大平台”,实施“十大工程”。
    “三大体系”是指构建先行先试的政策法规体系、跨界融合的产业生态体系、防控一体的安全保障体系;“七大平台”则是指打造大数据示范平台、大数据集聚平台、大数据应用平台、大数据交易平台、大数据金融服务平台、大数据交流合作平台和大数据创业创新平台;“十大工程”即实施数据资源汇聚工程、政府数据共享开放工程、综合治理示范提升工程、大数据便民惠民工程、大数据三大业态培育工程、传统产业改造升级工程、信息基础设施提升工程、人才培养引进工程、大数据安全保障工程和大数据区域试点统筹发展工程。
    此外,贵州省将计划通过综合试验区建设,探索大数据应用的创新模式,培育大数据交易新的做法,开展数据交易的市场试点,鼓励产业链上下游之间的数据交换,规范数据资源的交易行为,促进形成新的业态。
    国家发展改革委有关专家表示,大数据综合试验区建设不是简单的建产业园、建数据中心、建云平台等,而是要充分依托已有的设施资源,把现有的利用好,把新建的规划好,避免造成空间资源的浪费和损失。探索大数据应用新的模式,围绕有数据、用数据、管数据,开展先行先试,更好地服务国家大数据发展战略。


    使用Look Up转换数据
    从工具栏中拖放LookUp数据转换重命名为Fix Bad Records,当你把它和上一个数据转换Find Bad Record连接起来的时候,将会弹出Input Output Selection对话框如图5-9。下拉列表框中选择Missing State or City选择项,点击OK。这将有缺损的数据从Find Bad Record中送出。

    图5-9
    LookUp转换可以根据数据库中ZipCode表中的数据来补全数据行中缺失的city和state。双击打开编辑界面,点击Connection标签界面,选择AdventureWorks数据源和ZipCode数据表。点击Columns标签界面,点击Column 8不放拖放到右边ZipCode列上,这样在两边的表上建立一个连线如图5-10。然后再右边表中选中State和City列,这两列会出现在下方的表格中,ZipName会替换Column 6,State会替换Column 7 如图5-10。点击OK退出编辑界面。

    图5-10
    使用Union All合并
    现在数据被清洗,要使用Union All转换将清洗后的数据送回到主要数据流中。在工具栏中拖放一个Union All转换,从Fix Bad Records向Union All拖放一个连线,从Find Bad Records向Union拖放一个连线,Union All不再需要其他配置。
    最后设置
    最后需要将数据流送到一个OLE DB 目的中。从工具栏中拖放一个OLE DB Destination,重命名为Mail Merge Table。从Union All向它拖放一个连线,双击选择AdventureWorks数据源,Use a Table or View选择项中点击New button。默认的建表语句使用的是表名是Mail Merge Table,数据类型可能有些不是很合适的,代码如下:

    1. CREATE TABLE [Mail Merge Table] ( oracle视频
    2.     [Column 0] VARCHAR(12), 
    3.     [Column 1] VARCHAR(48), 
    4.     [Column 2] VARCHAR(1), 
    5.     [Column 3] VARCHAR(4), 
    6.     [Column 4] VARCHAR(53), 
    7.     [Column 5] VARCHAR(42), 
    8.     [Column 6] VARCHAR(28), 
    9.     [Column 7] VARCHAR(2), 
    10.     [Column 8] VARCHAR(10), 
    11.     [Column 9] VARCHAR(2), 
    12.     [Column 10] VARCHAR(10) 

    修改代码中的表名和列名,修改后的代码如下:

    1. CREATE TABLE MarketingCorporation( 
    2.     CorporateNumber varchar(12), 
    3.     CorporationName varchar(48), 
    4.     FilingStatus char(1), 
    5.     FilingType char(4), 
    6.     AddressLine1 varchar(53), 
    7.     AddressLine2 varchar(42), 
    8.     City varchar(28), 
    9.     State char(2), 
    10.     ZipCode varchar(10), 
    11.     Country char(2), 
    12.     FilingDate varchar(50) NULL 

    由于列名是不同的需要点击Mapping标签将列对应起来。

    处理更多的脏数据

    这个Package基本上完成了,但是这里有一个致命的缺陷。010305c.dat这个文件中有一些多余的数据,在Find Bad Records和Fix Bad Records之间添加一个data viewer可以查看这些多余的数据。
    这样可以查看在010305c.dat文件中有4条数据被Fix Bad Records处理,只有2条被清洗。另外2条不能被定为到Fix Bad Records中。在这个Package的需求中有一条是为市场部门提供一份地址列表用在邮件内容中。下图5-11中显示了错误所在。

    图5-11
    在输出界面中你可以看到如下的错误信息:
    Error: 0xC020901E at Load Corporate Data, Fix Bad Records [87]: Row yielded no
    match
    during lookup.
    Error: 0xC0209029 at Load Corporate Data, Fix Bad Records [87]: The "component "Fix
    Bad
    Records" (87)" failed because error code 0xC020901E occurred, and the error row
    disposition on "output "Lookup Output" (89)" specifies failure on error. An error
    occurred on the specified object of the specified component.
    Error: 0xC0047022 at Load Corporate Data, DTS.Pipeline: The ProcessInput method on
    component "Fix Bad Records" (87) failed with error code 0xC0209029. The identified
    component returned an error from the ProcessInput method. The error is specific to
    the
    component, but the error is fatal and will cause the Data Flow task to stop
    running.
    Error: 0xC0047021 at Load Corporate Data, DTS.Pipeline: Thread "WorkThread0" has
    exited
    with error code 0xC0209029.
    Error: 0xC0047039 at Load Corporate Data, DTS.Pipeline: Thread "WorkThread1"
    received a
    shutdown signal and is terminating. The user requested a shutdown, or an error in
    another
    thread is causing the pipeline to shutdown.
    Error: 0xC0047021 at Load Corporate Data, DTS.Pipeline: Thread "WorkThread1" has
    exited
    with error code 0xC0047039.
    不能因为这些错误提示而放弃这个Package,需要将错误信息输入到错误消息队列中以备查看。需要创建一个ErrorQueue表,从工具栏中拖放一个Audit转换,重命名为Add Auditing Info,从Fix Bad Records中拖拽红色箭头连线连接到Add Auditing Info。
    如图5-12,可以看到Configure Error Output对话框,在这个对话框中配置如果错误出现时SSIS如何反应。Truncation列表明如果一行太长而不能加入到转换中时所作的反应。Error列表明遇到转换错误时如何反应。在Description列中可以看到期望捕获的错误。例如对于Lookup转换,需要捕获的错误是lookup failure,意思是lookup转换不能找到对应的输入。在这个例子中选择错误类型如图5-10。默认情况会使任务失败,结果会使整个Package失败。也可以从下拉列表中选择完全忽略错误。

    图5-10
    完成配置之后点击OK退出界面。
    错误处理之后,双击Audit transform打开编辑界面,添加两列。继续添加两列,在Audit Type列中选择Task Name和Package Name,Output Column Name默认同名,去掉名字中的空格,如图5-13。由于可能有多个Package向表中写入数据,所以这些信息是必须的。

    图5-13
    最后的工作是将脏数据送入到SQL Server中的ErrorQueue表中,从工具栏中拖放另外一个OLE DB目的,重命名为Error Queue,双击选择AdventureWorks数据源,点击New新添加一个表,重命名表名ErrorQueue,代码如下:

    1. CREATE TABLE ErrorQueue( 
    2.     CorporateNumber varchar(12), 
    3.     CorporationName varchar(48), 
    4.     FilingStatus char(1), 
    5.     FilingType char(4), 
    6.     AddressLine1 varchar(53), 
    7.     AddressLine2 varchar(42), 
    8.     City varchar(28), 
    9.     State char(2), 
    10.     ZipCode varchar(10), 
    11.     Country char(2), 
    12.     FilingDate varchar(10) NULL, 
    13.     ErrorCode INT, 
    14.     ErrorColumn INT, oracle数据库教程
    15.     TaskName NVARCHAR(19), 
    16.     PackageName NVARCHAR(30) 

    注意:可以看到这个表中的信息是很笼统的。
    这次需要点击mapping将列一一对应起来,点击OK退出编辑界面。现在可以执行这个Package了,4条记录被清洗,2条送到error queue。执行成功之后的界面如图5-14。

    更多视频课程文章的课程,可到课课家官网查看。我在等你哟!!!

     

课课家教育

未登录