Fork me on GitHub

Apache Beam使用简介

创建Pipeline

管道抽象封装了数据处理任务中的所有数据和步骤。通常从构建一个Pipeline对象开始,然后使用该对象作为创建管道数据集PCollections的基础,并对其作Transforms操作。

1
2
3
4
5
6
7
8
9
10
// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);

或者通过命令行参数的方式:
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().create();
// 参数的传递方式 --<option>=<value>

也可以通过如下的方式创建自定义的参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
//或者
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}

//通过如下方式使用:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation().as(MyOptions.class);
//-myCustomOption=value

创建PCollection数据

PCollection抽象表示分布式数据集。

PCollection 特性

  • 元素类型
  • 不变性
  • 不支持随机访问
  • Size和边界
  • 元素时间戳

PCollection的使用

使用比较多的方式是从外部数据源读取。例如从外部文件中读取:

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
// Create the pipeline.
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);

// Create the PCollection 'lines' by applying a 'Read' transform.
PCollection<String> lines = p.apply(
"ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));
}

当然也支持从内存中读取的方式,不过这种方式一般只会出现在调试程序分中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
// Create a Java Collection, in this case a List of Strings.
static final List<String> LINES = Arrays.asList(
"To be, or not to be: that is the question: ",
"Whether 'tis nobler in the mind to suffer ",
"The slings and arrows of outrageous fortune, ",
"Or to take arms against a sea of troubles, ");

// Create the pipeline.
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);

// Apply Create, passing the list and the coder, to create the PCollection.
p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())
}

PCollection的Transforms采用如下的语法:

1
2
3
4
5
[Output PCollection] = [Input PCollection].apply([Transform])
或者采用链式的方式将多个Transform连接起来
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])

PCollection的Transforms中比较核心方法如下:

  • ParDo
  • GroupByKey
  • CoGroupByKey
  • Combine
  • Flatten
  • Partition

使用ParDo

ParDo处理范例与Map / Shuffle / Reduce样式算法的“Map”阶段相似:ParDo转换考虑了输入PCollection中的每个元素,对该元素执行一些处理函数(用户代码),并输出0个,1个或多个元素到输出PCollection。
在PCollection 上调用apply 方法,用ParDo 作为参数,如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 元素类型为字符串类型的输入PCollection
PCollection<String> words = ...;

// DoFn子类,用来具体计算每1个元素的长度
static class ComputeWordLengthFn extends DoFn<String, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
// Get the input element from ProcessContext.
String word = c.element();
// Use ProcessContext.output to emit the output element.
c.output(word.length());
}
}

// 使用ParDo计算PCollection "words" 中每一个单词的长度
PCollection<Integer> wordLengths = words.apply(
ParDo
.of(new ComputeWordLengthFn()));

注意: 如果 PCollection 的元素是key/value键值对,可以通过ProcessContext.element().getKey()获取键(key), ProcessContext.element().getValue()获取值(value)

事实上在编程过程中我们更多可能会采用是通过匿名内部类的方式来实现业务的逻辑代码(如下):

1
2
3
4
5
6
7
8
9
10
11
12
13
// 输入PCollection.
PCollection<String> words = ...;

// 创建一个匿名类处理PCollection “words”.
// 输出单词的长度到新的输出PCollection
PCollection<Integer> wordLengths = words.apply(
"ComputeWordLengths",// Transform 的自定义名称
ParDo.of(new DoFn<String, Integer>() {// DoFn作为匿名内部类
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().length());
}
}));

如果我们只是想对PCollection的元素做一对一对转换,我们可以通过如下更简单的方式实现:

1
2
3
4
5
6
7
8
// 输入PCollection.
PCollection<String> words = ...;

// 在MapElements中使用匿名lambda函数处理 PCollection “words”.
//输出单词的长度到新的输出PCollection.
PCollection<Integer> wordLengths = words.apply(
MapElements.into(TypeDescriptors.integers())
.via((String word) -> word.length()));

使用GroupByKey

GroupByKey 是一个用于处理键/值对集合的Bean Transform,是一个并行Reduce操作,类似于Map / Shuffle / Reduce-style算法的Shuffle阶段。 GroupByKey 的输入是表示多重映射的键/值对的集合,其中集合包含具有相同键但具有不同值的多个对。给定这样的集合,可以使用GroupByKey 来收集与每个唯一键相关联的所有值。

例如我们的PCollection是一个如下所示的KV形式:

1
2
3
4
5
6
7
8
9
10
11
cat, 1
dog, 5
and, 1
jump, 3
tree, 2
cat, 5
dog, 2
and, 2
cat, 9
and, 6
...

通过GroupByKey会得到如下的新的PCollection

1
2
3
4
5
6
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...

需要注意的是,对于有限的数据集我们可以直接调用如上的方法,但是对于流式数据来说,数据是没有边界的,因此就需要我们天加上窗口话或者触发器。在使用窗口话或者触发器时也要保证和已经设置的创建期一致,否则会在Pipeline创建的时候报:IllegalStateException error 。

CoGroupByKey

CoGroupByKey一般使用在多个PCollection的join操作中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// collection 1
user1, address1
user2, address2
user3, address3

// collection 2
user1, order1
user1, order2
user2, order3
guest, order4
...

通过CoGroupByKey的操作后变为:
user2, [[address2], [order3]]
user3, [[address3], []]
guest, [[], [order4]]
...

Combine

Combine是一种用于组合数据中元素或值集合的Beam Transform。Combine有一种实现是对键值对PCollection进行处理,根据键值对中的键组合值。

应用Combine Transform时,必须提供一个函数用于组合元素或者键值对中的值。组合函数应该满足交换律和结合律,因为函数不一定在给定键的所有值上精确调用一次。由于输入数据(包括价值收集)可以分布在多个worker之间,所以在每个worker上都会计算出部分结果,所以可以多次调用Combine函数,以在值集合的子集上执行部分组合。Beam SDK还提供了一些预先构建的组合功能,用来对数值型的PCollection进行组合,如sum,min和max。
例如如下一个简单的实现求和的方法:

1
2
3
4
5
6
7
8
9
10
11
// Sum a collection of Integer values. The function SumInts implements the interface SerializableFunction.
public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
@Override
public Integer apply(Iterable<Integer> input) {
int sum = 0;
for (int item : input) {
sum += item;
}
return sum;
}
}

对于比较复杂的计算就需要我们自己实现的方法了:通过实现子类CombineFn

使用CombineFn 实现高级组合

通过继承CombineFn 类可以实现复杂的组合功能。如需要一个复杂的累加器,必须进行预处理或者后处理,输出的类型和输入的类型不一样,组合的时候需要考虑键值对的键(key)等,则需要使用CombineFn来实现。

组合由4种操作组成。当实现一个CombineFn 的子类的时候必须重写这4个操作:

  • 创建累加器
    创建一个本地变量 accumulator。以计算均值为例,accumulator变量需要记录当前的总和和元素个数,在分布式环境下,具体会执行多少次,在哪台机器上执行都是不确定的。
  • 添加输入
    把一个新的输入元素追加到accumulator,返回accumulator的值。在本例中,会更新总和,然后元素个数+1。这个操作也可能是并行执行的。
  • 合并累加器Merge Accumulators
    因为是分布式的,所以每个机器上计算了部分结果,在最终输出结果之前,需要将所有的局部累加器合并起来。
  • 输出结果Extract Output
    最终计算出全局的均值,该操作只会在合并累加器的基础上执行1次。
    代码实现如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
public static class Accum {
int sum = 0;
int count = 0;
}

@Override
public Accum createAccumulator() { return new Accum(); }

@Override
public Accum addInput(Accum accum, Integer input) {
accum.sum += input;
accum.count++;
return accum;
}

@Override
public Accum mergeAccumulators(Iterable<Accum> accums) {
Accum merged = createAccumulator();
for (Accum accum : accums) {
merged.sum += accum.sum;
merged.count += accum.count;
}
return merged;
}

@Override
public Double extractOutput(Accum accum) {
return ((double) accum.sum) / accum.count;
}
}

对以key分组的集合进行组合计算

形如如下的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...
*/

// PCollection is grouped by key and the Double values associated with each key are combined into a Double.
PCollection<KV<String, Double>> salesRecords = ...;
PCollection<KV<String, Double>> totalSalesPerPerson =
salesRecords.apply(Combine.<String, Double, Double>perKey(
new Sum.SumDoubleFn()));

// The combined value is of a different type than the original collection of values per key. PCollection has
// keys of type String and values of type Integer, and the combined value is a Double.
PCollection<KV<String, Integer>> playerAccuracy = ...;
PCollection<KV<String, Double>> avgAccuracyPerPlayer =
playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
new MeanInts())));

Flatten

Flatten用于将多个具有相同之类型的PCollection进行合并成一个PCollection。

1
2
3
4
5
6
7
8
// Flatten takes a PCollectionList of PCollection objects of a given type.
// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

PCollection<String> merged = collections.apply(Flatten.<String>pCollections());

默认情况下结果的Coder与第一个PCollection相同。

Partition

Partition将单个PCollection拆分为固定数量的较小集合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the
// partitioning function. In this example, we define the PartitionFn in-line. Returns a PCollectionList
// containing each of the resulting partitions as individual PCollection objects.
PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
students.apply(Partition.of(10, new PartitionFn<Student>() {
public int partitionFor(Student student, int numPartitions) {
return student.getPercentile() // 0..99
* numPartitions / 100;
}}));

// You can extract each partition from the PCollectionList using the get method, as follows:
PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);

编写Beam Transform的通用要求

当编写一个Beam Transform代码时,需要理解最终的代码是要分布式执行的。 例如,编写的代码,会生成很多副本,在不同的机器上并行执行,相互独立,而无需与任何其他机器上的副本通信或共享状态。 根据Pipeline的执行引擎,可以选择Pipeline,代码的每个副本可能会重试或运行多次。 因此应该谨慎地在代码中包括状态依赖关系。
简单来说,编写的代码至少要满足以下两个要求:

• 函数必须是可序列化的。
• 函数必须是线程兼容的,Beam SDK并不是线程安全的。
除了一样的要求,强烈建议函数是满足幂等特性

序列化Serializability

提供给Transform的任何函数必须是完全可序列化的。 这是因为函数的副本需要序列化并传输到处理集群中的远程worker。 用户代码的父类,如DoFn,CombineFn和WindowFn,已经实现了Serializable; 但是,在子类不能添加任何不可序列化的成员。

需要时刻记住的序列化要点如下:
• 函数对象中的瞬态字段不会传输到工作实例,因为它们不会自动序列化。
• 在序列化之前避免加载大量数据的字段。
• 函数对象实例之间不能共享数据。
• 函数对象在应用后会变得无效。
• 通过使用匿名内部类实例来内联声明函数对象时要小心。 在非静态上下文中,内部类实例将隐含地包含一个指向封闭类和该类的状态的指针。 该内部类也将被序列化,因此适用于函数对象本身的相同注意事项也适用于此外部类。

线程兼容Thread-compatibility

编写的函数应该兼容线程的特性。在执行时,每一个worker会启动一个线程执行代码,如果想实现多线程,需要在代码中自己实现。但是Beam SDK不是线程安全的,所以实现多线程需要开发者自己控制同步。注意,静态变量并不会序列化传递到不同的worker上,还可能会被多个线程使用。

幂等Idempotence

强烈建议开发者编写的函数符合幂等性—即无论重复执行多少次都不会带来意外的副作用。Beam模型中,并不能保证函数的执行次数,鉴于此,符合幂等性,可以让Pipeline的是确定的,Transform的行为是可预测的,更容易测试。

窗口函数

• 固定时间窗口Fixed Time Windows
• 滑动时间窗口Sliding Time Windows
• 会话窗口Per-Session Windows
• 单一全局窗口Single Global Window
• 基于日历的时间窗口Calendar-based Windows

使用固定时间窗口

以下示例代码显示了如何1分钟长度的固定时间窗口应用在PCollection上:

1
2
3
PCollection<String> items = ...;
PCollection<String> fixed_windowed_items = items.apply(
Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

使用滑动时间窗口

以下示例代码显示了如何使用滑动时间窗口将PCollection切分。 每个窗口长度为30分钟,每5秒钟开1个新窗口:

1
2
3
PCollection<String> items = ...;
PCollection<String> sliding_windowed_items = items.apply(
Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));

使用会话时间窗口

以下示例代码显示了如何使用会话窗口切分PCollection,最小的时间跨度为10分钟:

1
2
3
PCollection<String> items = ...;
PCollection<String> session_windowed_items = items.apply(
Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))));

注意:会话窗口首先是基于key的,每个key有自己的会话窗口,有多少个会话窗口,取决于数据在时间上的分布。

使用单一全局窗口

如果您的PCollection是有限的(大小是固定的),可以将所有元素分配给单一全局窗口。 以下示例代码显示如何为PCollection设置单一全局窗口:

1
2
3
PCollection<String> items = ...;
PCollection<String> batch_items = items.apply(
Window.<String>into(new GlobalWindows()));

-------------本文结束感谢您的阅读-------------
坚持技术分享,您的支持将鼓励我继续创作!