A small example of visual analytics based on Spark (Python)

The total delay time of the major airlines in a certain month


1.Preparation

1.1.Data

      This data set was downloaded from the U.S. Department of Transportation, Office of the Secretary of Research on November 30, 2014 and represents flight data for the domestic United States in April of 2014.

      The following CSV files are lookup tables:

    - airlines.csv

    - airports.csv

      They provide detailed information about the reference codes in the main data set. These files have header rows to identify their fields.

      The flights.csv contains flight statistics for April 2014 with the following fields:

- flight date     (yyyy-mm-dd)

- airline id      (lookup in airlines.csv)

- flight num

- origin          (lookup in airports.csv)

- destination     (lookup in airports.csv)

- departure time  (HHMM)

- departure delay (minutes)

- arrival time    (HHMM)

- arrival delay   (minutes)

- air time        (minutes)

- distance        (miles)

 

1.2.Code: a basic template

     A basic template for writing a Spark application in Python is as follows:

## Spark Application - execute with spark-submit

## Imports
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "My Spark Application"

## Closure Functions

## Main functionality
def main(sc):
    pass

if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    sc   = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)


 

1.3.Code: the full application

The entire app is as follows:

 
## Spark Application - execute with spark-submit

## Imports
import csv
import matplotlib.pyplot as plt
from io import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"
fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
            'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight   = namedtuple('Flight', fields)

## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """
    row[0]  = datetime.strptime(row[0], DATE_FMT).date()
    row[5]  = datetime.strptime(row[5], TIME_FMT).time()
    row[6]  = float(row[6])
    row[7]  = datetime.strptime(row[7], TIME_FMT).time()
    row[8]  = float(row[8])
    row[9]  = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])

def split(line):
    """
    Operator function for splitting a line with csv module
    """
    reader = csv.reader(StringIO(line))
    return next(reader)

def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes  = [d[1] for d in delays]
    index    = list(range(len(airlines)))
    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)
    # Add the total minutes to the right
    for idx, air, mins in zip(index, airlines, minutes):
        if mins > 0:
            # Positive total: the airline was late overall
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % mins, xy=(mins+1, idx+0.5), va='center')
        else:
            # Zero or negative total: on time or early overall
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % mins, xy=(10, idx+0.5), va='center')
    # Set the ticks
    ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))
    # minimize chart junk
    plt.grid(axis = 'x', color ='white', linestyle='-')
    plt.title('Total Minutes Delayed per Airline')
    plt.show()

## Main functionality
def main(sc):
    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)
    # Read the CSV Data into an RDD
    flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
    # Map the total delay to the airline (joined using the broadcast value)
    delays  = flights.map(lambda f: (airline_lookup.value[f.airline],
                                     add(f.dep_delay, f.arv_delay)))
    # Reduce the total delay for the month to the airline
    delays  = delays.reduceByKey(add).collect()
    delays  = sorted(delays, key=itemgetter(1))
    # Provide output from the driver
    for d in delays:
        print("%0.0f minutes delayed\t%s" % (d[1], d[0]))
    # Show a bar chart of the delays
    plot(delays)

if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc   = SparkContext(conf=conf)
    # Execute Main functionality
    main(sc)

 

 

1.4.Environment

     An Ubuntu machine with Spark, a JDK, and Scala installed.

 

2.Steps

2.1.Overview

data

codes

 

2.2.Run the application with the spark-submit command

 

2.3.Result

2.4.Analysis

       What is this code doing? Let's look closely at the main function, which does the work most directly related to Spark. First, we load a CSV file into an RDD and map the split function over it. The split function parses each line of text using the csv module and returns a list of fields that represents the row. Finally, we call the collect action on the RDD, which brings the data back to the driver as a Python list. In this case, airlines.csv is a small lookup table that lets us join airline codes with the full airline names. We store this lookup table as a Python dictionary and then broadcast it to every node in the cluster using sc.broadcast.
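The lookup-table step can be tried without a cluster. The sketch below is plain Python, not Spark: it applies the same split logic to two sample lines and builds the kind of dictionary that main hands to sc.broadcast. The sample codes and airline names are invented for illustration and are not taken from airlines.csv.

```python
import csv
from io import StringIO

def split(line):
    """Parse one CSV line into a list of fields using the csv module."""
    return next(csv.reader(StringIO(line)))

# Invented sample lines in a "code,description" shape (assumption, not real data)
sample_lines = [
    '"10001","Example Air: EA"',
    '"10002","Sample Airways: SA"',
]

# Each parsed row is a two-item list, so dict() maps code -> description
airlines = dict(split(line) for line in sample_lines)
print(airlines["10001"])
```

Using the csv module rather than str.split(",") matters when a quoted field itself contains a comma.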

       Next, the main function loads the much larger flights.csv. After splitting the CSV rows, we map the parse function over them; it converts dates and times to Python date and time objects and casts the numeric fields to floats. It also stores each row as a namedtuple called Flight, which is lightweight and lets us access fields by name.
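To see what the conversion produces, here is a minimal standalone sketch of the same parsing idea applied to one row. The sample row is invented, and this version copies the row before converting it, which is a small variation on the application's parse function rather than a copy of it.

```python
from collections import namedtuple
from datetime import datetime

fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
          'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight = namedtuple('Flight', fields)

def parse(row):
    """Convert a list of 11 strings into a typed Flight record."""
    row = list(row)  # copy so the caller's list is not modified
    row[0] = datetime.strptime(row[0], "%Y-%m-%d").date()
    for i in (5, 7):            # departure and arrival times (HHMM)
        row[i] = datetime.strptime(row[i], "%H%M").time()
    for i in (6, 8, 9, 10):     # delays, air time, distance
        row[i] = float(row[i])
    return Flight(*row[:11])

# Invented sample row (assumption, not taken from flights.csv)
sample = ["2014-04-01", "10001", "42", "AAA", "BBB",
          "0854", "-6.00", "1217", "2.00", "355.00", "2475.00"]
flight = parse(sample)
print(flight.date, flight.dep, flight.dep_delay + flight.arv_delay)
```

Fields are then available by name (flight.dep_delay) instead of by position (row[6]), which is what keeps the lambda in main readable.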

       With an RDD of Flight objects in hand, we map an anonymous function that transforms the RDD into a series of key-value pairs, where the key is the name of the airline and the value is the sum of the arrival and departure delays. The delays for each airline are summed using the reduceByKey transformation with the add operator, and the resulting RDD is collected back to the driver (again, the number of airlines in the data is relatively small). Finally, the delays are sorted in ascending order, and the output is printed to the console and visualized using matplotlib.
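The aggregation step can be illustrated locally. In the sketch below, reduce_by_key is a hypothetical plain-Python stand-in that mimics what RDD.reduceByKey computes on a single machine; it is not Spark's implementation, and the airline names and delay values are invented.

```python
from operator import add, itemgetter

def reduce_by_key(pairs, func):
    """Local stand-in for reduceByKey: fold the values of each key with func."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return list(totals.items())

# Invented (airline name, departure delay + arrival delay) pairs
pairs = [("Example Air", 12.0), ("Sample Airways", -5.0),
         ("Example Air", 30.0), ("Sample Airways", -3.0)]

# Sum per airline, then sort ascending by total delay, as main does
delays = sorted(reduce_by_key(pairs, add), key=itemgetter(1))
for name, minutes in delays:
    print("%0.0f minutes delayed\t%s" % (minutes, name))
```

In Spark the same fold runs per partition first and the partial sums are then merged, which is why the function passed to reduceByKey should be associative and commutative, as add is.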

3.Q&A

3.1.ImportError: No module named matplotlib.pyplot

http://www.codeweblog.com/importerror-no-module-named-matplotlib-pyplot/
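This error typically means matplotlib is not installed for the Python interpreter that runs the driver. A quick check, sketched below with a hypothetical helper named is_installed, is to run a few lines with that same interpreter and see whether the package can be found.

```python
import importlib.util
import sys

def is_installed(module_name):
    """Return True if a top-level module can be found by this interpreter."""
    return importlib.util.find_spec(module_name) is not None

# Show which interpreter is running and whether it can see matplotlib
print("interpreter:", sys.executable)
print("matplotlib available:", is_installed("matplotlib"))
```

If the check prints False, installing matplotlib for that interpreter (for example with pip) is the usual fix.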

 

 

Note:

This demo originally came from

https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python

I also found a Chinese version:

http://blog.jobbole.com/86232/

The data comes from GitHub:

https://github.com/bbengfort/hadoop-fundamentals/blob/master/data/ontime.zip


