1.概述
Apache Spark是一个快速的分布式数据处理系统。它执行内存中的数据处理,并使用内存中的缓存和优化的执行,从而实现了快速的性能。它为流行的编程语言(例如Scala,Python,Java和R)提供了高级API。
在本快速教程中,我们将介绍Spark的三个基本概念:数据帧,数据集和RDD。
2.数据框
从Spark 1.3开始,Spark SQL引入了表格形式的数据抽象,称为DataFrame。从那时起,它已成为Spark中最重要的功能之一。当我们要处理结构化和半结构化的分布式数据时,此API很有用。
在第3节中,我们将讨论弹性分布式数据集(RDD)。 DataFrame以比RDD更有效的方式存储数据,这是因为DataFrame使用RDD的不变,内存,弹性,分布式和并行功能,但它们也将架构应用于数据。 DataFrames还可以将SQL代码转换为优化的低级RDD操作。
我们可以通过三种方式创建DataFrames:
转换现有的RDD
运行SQL查询
加载外部数据
Spark团队SparkSession ,它统一了所有不同的上下文,从而确保开发人员无需担心创建不同的上下文:
SparkSession session = SparkSession.builder()
.appName("TouristDataFrameExample")
.master("local[*]")
.getOrCreate();
DataFrameReader dataFrameReader = session.read();我们将分析Tourist.csv文件:
Dataset<Row> data = dataFrameReader.option("header", "true")
.csv("data/Tourist.csv");由于Spark 2.0 DataFrame成为Row类型Dataset ,因此我们可以将DataFrame用作**Dataset<Row>** .
我们可以选择感兴趣的特定列。我们还可以对给定的列进行过滤和分组:
data.select(col("country"), col("year"), col("value"))
.show();
data.filter(col("country").equalTo("Mexico"))
.show();
data.groupBy(col("country"))
.count()
.show();3. Datasets
数据集是一组强类型的结构化数据。它们提供了熟悉的面向对象编程风格以及类型安全性的好处,因为数据集可以在编译时检查语法并捕获错误。
Dataset是DataFrame的扩展,因此我们可以将DataFrame视为数据集的无类型视图。
Spark团队Dataset API,正如他们提到的那样:“ Spark Datasets的目标是提供一个API,使用户可以轻松地表达对象域上的转换,同时还提供Spark SQL执行的性能和鲁棒性优势。引擎”。
首先,我们需要创建一个类型TouristData的类:
public class TouristData { private String region; private String country; private String year; private String series; private Double value; private String footnotes; private String source; // ... getters and setters
}要将每个记录映射到指定的类型,我们将需要使用编码器。编码器在Java对象和Spark的内部二进制格式之间进行转换:
// SparkSession initialization and data load
Dataset<Row> responseWithSelectedColumns = data.select(col("region"),
col("country"), col("year"), col("series"), col("value").cast("double"),
col("footnotes"), col("source"));
Dataset<TouristData> typedDataset = responseWithSelectedColumns
.as(Encoders.bean(TouristData.class));与DataFrame一样,我们可以按特定的列进行过滤和分组:
typedDataset.filter((FilterFunction) record -> record.getCountry()
.equals("Norway"))
.show();
typedDataset.groupBy(typedDataset.col("country"))
.count()
.show();我们还可以进行操作,例如按列匹配特定范围进行过滤或计算特定列的总和,以获取其总值:
typedDataset.filter((FilterFunction) record -> record.getYear() != null
&& (Long.valueOf(record.getYear()) > 2010
&& Long.valueOf(record.getYear()) < 2017)).show();
typedDataset.filter((FilterFunction) record -> record.getValue() != null
&& record.getSeries()
.contains("expenditure"))
.groupBy("country")
.agg(sum("value"))
.show();4. RDD
弹性分布式数据集或RDD是Spark的主要编程抽象。它表示元素的集合,这些元素是不可变的,有弹性的和分布式的。
一个RDD封装了一个大型数据集,Spark将自动在整个集群中分布RDD中包含的数据,并使我们对它们执行的操作并行化。
我们只能通过稳定存储中的数据操作或其他RDD上的操作来创建RDD。
当我们处理大量数据并且数据分布在群集计算机上时,容错能力至关重要。由于Spark内置的故障恢复机制,因此RDD具有弹性。 Spark依赖于以下事实:RDD会记住它们的创建方式,以便我们可以轻松地追溯沿袭来恢复分区。
我们可以对RDD执行两种类型的操作: Transformations和Actions 。
4.1 转变
我们可以将转换应用于RDD以操纵其数据。执行完此操作后,我们将获得全新的RDD,因为RDD是不可变的对象。
我们将检查如何实现Map和Filter,这是两种最常见的转换。
首先,我们需要创建一个JavaSparkContext Tourist.csv文件中将数据作为RDD加载:
SparkConf conf = new SparkConf().setAppName("uppercaseCountries")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> tourists = sc.textFile("data/Tourist.csv");接下来,让我们应用map函数从每个记录中获取国家的名称,并将名称转换为大写。我们可以将此新生成的数据集保存为磁盘上的文本文件:
JavaRDD<String> upperCaseCountries = tourists.map(line -> {
String[] columns = line.split(COMMA_DELIMITER); return columns[1].toUpperCase();
}).distinct();
upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");如果只想选择一个特定国家/地区,则可以对原始游客RDD应用过滤功能:
JavaRDD<String> touristsInMexico = tourists
.filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));
touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");4.2 动作
在对数据进行一些计算之后,动作将返回最终值或将结果保存到磁盘。
Spark中经常使用的两个动作是Count和Reduce。
让我们在CSV文件中计算国家总数:
// Spark Context initialization and data load
JavaRDD<String> countries = tourists.map(line -> {
String[] columns = line.split(COMMA_DELIMITER); return columns[1];
}).distinct();
Long numberOfCountries = countries.count();现在,我们将按国家/地区计算总支出。我们需要过滤描述中包含支出的记录。
如果不使用的JavaRDD ,我们将使用JavaPairRDD 。一对RDD是可以存储键值对的一种RDD 。接下来让我们检查一下:
JavaRDD<String> touristsExpenditure = tourists
.filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));
JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure
.mapToPair(line -> {
String[] columns = line.split(COMMA_DELIMITER); return new Tuple2<>(columns[1], Double.valueOf(columns[6]));
});
List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd
.reduceByKey((x, y) -> x + y)
.collect();5.结论
综上所述,当我们需要特定于域的API,需要聚合,求和或SQL查询等高级表达式时,应使用DataFrames或Datasets。或者,当我们想要在编译时进行类型安全时。
另一方面,当数据是非结构化的并且不需要实现特定的架构时,或者在需要低级的转换和操作时,我们应该使用RDD。
0 评论