Spark入门教程(Python版):如何设置Spark?

    作者:课课家教育更新于: 2016-06-08 09:44:17

      Spark是什么呢?下面我们来讨论下。所谓Spark,其实就是个通用的集群计算框架,通过将大量数据集计算任务分配到多台计算机上,以提供高效内存计算。

      Spark是第一个脱胎于资源管理框架转变的快速、通用分布式计算范式,并且很快流行起来。Spark使用函数式编程范式扩展了MapReduce模型以支持更多计算类型,可以涵盖广泛的工作流,这些工作流之前被实现为Hadoop之上的特殊系统。Spark使用内存缓存来提升性能,因此进行交互式分析也足够快速(就如同使用Python解释器,与集群进行交互一样)。缓存同时提升了迭代算法的性能,这使Spark非常适合数据理论任务,特别是机器学习。

    Spark入门(Python版):如何设置Spark_设置Spark_java_课课家

      Hadoop是对大数据集进行分布式计算的标准工具,这也是为什么当你穿过机场时能看到”大数据(Big Data)”广告的原因。它已经成为大数据的操作系统,允许使用相对便宜的商业硬件集群进行超级计算机级别的计算,提供了包括工具和技巧在内的丰富生态系统。2003年和2004年,两个来自Google的观点使Hadoop成为可能:一个分布式存储框架(Google文件系统),在Hadoop中被实现为HDFS;一个分布式计算框架(MapReduce)。

      这两个观点成为过去十年规模分析(scaling analytics)、大规模机器学习(machine learning),以及其他大数据应用出现的主要推动力!但是,从技术角度上讲,十年是一段非常长的时间,而且Hadoop还存在很多已知限制,尤其是MapReduce。对MapReduce编程明显是困难的。对大多数分析,你都必须用很多步骤将Map和Reduce任务串接起来。这造成类SQL的计算或机器学习需要专门的系统来进行。更糟的是,MapReduce要求每个步骤间的数据要序列化到磁盘,这意味着MapReduce作业的I/O成本很高,导致交互分析和迭代算法(iterative algorithms)开销很大;而事实是,几乎所有的最优化和机器学习都是迭代的。

      为了解决这些问题,Hadoop一直在向一种更为通用的资源管理框架转变——YARN(Yet Another Resource Negotiator, 即又一个资源协调者)。YARN实现了下一代的MapReduce,但同时也允许应用利用分布式资源而不必采用MapReduce进行计算。通过将集群管理一般化,研究转到分布式计算的一般化上,来扩展了MapReduce的初衷。

      Spark使用函数式编程范式扩展了MapReduce模型以支持更多计算类型,可以涵盖广泛的工作流,这些工作流之前被实现为Hadoop之上的特殊系统。Spark使用内存缓存来提升性能,因此进行交互式分析也足够快速(就如同使用Python解释器,与集群进行交互一样)。缓存同时提升了迭代算法的性能,这使得Spark非常适合数据理论任务,特别是机器学习。Spark是第一个脱胎于该转变的快速、通用分布式计算范式,并且很快流行起来。

      在本文中,我们首先讨论如何在本地机器上或者EC2的集群上设置Spark进行简单分析,然后在入门级水平探索Spark,了解Spark是什么以及它如何工作,以激发更多探索。最后我们通过命令行与Spark进行交互,演示如何用Python写Spark应用,并作为Spark作业提交到集群上。

      一、设置Spark

      在本机设置和运行Spark其实很简单,你只需下载一个预构建的安装包,只要安装了java6+和Python2.6+,就可以在Windows、MacOSX和Linux上运行Spark。确保java程序在PATH环境变量中,或者设置了JAVA_HOME环境变量。类似的,python也要在PATH中。

      假设你已经安装了Java和Python:

      1.访问Spark下载页。

      2.选择Spark最新发布版(本文写作时是1.2.0),一个预构建的Hadoop2.4包,直接下载。

      现在,如何继续依赖于你的操作系统,靠你自己去探索了。Windows用户可以在评论区对如何设置的提示进行评论。

      一般建议按照下面的步骤操作(在POSIX操作系统上):

      (1)首先解压Spark。

    ~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz
    

      (2)然后将解压目录移动到有效应用程序目录中。

    ~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0

      (3)创建指向该Spark版本的符号链接到

    ~$ ln -s /srv/spark-1.2.0 /srv/spark

      (4)修改BASH配置,将Spark添加到PATH中,设置SPARK_HOME环境变量。这些小技巧在命令行上会帮到你。在Ubuntu上,只要编辑~/.bash_profile或~/.profile文件,将以下语句添加到文件中:

    export SPARK_HOME=/srv/spark
    export PATH=$SPARK_HOME/bin:$PATH
    

      (5)source配置(或者重启终端)之后,在本地运行一个pyspark解释器。执行pyspark命令,可以看到以下结果:

    ~$ pyspark
    Python 2.7.8 (default, Dec  2 2014, 12:45:58)
    [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties
    [… snip …]
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  `_/
       /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
          /_/
     
    Using Python version 2.7.8 (default, Dec  2 2014 12:45:58)
    SparkContext available as sc.
    >>>
    

      此时Spark已经安装完毕,可以在本机以”单机模式“(standalonemode)使用。可以在本机开发应用并提交Spark作业,这些作业将以多进程/多线程模式运行的,或者,配置该机器作为一个集群的客户端(但不推荐这样做,因为在Spark作业中,驱动程序(driver)是个很重要的角色,并且应该与集群的其他部分处于相同网络)。除了开发,你在本机使用Spark做得最多的就是利用spark-ec2脚本来配置Amazon云上的一个EC2Spark集群了。

      二、简略Spark输出

      Spark(和PySpark)的执行可以特别详细,很多INFO日志消息都会打印到屏幕。这些非常恼人,因为在开发过程中,可能丢失Python栈跟踪或者print的输出。为了减少Spark输出,你可以设置$SPARK_HOME/conf下的log4j。首先,拷贝一份$SPARK_HOME/conf/log4j.properties.template文件,去掉“.template”扩展名。

    ~$ cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties
    

      编辑新文件,用WARN替换代码中出现的INFO。你的log4j.properties文件类似:

    # Set everything to be logged to the console
     log4j.rootCategory=WARN, console
     log4j.appender.console=org.apache.log4j.ConsoleAppender
     log4j.appender.console.target=System.err
     log4j.appender.console.layout=org.apache.log4j.PatternLayout
     log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    # Settings to quiet third party logs that are too verbose
     log4j.logger.org.eclipse.jetty=WARN
     log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
     log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
     log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN
    

      运行PySpark,输出消息将会更简略!

      三、在Spark中使用IPython Notebook

      在搜索有用的Spark小技巧时,偶然发现了一些文章提到在PySpark中配置IPython notebook。IPython notebook对数据科学家来说是个交互地呈现科学和理论工作的必备工具,它集成了文本和Python代码,IPython notebook是他们的Python入门,并且使用非常广泛。

      1.为Spark创建一个iPython notebook配置

    ~$ ipython profile create spark
    [ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_config.py'
    [ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_notebook_config.py'
    [ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'
    

      记住配置文件的位置,替换下文各步骤相应的路径:

      2.创建文件$HOME/.ipython/profile_spark/startup/00-pyspark-setup.py,并添加如下代码:

     import os
     import sys
     
     # Configure the environment
     if 'SPARK_HOME' not in os.environ:
         os.environ['SPARK_HOME'] = '/srv/spark'
     
     # Create a variable for our root path
     SPARK_HOME = os.environ['SPARK_HOME']
     
     # Add the PySpark/py4j to the Python Path
     sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
     sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
    

      3.使用我们刚刚创建的配置来启动IPython notebook。

    ~$ ipython notebook --profile spark
    

      4.在notebook中,你可以看到我们刚刚创建的变量。

    ~$ ipython notebook --profile spark
    

      5.在IPython notebook最上面,确保你添加了Spark context。

    print SPARK_HOME
    from pyspark import  SparkContext
    sc = SparkContext( 'local', 'pyspark')
    

      6.使用IPython做个简单的计算来测试Spark context。

    def isprime(n):
    """
    check if integer n is a prime
    """
    # make sure n is a positive integer
    n = abs(int(n))
    # 0 and 1 are not primes
    if n < 2:
        return False
    # 2 is the only even prime number
    if n == 2:
        return True
    # all other even numbers are not primes
    if not n & 1:
        return False
    # range starts with 3 and only needs to go up the square root of n
    # for all odd numbers
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            return False
    return True
     
    # Create an RDD of numbers from 0 to 1,000,000
    nums = sc.parallelize(xrange(1000000))
     
    # Compute the number of primes in the RDD
    print nums.filter(isprime).count()
    

      如果你能得到一个数字而且没有错误发生,那么你的context正确工作了!

      编辑提示:上面配置了一个使用PySpark直接调用IPython notebook的IPython context。但是,你也可以使用PySpark按以下方式直接启动一个notebook: $ IPYTHON_OPTS=”notebook –pylab inline” pyspark

      哪个方法好用取决于你使用PySpark和IPython的具体情景。前一个允许你更容易地使用IPython notebook连接到一个集群,因此是我喜欢的方法。

      四、在EC2上使用Spark

      在讲授使用Hadoop进行分布式计算时,发现有很多可以通过在本地伪分布式节点(pseudo-distributed node)或以单节点模式(single-node mode)讲授。为了了解真正发生的细节,就需要一个集群。当数据变得庞大,这些书面讲授的技能和真实计算需求间经常出现隔膜。如果想学习详细使用,建议设置一个快速Spark集群做一下实验,每周大概使用10小时包含5个slave和1个master的集群。

      以在Spark文档中找到完整的讨论:在EC2上运行Spark,决定购买EC2集群前一定要通读这篇文档!我列出了一些关键点:

      1.通过AWS Console获取AWS EC2 key对(访问key和密钥key)。

     

      2.将key对导出到你的环境中。在shell中敲出以下命令,或者将它们添加到配置中。

     export AWS_ACCESS_KEY_ID=myaccesskeyid
     export AWS_SECRET_ACCESS_KEY=mysecretaccesskey
    

      注意不同的工具使用不同的环境名称,确保你用的是Spark脚本所使用的名称。

      3.启动集群:

    ~$ cd $SPARK_HOME/ec2
    ec2$ ./spark-ec2 -k  -i  -s  launch 
    

      4.SSH到集群来运行Spark作业。

    ec2$ ./spark-ec2 -k  -i  login 

      5.销毁集群

    ec2$ ./spark-ec2 destroy .

      这些脚本会自动创建一个本地的HDFS集群来添加数据,copy-dir命令可以同步代码和数据到该集群。但是你最好使用S3来存储数据,创建使用s3://URI来加载数据的RDDs。

      五、Spark是什么?

      设置好了Spark,现在来讨论下Spark到底是什么。Spark就是一个通用的集群计算框架,通过将大量数据集计算任务分配到多台计算机上,提供高效内存计算。熟悉Hadoop的人都知道分布式计算框架要解决两个问题:如何分发数据和如何分发计算。Hadoop使用HDFS来解决分布式数据问题,MapReduce计算范式提供有效的分布式计算。类似的,Spark拥有多种语言的函数式编程API,提供了除map和reduce之外更多的运算符,这些操作是通过一个称作弹性分布式数据集(resilientdistributeddatasets,RDDs)的分布式数据框架进行的。

      本质上,RDD是种编程抽象,代表可以跨机器进行分割的只读对象集合。RDD可以从一个继承结构(lineage)重建(因此可以容错),通过并行操作访问,可以读写HDFS或S3这样的分布式存储,还可以缓存到worker节点的内存中进行立即重用。正因为RDD可以缓存到内存中,Spark对迭代应用特别有效,因为这些应用中,数据是在整个算法运算过程中都可以被重用。大多数机器学习和最优化算法都是迭代的,使得Spark对数据科学来说是个非常有效的工具。另外,由于Spark非常快,可以通过类似PythonREPL的命令行提示符交互式访问。

      Spark库本身包含很多应用元素,这些元素可以用到大部分大数据应用中,其中包括对大数据进行类似SQL查询的支持,机器学习和图算法,甚至对实时流数据的支持。

      核心组件如下:

      SparkCore:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和SparkCore之上的。

      SparkSQL:提供通过ApacheHive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,SparkSQL查询被转换为Spark操作。熟悉Hive和HiveQL的更懂得使用Spark。

      SparkStreaming:允许对实时数据流进行处理和控制。很多实时数据库(如ApacheStore)可以处理实时数据。SparkStreaming允许程序能够像普通RDD一样处理实时数据。

      MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。之前可选的大数据机器学习库Mahout,将会转到Spark,并在未来实现。

      GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDDAPI,包含控制图、创建子图、访问路径上所有顶点的操作。

      这些组件不仅满足了很多大数据需求,还满足了很多数据科学任务的算法和计算上的需要,于是Spark快速流行起来。另外,Spark提供了使用Scala、Java和Python编写的API;满足了不同团体的需求,允许更多数据科学家简便地采用Spark作为他们的大数据解决方案。

      对Spark编程

      编写Spark应用与之前实现在Hadoop上的其他数据流语言类似。代码写入一个惰性求值的驱动程序(driverprogram)中,通过一个动作(action),驱动代码被分发到集群上,由各个RDD分区上的worker来执行,结果会被发送回驱动程序进行聚合或编译。本质上,驱动程序创建一个或多个RDD,调用操作来转换RDD,然后调用动作处理被转换后的RDD。

      这些步骤大体如下:

      定义一个或多个RDD,可以通过获取存储在磁盘上的数据(HDFS,Cassandra,HBase,LocalDisk),并行化内存中的某些集合,转换(transform)一个已存在的RDD,或者,缓存或保存。

      通过传递一个闭包(函数)给RDD上的每个元素来调用RDD上的操作。Spark提供了除了Map和Reduce的80多种高级操作。

      使用结果RDD的动作(action)(如count、collect、save等),动作将会启动集群上的计算。

      当Spark在一个worker上运行闭包时,闭包中用到的所有变量都会被复制到节点上,但是由闭包的局部作用域来维护。Spark提供了两种类型的共享变量,这些变量可以按照限定的方式被所有worker访问。广播变量会被分发给所有worker,但是是只读的。累加器这种变量,worker可以使用关联操作来“加”,通常用作计数器。

      Spark应用本质上通过转换和动作来控制RDD。后续文章将会深入讨论,但是理解了这个就足以执行下面的例子了。

      Spark的执行

      简略描述下Spark的执行。本质上,Spark应用作为独立的进程运行,由驱动程序中的SparkContext协调。这个context将会连接到一些集群管理者(如YARN),这些管理者分配系统资源。集群上的每个worker由执行者(executor)管理,执行者反过来由SparkContext管理。执行者管理计算、存储,还有每台机器上的缓存。

      重点要记住的是应用代码由驱动程序发送给执行者,执行者指定context和要运行的任务。执行者与驱动程序通信进行数据分享或者交互。驱动程序是Spark作业的主要参与者,因此需要与集群处于相同的网络。这与Hadoop代码不同,Hadoop中你可以在任意位置提交作业给JobTracker,JobTracker处理集群上的执行。

      六、与Spark交互

      使用Spark最简单的方式就是使用交互式命令行提示符。打开PySpark终端,在命令行中打出pyspark。

    ~$ pyspark
    [… snip …]
    >>>
    

      PySpark将会自动使用本地Spark配置创建一个SparkContext。你可以通过sc变量来访问它。我们来创建第一个RDD。

    >>> text = sc.textFile("shakespeare.txt")
    >>> print text
    shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
    

      textFile方法将莎士比亚全部作品加载到一个RDD命名文本。如果查看了RDD,你就可以看出它是个MappedRDD,文件路径是相对于当前工作目录的一个相对路径(记得传递磁盘上正确的shakespear.txt文件路径)。我们转换下这个RDD,来进行分布式计算的“helloworld”:“字数统计”。

    >>> from operator import add
    >>> def tokenize(text):
    ...     return text.split()
    ...
    >>> words = text.flatMap(tokenize)
    >>> print words
    PythonRDD[2] at RDD at PythonRDD.scala:43
    

      导入了的add操作符,它是个命名函数,可以作为加法的闭包来使用。1.首先把文本拆分为单词。2.创建一个tokenize函数,参数是文本片段,返回根据空格拆分的单词列表。3.通过给flatMap操作符传递tokenize闭包对textRDD进行变换创建了一个wordsRDD。这时会发现,words是个PythonRDD,但是执行本应该立即进行。显然,我们还没有把整个莎士比亚数据集拆分为单词列表。

      使用过MapReduce做过Hadoop版的“字数统计”的话,应该都知道下一步是将每个单词映射到一个键值对,其中键是单词,值是1,然后使用reducer计算每个键的1总数。

      首先,我们map一下。

    >>> wc = words.map(lambda x: (x,1))
    >>> print wc.toDebugString()
    (2) PythonRDD[3] at RDD at PythonRDD.scala:43
    |  shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
    |  shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2
    

      这里使用了一个匿名函数(用了Python中的lambda关键字)而不是命名函数。这行代码将会把lambda映射到每个单词,即每个x都是一个单词,每个单词都会被匿名闭包转换为元组(word,1)。为了查看转换关系,我们使用toDebugString方法来查看PipelinedRDD是怎么被转换的。可以使用reduceByKey动作进行字数统计,然后把统计结果写到磁盘。

    >>> counts = wc.reduceByKey(add)
    >>> counts.saveAsTextFile("wc")
    

      只要最终调用了saveAsTextFile动作,就开始执行分布式作业了。在作业“跨集群地”(或者本机的其他进程)运行时,可以看到很多INFO语句。退出解释器,可以看到当前工作目录下有个“wc”目录。

    $ ls wc/
    _SUCCESS   part-00000 part-00001
    

      每个part文件都代表你本机上的进程计算得到的被保持到磁盘上的最终RDD。如果对一个part文件进行head命令,能看到字数统计元组。

    $ head wc/part-00000
    (u'fawn', 14)
    (u'Fame.', 1)
    (u'Fame,', 2)
    (u'kinghenryviii@7731', 1)
    (u'othello@36737', 1)
    (u'loveslabourslost@51678', 1)
    (u'1kinghenryiv@54228', 1)
    (u'troilusandcressida@83747', 1)
    (u'fleeces', 1)
    (u'midsummersnightsdream@71681', 1)
    

      注意这些键没有像Hadoop一样被排序(因为Hadoop中Map和Reduce任务中有个必要的打乱和排序阶段)。但是,能保证每个单词在所有文件中只出现一次,因为你使用了reduceByKey操作符。你还可以使用sort操作符确保在写入到磁盘之前所有的键都被排过序。

      七、编写一个Spark应用

      编写Spark应用与通过交互式控制台使用Spark类似。API是相同的。首先,你需要访问

      使用Spark编写Spark应用的一个基本模板如下:

    ## Spark Application - execute with spark-submit
     
    ## Imports
    from pyspark import SparkConf, SparkContext
     
    ## Module Constants
    APP_NAME = "My Spark Application"
     
    ## Closure Functions
     
    ## Main functionality
     
    def main(sc):
        pass
     
    if __name__ == "__main__":
        # Configure Spark
        conf = SparkConf().setAppName(APP_NAME)
        conf = conf.setMaster("local[*]")
        sc   = SparkContext(conf=conf)
     
        # Execute Main functionality
        main(sc)
    

      这个模板列出了一个Spark应用所需的东西:导入Python库,模块常量,用于调试和SparkUI的可识别的应用名称,还有作为驱动程序运行的一些主要分析方法学。在ifmain中创建了SparkContext,使用了配置好的context执行main就可以直接导入驱动代码到pyspark而不用执行。注意这里Spark配置通过setMaster方法被硬编码到SparkConf,一般允许这个值通过命令行来设置,所以你能看到这行做了占位符注释。

      使用

    ## Spark Application - execute with spark-submit
     
    ## Imports
    import csv
    import matplotlib.pyplot as plt
     
    from StringIO import StringIO
    from datetime import datetime
    from collections import namedtuple
    from operator import add, itemgetter
    from pyspark import SparkConf, SparkContext
     
    ## Module Constants
    APP_NAME = "Flight Delay Analysis"
    DATE_FMT = "%Y-%m-%d"
    TIME_FMT = "%H%M"
     
    fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
                'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
    Flight   = namedtuple('Flight', fields)
     
    ## Closure Functions
    def parse(row):
        """
        Parses a row and returns a named tuple.
        """
     
        row[0]  = datetime.strptime(row[0], DATE_FMT).date()
        row[5]  = datetime.strptime(row[5], TIME_FMT).time()
        row[6]  = float(row[6])
        row[7]  = datetime.strptime(row[7], TIME_FMT).time()
        row[8]  = float(row[8])
        row[9]  = float(row[9])
        row[10] = float(row[10])
        return Flight(*row[:11])
     
    def split(line):
        """
        Operator function for splitting a line with csv module
        """
        reader = csv.reader(StringIO(line))
        return reader.next()
     
    def plot(delays):
        """
        Show a bar chart of the total delay per airline
        """
        airlines = [d[0] for d in delays]
        minutes  = [d[1] for d in delays]
        index    = list(xrange(len(airlines)))
     
        fig, axe = plt.subplots()
        bars = axe.barh(index, minutes)
     
        # Add the total minutes to the right
        for idx, air, min in zip(index, airlines, minutes):
            if min > 0:
                bars[idx].set_color('#d9230f')
                axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
            else:
                bars[idx].set_color('#469408')
                axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')
     
        # Set the ticks
        ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
        xt = plt.xticks()[0]
        plt.xticks(xt, [' '] * len(xt))
     
        # minimize chart junk
        plt.grid(axis = 'x', color ='white', linestyle='-')
     
        plt.title('Total Minutes Delayed per Airline')
        plt.show()
     
    ## Main functionality
    def main(sc):
     
        # Load the airlines lookup dictionary
        airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
     
        # Broadcast the lookup dictionary to the cluster
        airline_lookup = sc.broadcast(airlines)
     
        # Read the CSV Data into an RDD
        flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
     
        # Map the total delay to the airline (joined using the broadcast value)
        delays  = flights.map(lambda f: (airline_lookup.value[f.airline],
                                         add(f.dep_delay, f.arv_delay)))
     
        # Reduce the total delay for the month to the airline
        delays  = delays.reduceByKey(add).collect()
        delays  = sorted(delays, key=itemgetter(1))
     
        # Provide output from the driver
        for d in delays:
            print "%0.0f minutes delayed\t%s" % (d[1], d[0])
     
        # Show a bar chart of the delays
        plot(delays)
     
    if __name__ == "__main__":
        # Configure Spark
        conf = SparkConf().setMaster("local[*]")
        conf = conf.setAppName(APP_NAME)
        sc   = SparkContext(conf=conf)
     
        # Execute Main functionality
        main(sc)
    

      使用

    ~$ spark-submit app.py

      这个Spark作业使用本机作为master,并搜索app.py同目录下的ontime目录下的2个CSV文件。最终结果显示,4月的总延误时间(单位分钟),既有早点的(如果你从美国大陆飞往夏威夷或者阿拉斯加),但对大部分大型航空公司都是延误的。注意,我们在app.py中使用matplotlib直接将结果可视化出来了:

    spark应用

      这些代码起了什么作用?看看与Spark最直接相关的main函数。首先,我们加载CSV文件到RDD,然后把split函数映射给它。split函数使用csv模块解析文本的每一行,并返回代表每行的元组。最后,我们将collect(收集)动作传给RDD,这个动作把数据以Python列表的形式从RDD传回驱动程序。本例中,airlines.csv是个小型的跳转表(jumptable),可以将航空公司代码与全名对应起来。我们将转移表存储为Python字典,然后使用sc.broadcast广播给集群上的每个节点。

      接着,main函数加载了数据量更大的flights.csv([译者注]作者笔误写成fights.csv,此处更正)。拆分CSV行完成之后,我们将parse函数映射给CSV行,此函数会把日期和时间转成Python的日期和时间,并对浮点数进行合适的类型转换。每行作为一个NamedTuple保存,名为Flight,以便高效简便地使用。

      有了Flight对象的RDD,我们映射一个匿名函数,这个函数将RDD转换为一些列的键值对,其中键是航空公司的名字,值是到达和出发的延误时间总和。使用reduceByKey动作和add操作符可以得到每个航空公司的延误时间总和,然后RDD被传递给驱动程序(数据中航空公司的数目相对较少)。最终延误时间按照升序排列,输出打印到了控制台,并且使用matplotlib进行了可视化。

      这个例子稍长,但是希望能演示出集群和驱动程序之间的相互作用(发送数据进行分析,结果取回给驱动程序),以及Python代码在Spark应用中的角色。

      八、结论

      现在知道了Spark是什么、以及怎么使用进行快速、内存分布式计算,就应该可以运行Spark了,并开始在本机或AmazonEC2上探索数据。你应该可以配置好iPythonnotebook来运行Spark。

      Spark不能解决分布式存储问题(通常Spark从HDFS中获取数据),但是它为分布式计算提供了丰富的函数式编程API。这个框架建立在伸缩分布式数据集(RDD)之上。RDD是种编程抽象,代表被分区的对象集合,允许进行分布式操作。RDD有容错能力(可伸缩的部分),更重要的是,可以存储到节点上的worker内存里进行立即重用。内存存储提供了快速和简单表示的迭代算法,以及实时交互分析。

      由于Spark库提供了Python、Scale、Java编写的API,以及内建的机器学习、流数据、图算法、类SQL查询等模块;Spark迅速成为当今最重要的分布式计算框架之一。与YARN结合,Spark提供了增量,并没有替代已存在的Hadoop集群,它将成为未来大数据重要的一部分,为数据科学探索铺设了一条康庄大道。

      九、有用的相关链接

      以下是一些关于本文的有用链接,这些链接可能让你学习到Spark或者除此之外的更多知识。

      http://www.kokojia.com/article/13442.htmlSpark 2.0面纱半揭,相关细节引人想象

      http://www.kokojia.com/article/12091.html2分钟让你明白Hadoop和Spark它们之间的不同之处

      http://www.kokojia.com/article/5887.html采用Spark分析大数据

      http://www.kokojia.com/article/6345.html关于Storm、Spark和MapReduce开源分布式计算系统框架的比较

      http://www.kokojia.com/article/6362.html使用ApacheSpark和MySQL开创强大的数据分析

      本节讲述Spark入门设置就到此,欲了解更多spark知识,请关注课课家教育平台,为你讲解如何使用spark进行高级分析。

python 更多推荐

未登录

1